Spring WebFlux and rxjava2-jdbc

2018/7/7 posted in  Java

https://medium.com/netifi/spring-webflux-and-rxjava2-jdbc-83a94e71ba04

Spring WebFlux is a great way to create a non-blocking REST application. One issue that you run into when start working with WebFlux is JDBC. JDBC is blocking. New school databases like Cassandra or Couchbase have non-blocking drivers. In Couchbase’s case its driver uses RXJava. NoSQL databases are great, but you can’t always use one for various reasons. There is some effort going into creating asynchronous drivers for databases, as well as Oracle’s effort to create ADBA. Unfortunately, these are early days, and if you want to talk to a SQL database on the JVM you’re stuck with a blocking driver. This is a problem when you’re building a non-blocking application with Spring WebFlux . Luckily reactor-core and rxjava2 both implement ReactiveStreams.

This means that you can use David Moten’s rxjava2-jdbc library. It takes a normal JDBC driver and allows you to interact with it in a way that won’t block your application. It does this by scheduling potentially blocking work on different threads. Additionally, it has DSL that lets you model your SQL statements and results into streams. This makes integrating into your reactive application a snap.

Rxjava2-jdbc also provides non-blocking connection pooling. Normally in a blocking application threads are blocked until a connection is available to make a query. This is a problem for a non-blocking application as you will quickly exhaust all database connection threads and turn your wonderful non-blocking application into a clunky blocking application. In rxjava2-jdbc its connection pool returns a Single that is controlled by the connection pool. This means that instead of assigning threads to each query, queries subscribe to the connection pool and receive a connection when one is available — no blocked threads!

Nothing is better than an example, so I created one that uses Spring WebFlux, rxjava2-jdbc, and H2. The example is a simple REST application that uses an embedded H2 database. It can be found here. The example has two objects. The first is an Employee object which contains information about the employee including: first name, last name, and department id. The second object is the Department object, which contains information about different departments. The application has restful endpoints for basic create, read, delete and list operations for the Employee and list operations for Department.

Running the Example

Running the example is simple and can be done with following command:

./gradlew clean run

This command will configure the database and start the WebFlux application using Spring Boot. The example doesn’t need a servlet container to run, and instead uses Netty to serve traffic. If you want to test and see if the example is up and running you can run the following curl command:

curl localhost:8080/employees

You should see a JSON employee list if it’s running properly.

Routing Requests

Let’s take look inside the application and see what it’s doing. The first thing to take a look at is the EmployeeRouter class. This class routes incoming HTTP requests to the appropriate handler based on URL and HTTP method.

EmployeeRouter.java hosted with ❤ by GitHub

@Configuration
public class EmployeeRouter {
  @Bean
  public RouterFunction<ServerResponse> route(EmployeeHandler handler) {
    return RouterFunctions.route(
            GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllEmployees)
        .andRoute(
            GET("/employee/fn/{fn}/ln/{ln}")
                .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getEmployee)
        .andRoute(
            PUT("/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::createNewEmployee)
        .andRoute(
            DELETE("/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::deleteEmployee)
        .andRoute(
            GET("/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllDepartments);
  }
}

In Spring WebFlux because it’s non-blocking these handlers must return a Mono.

Handling Requests

The next class to take a look at is the class that is handling the requests, EmployeeHandler.

EmployeeHandler.java hosted with ❤ by GitHub

@Component
public class EmployeeHandler {
  private final EmployeeRepository repository;

  public EmployeeHandler(EmployeeRepository repository) {
    this.repository = repository;
  }

  public Mono<ServerResponse> getAllEmployees(ServerRequest request) {
    Flux<Employee> employees = repository.getAllEmployees();
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employees, Employee.class);
  }

  public Mono<ServerResponse> getEmployee(ServerRequest request) {
    String firstName = request.pathVariable("fn");
    String lastName = request.pathVariable("ln");
    Mono<Employee> employee = repository.getEmployee(firstName, lastName);

    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employee, Employee.class);
  }

  public Mono<ServerResponse> createNewEmployee(ServerRequest request) {
    Mono<Employee> employeeMono = request.bodyToMono(Employee.class);
    Mono<Employee> employee = repository.createNewEmployee(employeeMono);

    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employee, Employee.class);
  }

  public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
    String id = request.pathVariable("id");
    Mono<Void> employee = repository.deleteEmployee(id);

    return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).build(employee);
  }

  public Mono<ServerResponse> getAllDepartments(ServerRequest request) {
    Flux<Department> allDepartments = repository.getAllDepartments();
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(allDepartments, Department.class);
  }
}

This is the class that is responsible for handling incoming requests. Each handle method returns a Mono. The ServerResponse object has a builder that is used to create the response, and has methods that take Fluxes and Monos for responses. The ServerResponse object will also automatically serialize your results to JSON. Additionally, if you return a stream of responses, WebFlux will automatically turn the response into a JSON list. You can can see this in the getAllEmployees and getAllDepoartment methods. A method to look closer at is the deleteEmployee method. The deleteEmployee method is a little different then the other methods. You’ll notice that most methods use the body method to create a response. The deleteEmployee doesn’t have a body so it uses the build method instead which takes a Mono.

Interacting with the Database

To create our responses we need to get the information from our database. The object that takes care of that is the the EmployeeRepository which is being injected into the handler. This is normally where things start to fall apart because you’re accessing a blocking database. In our case though we’re using rxjava2-jdbc and it’s no longer an issue. While this does turn a JDBC driver into a non-blocking driver and deals with the difficulties of scheduling potentially blocking work off the event loop, this isn’t as efficient as a purely non-blocking approach. However, it is sufficient for all but the highest throughput applications. Lets take a look at the EmployeeRepository class:

EmployeeRepository.java hosted with ❤ by GitHub

@Component
public class EmployeeRepository {
  private Database db;

  public EmployeeRepository() throws Exception {
    Connection connection = DriverManager.getConnection("jdbc:h2:./build/mydatabase", "sa", "sa");
    NonBlockingConnectionPool pool =
        Pools.nonBlocking()
            .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
            .connectionProvider(ConnectionProvider.from(connection))
            .build();

    this.db = Database.from(pool);
  }

  Flux<Employee> getAllEmployees() {
    String sql = "SELECT * FROM employee e JOIN department d ON e.department_id = d.department_id";

    Flowable<Employee> employeeFlowable =
        db.select(sql)
            .get(
                rs -> {
                  Employee employee = new Employee();
                  employee.setId(rs.getInt("employee_id"));
                  employee.setFirstName(rs.getString("employee_firstname"));
                  employee.setLastName(rs.getString("employee_lastname"));
                  employee.setDepartment(rs.getString("department_name"));

                  return employee;
                });

    return Flux.from(employeeFlowable);
  }

  Mono<Employee> getEmployee(String firstName, String lastName) {
    String sql =
        "SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
            + "JOIN department d ON e.department_id = d.department_id "
            + "WHERE employee_firstname = ? AND "
            + "employee_lastname = ?";

    Flowable<Employee> employeeFlowable =
        db.select(sql)
            .parameters(firstName, lastName)
            .get(
                rs -> {
                  Employee employee = new Employee();
                  employee.setId(rs.getInt("employee_id"));
                  employee.setFirstName(rs.getString("employee_firstname"));
                  employee.setLastName(rs.getString("employee_lastname"));
                  employee.setDepartment(rs.getString("department_name"));

                  return employee;
                });

    return Mono.from(employeeFlowable);
  }

  Mono<Employee> createNewEmployee(Mono<Employee> employeeMono) {

    String createSql =
        "INSERT INTO employee (employee_firstname, employee_lastname, department_id) VALUES (?, ?, ?)";
    String selectDepartmentId = "SELECT department_id from department where department_name = ?";
    String selectSql =
        "SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
            + "JOIN department d ON e.department_id = d.department_id "
            + "WHERE employee_id = ?";

    return employeeMono.flatMap(
        newEmployee -> {
          Flowable<Integer> employeeIds =
              db.select(selectDepartmentId)
                  .parameters(newEmployee.getDepartment())
                  .getAs(Integer.class)
                  .flatMap(
                      departmentId ->
                          db.update(createSql)
                              .parameters(
                                  newEmployee.getFirstName(),
                                  newEmployee.getLastName(),
                                  departmentId)
                              .returnGeneratedKeys()
                              .getAs(Integer.class));

          Flowable<Employee> employeeFlowable =
              db.select(selectSql)
                  .parameterStream(employeeIds)
                  .get(
                      rs -> {
                        Employee employee = new Employee();
                        employee.setId(rs.getInt("employee_id"));
                        employee.setFirstName(rs.getString("employee_firstname"));
                        employee.setLastName(rs.getString("employee_lastname"));
                        employee.setDepartment(rs.getString("department_name"));

                        return employee;
                      });

          return Mono.from(employeeFlowable);
        });
  }

  Mono<Void> deleteEmployee(String id) {
    String sql = "DELETE FROM employee WHERE employee_id = ?";
    Flowable<Integer> counts = db.update(sql).parameter(id).counts();
    return Flux.from(counts).then();
  }

  Flux<Department> getAllDepartments() {
    return Flux.from(db.select(Department.class).get());
  }
}

The first thing to look at here is the constructor. The main class in rxjava2-jdbc is the Database class. This is what you use to interact with the database and run queries. Before you can do that you need to create a connection to the database. For our example, we’re creating a connection to an H2 database. Many databases allow you to pass in user credentials and other information in there connection string. If they do then you can use just create a database like so:

Database db = Database.from(“<connection_string”>, <pool_size>);

However, H2 doesn’t allow you to do this, so you need to create a Connection object in order to pass in credentials. You then have to use the Pools builder to create a pool. Once you’ve created the your pool you can create the database object.

The great thing about rxjava2-jdbc is that any JDBC driver can used. You can switch any standard JDBC database like Oracle, SQL Server, DB2, etc.

I’m not going to go into too much detail about how rxjava2-jdbc works here because David Moten’s github page has great documentation.

I will point out a few things though about using Spring WebFlux and rxjava2-jdbc together. The good news is they both implement ReactiveStreams so it’s transparent for them to interoperate. The way to do this with reactor-core is to use the Flux.from or Mono.from methods. These can take any publisher. It is important to note that in the Mono version it can only emit one item or it will throw an exception. Another thing to take a look at is the deleteEmployee method. This method returns a Mono which is a special convention in reactor-core. This roughly equivalent to a Completable in rxjava2. To mimic this behavior in reactor-core, call the then method on a Flux or a Mono. This will ignore all elements that are emitted from the stream, and either complete or emit an exception.

Write some Code

To really get a feel for how this works, the next step is to write some code. One idea I would suggest are to port the sample app to the database of your choice. Another idea is to create endpoints for creating a new department and deleting one. You can use the createNewEmployee and deleteEmployee methods as references.

Conclusion

The reality is that many applications today use SQL databases, and reactive application developers did not have an easy way to access them without turning their non-blocking application into a blocking application. Hopefully in the future there will be great solutions around truly non-blocking JDBC drivers. Until that time, I hope you can see with this blog post, that while not perfect, there is an effective solution.