https://medium.com/netifi/spring-webflux-and-rxjava2-jdbc-83a94e71ba04
Spring WebFlux is a great way to create a non-blocking REST application. One issue you run into when you start working with WebFlux is JDBC, which is blocking. Newer databases like Cassandra and Couchbase have non-blocking drivers; Couchbase’s driver, for example, uses RxJava. NoSQL databases are great, but you can’t always use one. There is some effort going into asynchronous drivers for SQL databases, including Oracle’s work on ADBA. Unfortunately, these are early days, and if you want to talk to a SQL database on the JVM you’re stuck with a blocking driver. This is a problem when you’re building a non-blocking application with Spring WebFlux. Luckily, reactor-core and rxjava2 both implement Reactive Streams.
This means that you can use David Moten’s rxjava2-jdbc library. It takes a normal JDBC driver and lets you interact with it in a way that won’t block your application. It does this by scheduling potentially blocking work on different threads. Additionally, it has a DSL that lets you model your SQL statements and results as streams. This makes integrating it into your reactive application a snap.
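To make the threading idea concrete, here is a minimal JDK-only sketch of moving a blocking call onto a dedicated worker pool so the caller gets a future back right away. It is not rxjava2-jdbc’s implementation; the BlockingOffload class, its submit method, and the thread name are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical helper: runs blocking calls on dedicated threads, not on the caller's thread. */
final class BlockingOffload implements AutoCloseable {
    // Worker threads reserved for blocking calls; the pool size is the caller's choice.
    private final ExecutorService workers;

    BlockingOffload(int threads) {
        this.workers = Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable, "blocking-worker");
            thread.setDaemon(true); // do not keep the JVM alive for idle workers
            return thread;
        });
    }

    /** Returns immediately; the supplier runs later on one of the worker threads. */
    <T> CompletableFuture<T> submit(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, workers);
    }

    @Override
    public void close() {
        workers.shutdown();
    }
}
```

A caller would pass its blocking work as a lambda to submit and attach callbacks to the returned future rather than waiting on it, which keeps the calling thread free.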
Rxjava2-jdbc also provides non-blocking connection pooling. Normally, in a blocking application, a thread is blocked until a connection is available to make a query. This is a problem for a non-blocking application: you will quickly tie up your threads waiting for connections and turn your wonderful non-blocking application into a clunky blocking one. In rxjava2-jdbc, the connection pool returns a Single that the pool itself controls. Instead of assigning a thread to each query, queries subscribe to the connection pool and receive a connection when one is available, so no threads are blocked.
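The pooling idea can be sketched with plain JDK types. This illustrates the concept only and is not how rxjava2-jdbc implements it: FuturePool and its methods are hypothetical, and a CompletableFuture stands in for the Single that the library returns.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical pool: acquire() never blocks, it hands back a future instead. */
final class FuturePool<T> {
    private final Queue<T> idle = new ArrayDeque<>();
    private final Queue<CompletableFuture<T>> waiters = new ArrayDeque<>();

    FuturePool(Collection<T> resources) {
        idle.addAll(resources);
    }

    /** A completed future if a resource is idle, otherwise a pending future that is queued. */
    CompletableFuture<T> acquire() {
        synchronized (this) {
            T resource = idle.poll();
            if (resource != null) {
                return CompletableFuture.completedFuture(resource);
            }
            CompletableFuture<T> pending = new CompletableFuture<>();
            waiters.add(pending);
            return pending;
        }
    }

    /** Hands the resource to the oldest live waiter, or parks it as idle if nobody waits. */
    void release(T resource) {
        while (true) {
            CompletableFuture<T> next;
            synchronized (this) {
                next = waiters.poll();
                if (next == null) {
                    idle.add(resource);
                    return;
                }
            }
            // Complete outside the lock; complete() returns false for a cancelled waiter,
            // in which case the loop tries the next one.
            if (next.complete(resource)) {
                return;
            }
        }
    }
}
```

With a pool of one resource, a second acquire() returns a pending future that completes when the first borrower calls release, so no thread has to wait for the connection.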
Nothing is better than an example, so I created one that uses Spring WebFlux, rxjava2-jdbc, and H2. The example is a simple REST application that uses an embedded H2 database. It can be found here. The example has two objects. The first is the Employee object, which contains information about an employee: first name, last name, and department id. The second is the Department object, which contains information about the different departments. The application has RESTful endpoints for basic create, read, delete, and list operations on Employee, and a list operation on Department.
Running the Example
Running the example is simple and can be done with the following command:
./gradlew clean run
This command will configure the database and start the WebFlux application using Spring Boot. The example doesn’t need a servlet container to run, and instead uses Netty to serve traffic. If you want to test and see if the example is up and running you can run the following curl command:
curl localhost:8080/employees
You should see a JSON employee list if it’s running properly.
Routing Requests
Let’s take a look inside the application and see what it’s doing. The first thing to look at is the EmployeeRouter class. This class routes incoming HTTP requests to the appropriate handler based on URL and HTTP method.
@Configuration
public class EmployeeRouter {

  @Bean
  public RouterFunction<ServerResponse> route(EmployeeHandler handler) {
    return RouterFunctions.route(
            GET("/employees").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllEmployees)
        .andRoute(
            GET("/employee/fn/{fn}/ln/{ln}")
                .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getEmployee)
        .andRoute(
            PUT("/employee").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::createNewEmployee)
        .andRoute(
            DELETE("/employee/id/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::deleteEmployee)
        .andRoute(
            GET("/departments").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
            handler::getAllDepartments);
  }
}
Because Spring WebFlux is non-blocking, these handlers must return a Mono<ServerResponse>.
Handling Requests
The next class to take a look at is the class that is handling the requests, EmployeeHandler.
@Component
public class EmployeeHandler {

  private final EmployeeRepository repository;

  public EmployeeHandler(EmployeeRepository repository) {
    this.repository = repository;
  }

  public Mono<ServerResponse> getAllEmployees(ServerRequest request) {
    Flux<Employee> employees = repository.getAllEmployees();
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employees, Employee.class);
  }

  public Mono<ServerResponse> getEmployee(ServerRequest request) {
    String firstName = request.pathVariable("fn");
    String lastName = request.pathVariable("ln");
    Mono<Employee> employee = repository.getEmployee(firstName, lastName);
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employee, Employee.class);
  }

  public Mono<ServerResponse> createNewEmployee(ServerRequest request) {
    Mono<Employee> employeeMono = request.bodyToMono(Employee.class);
    Mono<Employee> employee = repository.createNewEmployee(employeeMono);
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(employee, Employee.class);
  }

  public Mono<ServerResponse> deleteEmployee(ServerRequest request) {
    String id = request.pathVariable("id");
    Mono<Void> employee = repository.deleteEmployee(id);
    return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).build(employee);
  }

  public Mono<ServerResponse> getAllDepartments(ServerRequest request) {
    Flux<Department> allDepartments = repository.getAllDepartments();
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(allDepartments, Department.class);
  }
}
This is the class that is responsible for handling incoming requests. Each handler method returns a Mono<ServerResponse>.
Interacting with the Database
To create our responses we need to get the information from our database. The object that takes care of that is the EmployeeRepository, which is injected into the handler. This is normally where things start to fall apart, because you’re accessing a blocking database. In our case, though, we’re using rxjava2-jdbc, so it’s no longer an issue. While this makes a JDBC driver behave like a non-blocking driver and deals with the difficulties of scheduling potentially blocking work off the event loop, it isn’t as efficient as a purely non-blocking approach. However, it is sufficient for all but the highest-throughput applications. Let’s take a look at the EmployeeRepository class:
@Component
public class EmployeeRepository {

  private Database db;

  public EmployeeRepository() throws Exception {
    Connection connection = DriverManager.getConnection("jdbc:h2:./build/mydatabase", "sa", "sa");
    NonBlockingConnectionPool pool =
        Pools.nonBlocking()
            .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
            .connectionProvider(ConnectionProvider.from(connection))
            .build();
    this.db = Database.from(pool);
  }

  Flux<Employee> getAllEmployees() {
    String sql = "SELECT * FROM employee e JOIN department d ON e.department_id = d.department_id";
    Flowable<Employee> employeeFlowable =
        db.select(sql)
            .get(
                rs -> {
                  Employee employee = new Employee();
                  employee.setId(rs.getInt("employee_id"));
                  employee.setFirstName(rs.getString("employee_firstname"));
                  employee.setLastName(rs.getString("employee_lastname"));
                  employee.setDepartment(rs.getString("department_name"));
                  return employee;
                });
    return Flux.from(employeeFlowable);
  }

  Mono<Employee> getEmployee(String firstName, String lastName) {
    String sql =
        "SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
            + "JOIN department d ON e.department_id = d.department_id "
            + "WHERE employee_firstname = ? AND "
            + "employee_lastname = ?";
    Flowable<Employee> employeeFlowable =
        db.select(sql)
            .parameters(firstName, lastName)
            .get(
                rs -> {
                  Employee employee = new Employee();
                  employee.setId(rs.getInt("employee_id"));
                  employee.setFirstName(rs.getString("employee_firstname"));
                  employee.setLastName(rs.getString("employee_lastname"));
                  employee.setDepartment(rs.getString("department_name"));
                  return employee;
                });
    return Mono.from(employeeFlowable);
  }

  Mono<Employee> createNewEmployee(Mono<Employee> employeeMono) {
    String createSql =
        "INSERT INTO employee (employee_firstname, employee_lastname, department_id) VALUES (?, ?, ?)";
    String selectDepartmentId = "SELECT department_id from department where department_name = ?";
    String selectSql =
        "SELECT employee_id, employee_firstname, employee_lastname, department_name FROM employee e "
            + "JOIN department d ON e.department_id = d.department_id "
            + "WHERE employee_id = ?";
    return employeeMono.flatMap(
        newEmployee -> {
          Flowable<Integer> employeeIds =
              db.select(selectDepartmentId)
                  .parameters(newEmployee.getDepartment())
                  .getAs(Integer.class)
                  .flatMap(
                      departmentId ->
                          db.update(createSql)
                              .parameters(
                                  newEmployee.getFirstName(),
                                  newEmployee.getLastName(),
                                  departmentId)
                              .returnGeneratedKeys()
                              .getAs(Integer.class));
          Flowable<Employee> employeeFlowable =
              db.select(selectSql)
                  .parameterStream(employeeIds)
                  .get(
                      rs -> {
                        Employee employee = new Employee();
                        employee.setId(rs.getInt("employee_id"));
                        employee.setFirstName(rs.getString("employee_firstname"));
                        employee.setLastName(rs.getString("employee_lastname"));
                        employee.setDepartment(rs.getString("department_name"));
                        return employee;
                      });
          return Mono.from(employeeFlowable);
        });
  }

  Mono<Void> deleteEmployee(String id) {
    String sql = "DELETE FROM employee WHERE employee_id = ?";
    Flowable<Integer> counts = db.update(sql).parameter(id).counts();
    return Flux.from(counts).then();
  }

  Flux<Department> getAllDepartments() {
    return Flux.from(db.select(Department.class).get());
  }
}
The first thing to look at here is the constructor. The main class in rxjava2-jdbc is the Database class. This is what you use to interact with the database and run queries. Before you can do that, you need to create a connection to the database. For our example, we’re creating a connection to an H2 database. Many databases allow you to pass user credentials and other information in their connection string. If they do, you can just create a database like so:
Database db = Database.from("<connection_string>", <pool_size>);
However, H2 doesn’t allow you to do this, so you need to create a Connection object in order to pass in credentials. You then use the Pools builder to create a pool. Once you’ve created your pool, you can create the Database object.
The great thing about rxjava2-jdbc is that any JDBC driver can be used. You can switch to any standard JDBC database like Oracle, SQL Server, DB2, etc.
I’m not going to go into too much detail about how rxjava2-jdbc works here because David Moten’s GitHub page has great documentation.
I will point out a few things about using Spring WebFlux and rxjava2-jdbc together, though. The good news is that both implement Reactive Streams, so interoperating is straightforward. With reactor-core, you use the Flux.from or Mono.from methods, which accept any Publisher. Note that Mono.from uses only the first item the publisher emits and then cancels the source, so reserve it for queries that should return at most one row. Another thing to look at is the deleteEmployee method. This method returns a Mono<Void>: it wraps the Flowable of update counts in a Flux and calls then(), which discards the counts and signals only completion or an error.
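For reference, Reactor documents Flux.then() as ignoring the elements of the sequence and replaying only its completion or error signal, which is what deleteEmployee relies on. The JDK-only sketch below imitates that behaviour with java.util.concurrent.Flow, whose interfaces mirror Reactive Streams. CompletionOnly is a hypothetical name, and this is not Reactor’s implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

/** Hypothetical helper: drains a publisher and reports only how it ended. */
final class CompletionOnly {
    private CompletionOnly() {}

    static <T> CompletableFuture<Void> of(Flow.Publisher<T> source) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                // Ask for everything; the items themselves are not interesting.
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T item) {
                // Intentionally ignored, like the update counts in deleteEmployee.
            }

            @Override
            public void onError(Throwable error) {
                done.completeExceptionally(error);
            }

            @Override
            public void onComplete() {
                done.complete(null);
            }
        });
        return done;
    }
}
```

The returned future stays pending while items flow and finishes only when the publisher completes or fails, which is the same contract a Mono<Void> gives the WebFlux handler.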
Write some Code
To really get a feel for how this works, the next step is to write some code. One idea is to port the sample app to the database of your choice. Another is to create endpoints for creating and deleting a department. You can use the createNewEmployee and deleteEmployee methods as references.
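If you add more queries, one thing you might factor out first is the row-mapping lambda that EmployeeRepository repeats in three places. Here is a minimal sketch, assuming a stand-in Employee bean (the real class isn’t shown in this post) and a hypothetical EmployeeRows helper; the column names are the ones used by the queries above.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/** Stand-in for the example's Employee bean; the real class is not shown in the post. */
final class Employee {
    private int id;
    private String firstName;
    private String lastName;
    private String department;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getFirstName() { return firstName; }
    public void setFirstName(String firstName) { this.firstName = firstName; }
    public String getLastName() { return lastName; }
    public void setLastName(String lastName) { this.lastName = lastName; }
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
}

/** Hypothetical helper that keeps the ResultSet-to-Employee mapping in one place. */
final class EmployeeRows {
    private EmployeeRows() {}

    /** Maps the row the ResultSet is currently positioned on. */
    static Employee fromRow(ResultSet rs) throws SQLException {
        Employee employee = new Employee();
        employee.setId(rs.getInt("employee_id"));
        employee.setFirstName(rs.getString("employee_firstname"));
        employee.setLastName(rs.getString("employee_lastname"));
        employee.setDepartment(rs.getString("department_name"));
        return employee;
    }
}
```

With something like this in place, each query should be able to pass EmployeeRows::fromRow to get(...) instead of repeating the lambda, and a Department mapper for the new endpoints could follow the same shape.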
Conclusion
The reality is that many applications today use SQL databases, and reactive application developers have not had an easy way to access them without turning their non-blocking application into a blocking one. Hopefully there will be great, truly non-blocking JDBC drivers in the future. Until then, I hope this post shows that, while not perfect, there is an effective solution.