Spring Boot WebFlux : Using RSocket for communication between Microservices

Yasas Ranawaka
5 min read · Mar 12, 2023


The most common way for microservices to communicate is HTTP. But HTTP becomes awkward in real-time scenarios that go beyond simple request-response, such as streaming or one-way messages. So let’s see how we can use RSocket for server-to-server communication with Spring WebFlux.

What / Why ?

As described on the RSocket website (https://rsocket.io), RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron.

It includes valuable features such as back-pressure, resumption, fragmentation, multiplexing, and routing. It also defines four messaging/communication models:

  • Request-Response : send one message and receive one response
  • Request-Stream : send one message and receive a stream of messages
  • Fire-and-Forget : send one message and expect no response
  • Channel / Bi-Directional : send streams of messages in both directions

Because RSocket is fully reactive, it fits naturally into Spring WebFlux microservices, and its streaming and back-pressure support make it useful in large distributed systems.
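To make the back-pressure idea from the feature list more concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow interfaces. It does not use RSocket or Spring at all; the class names (BackPressureSketch, RangePublisher, PullingSubscriber) are made up for illustration. The point is only that the publisher emits nothing until the subscriber asks for it, which is the same demand-driven idea RSocket carries across the network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; it does not use RSocket or Spring.
final class BackPressureSketch {

    // Publishes 1..count, but only as many items as the subscriber has requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items; stop early when the range is exhausted.
                    for (long i = 0; i < n && next <= count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Collects items and lets the caller decide when to ask for more.
    static final class PullingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { this.subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        // Signals demand for n more items.
        void pull(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        PullingSubscriber subscriber = new PullingSubscriber();
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received); // nothing has been requested yet
        subscriber.pull(2);
        System.out.println(subscriber.received); // only the two requested items
        subscriber.pull(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

In Reactor, Mono and Flux handle this demand signalling for you, so you rarely write a subscriber like this by hand.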

Now, let’s implement some basic communication scenarios between two Spring Boot microservices. (To follow the implementation, you should have some basic knowledge of Project Reactor, Mono, and Flux.)

First, you have to initialize two Spring Boot projects.

For the demonstration, I am going to initialize two services called “User-Service” and “Unit-Service”. As the names suggest, “user-service” handles everything related to users, while “unit-service” handles the details of “Unit” (a DB entity). Let’s assume that unit-service depends on user-service.

Both projects need two main dependencies: Spring Reactive Web and RSocket.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

Let’s consider the User-Service.

In the application properties file, let’s add the HTTP server port (8001) and the RSocket server port (7000), so the RSocket server listens on port 7000 over TCP.

spring:
  application:
    name: user-service
  rsocket:
    server:
      port: 7000
      transport: tcp
  main:
    lazy-initialization: true

server:
  port: 8001

Now let’s add RSocket endpoints to the user-service. For this, we create a controller class annotated with Spring’s @Controller.

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@AllArgsConstructor
@Controller
public class UserRSocketController {

    private final UserService userService;

    // request-response: one request payload in, one response out
    @MessageMapping("get-user-name-by-id")
    public Mono<String> getUserNameById(String id) {
        log.info("UserRSocketController : Received to route : get-user-name-by-id");
        return userService.getUserNameById(id);
    }

    // request-stream: one request in, a stream of responses out
    @MessageMapping("get-all-users-names")
    public Flux<String> getUserAllNames() {
        log.info("UserRSocketController : Received to route : get-all-users-names");
        return userService.getAllUsesNames();
    }

    // fire-and-forget: one request in, no response payload
    @MessageMapping("collect-unit-name")
    public Mono<Void> collectUnitName(String unitName) {
        log.info("UserRSocketController : Received to route : collect-unit-name , unit name {}", unitName);
        return userService.collectUnitName(unitName);
    }
}

Here, I have used a service class (UserService) that contains the service-layer logic.

The most important parts to pay attention to are the methods annotated with @MessageMapping from “org.springframework.messaging.handler.annotation”. Consider the first method, “getUserNameById”, annotated with @MessageMapping("get-user-name-by-id"). This means that any message with the RSocket route “get-user-name-by-id” is handled by this method. These are the RSocket endpoints that we can use for server-to-server communication.

I have marked the request-response, request-stream, and fire-and-forget methods with comments. Note their return types: Mono<String> for request-response, Flux<String> for request-stream, and Mono<Void> for fire-and-forget.
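One way to read those return types: Spring does not take the interaction model from the annotation, it infers which models a handler can serve from how many values go in and come out. The sketch below paraphrases that rule as I understand it from the Spring Framework reference. It is plain Java with made-up names (InteractionModelSketch, Input, Output, candidates), not a Spring API, and it assumes a handler without a payload parameter counts as single input.

```java
import java.util.List;

// Hypothetical illustration of how handler signatures map to RSocket interaction models.
final class InteractionModelSketch {

    enum Input { SINGLE, STREAM }        // e.g. String vs Flux<String> parameter
    enum Output { NONE, SINGLE, STREAM } // e.g. Mono<Void>, Mono<String>, Flux<String>

    // Returns the interaction models a handler with this shape could serve.
    static List<String> candidates(Input input, Output output) {
        if (input == Input.STREAM) {
            // A stream of input always means a channel, whatever comes back.
            return List.of("request-channel");
        }
        switch (output) {
            case NONE:
                return List.of("fire-and-forget", "request-response");
            case SINGLE:
                return List.of("request-response");
            default:
                return List.of("request-stream");
        }
    }

    public static void main(String[] args) {
        // The three controller methods from this article:
        System.out.println("getUserNameById -> " + candidates(Input.SINGLE, Output.SINGLE));
        System.out.println("getUserAllNames -> " + candidates(Input.SINGLE, Output.STREAM));
        System.out.println("collectUnitName -> " + candidates(Input.SINGLE, Output.NONE));
    }
}
```

By the same rule, a handler that accepts a Flux as its payload parameter would serve the channel model, which this article does not demonstrate.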

Now let’s consider the Unit-Service.

Here, we don’t need to configure an RSocket server port in the properties, because unit-service only acts as an RSocket client. It’s still a good idea to set the HTTP server port.

spring:
  application:
    name: unit-service

server:
  port: 8002

To connect to the RSocket server, we need an instance of RSocketRequester. We can create a configuration class that exposes an RSocketRequester bean.

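import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.codec.cbor.Jackson2CborDecoder;
import org.springframework.http.codec.cbor.Jackson2CborEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeType;
import reactor.util.retry.Retry;

@Configuration
public class RSocketConfiguration {

    // Codecs used to encode and decode payloads
    @Bean
    public RSocketStrategies rSocketStrategies() {
        return RSocketStrategies.builder()
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }

    // Client-side handle for sending requests to user-service
    @Bean
    public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                // retry the connection up to 2 times, 2 seconds apart
                .rsocketConnector(r -> r.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .rsocketStrategies(rSocketStrategies)
                .dataMimeType(MediaType.APPLICATION_JSON)
                .metadataMimeType(MimeType.valueOf("message/x.rsocket.routing.v0"))
                .tcp("localhost", 7000);
    }
}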

RSocketRequester.Builder provides methods to customize the instance. Here, we specify the TCP host and port of the RSocket server. We can also set the MIME types for data and metadata on the connection.
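The metadata MIME type "message/x.rsocket.routing.v0" tells both sides that the metadata of each request carries the route. As a rough illustration of what goes over the wire, the RSocket routing extension encodes each route tag as a one-byte length followed by the tag's UTF-8 bytes. The sketch below reproduces that layout in plain Java. RoutingMetadataSketch and its methods are hypothetical names for this example; in a real application Spring and rsocket-java do this encoding for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only.
final class RoutingMetadataSketch {

    // Encodes each tag as one unsigned length byte followed by its UTF-8 bytes.
    static byte[] encode(String... tags) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String tag : tags) {
            byte[] bytes = tag.getBytes(StandardCharsets.UTF_8);
            if (bytes.length == 0 || bytes.length > 255) {
                throw new IllegalArgumentException("tag must be 1..255 bytes: " + tag);
            }
            out.write(bytes.length);           // low 8 bits are written as the length prefix
            out.write(bytes, 0, bytes.length); // the tag itself
        }
        return out.toByteArray();
    }

    // Reads the length-prefixed tags back out of the metadata bytes.
    static List<String> decode(byte[] metadata) {
        List<String> tags = new ArrayList<>();
        int pos = 0;
        while (pos < metadata.length) {
            int length = metadata[pos++] & 0xFF; // treat the byte as unsigned
            if (pos + length > metadata.length) {
                throw new IllegalArgumentException("truncated routing metadata");
            }
            tags.add(new String(metadata, pos, length, StandardCharsets.UTF_8));
            pos += length;
        }
        return tags;
    }

    public static void main(String[] args) {
        byte[] metadata = encode("get-user-name-by-id");
        System.out.println("bytes on the wire: " + metadata.length);
        System.out.println("decoded route: " + decode(metadata));
    }
}
```

This is why the route names on the requester side must match the @MessageMapping values exactly: the server simply reads the tag from the metadata and looks for a handler with that name.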

Then, for demonstration purposes, I will create a RestController with one endpoint per messaging model, so we can see how each one behaves.

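import lombok.AllArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@AllArgsConstructor
@RestController
@RequestMapping("/units")
public class UnitController {

    private final UnitService unitService;

    // to test request-response
    @GetMapping(value = "/{unitId}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<String> getUserNameForUnitId(@PathVariable String unitId) {
        return unitService.getUserNameForUnitId(unitId);
    }

    // to test request-stream (served as server-sent events)
    @GetMapping(value = "/stream/users", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getAllUsersNames() {
        return unitService.getAllUsersNames();
    }

    // to test fire-and-forget
    @PostMapping(value = "/name/{unitName}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<String> collectUnitName(@PathVariable String unitName) {
        return unitService.sendUnitNameToUserServiced(unitName)
                .thenReturn("Received");
    }
}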

Here too, I have used a service interface, UnitService, whose implementation holds the service-layer logic. Now let’s implement that service layer, where unit-service communicates with “user-service”.

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@AllArgsConstructor
@Service
public class UnitServiceImpl implements UnitService {

    private final RSocketRequester rSocketRequester;

    // messaging: request-response
    @Override
    public Mono<String> getUserNameForUnitId(String unitId) {
        return rSocketRequester.route("get-user-name-by-id")
                // user ID is hard-coded for the demo; unitId is not used here
                .data("1")
                .retrieveMono(String.class)
                .doOnNext(data -> log.info("request-response data from route : get-user-name-by-id , data {}", data))
                .doOnError(throwable -> log.error("Error in reading data from route : get-user-name-by-id", throwable));
    }

    // messaging: request-stream
    @Override
    public Flux<String> getAllUsersNames() {
        return rSocketRequester.route("get-all-users-names")
                .retrieveFlux(String.class)
                .doOnNext(data -> log.info("request-stream data from route : get-all-users-names , data {}", data))
                .doOnError(throwable -> log.error("Error in reading data from route : get-all-users-names", throwable));
    }

    // messaging: fire-and-forget
    @Override
    public Mono<Void> sendUnitNameToUserServiced(String unitName) {
        return rSocketRequester.route("collect-unit-name")
                .data(unitName)
                .send()
                // Mono<Void> never emits a value, so doOnNext would not run; log on completion instead
                .doOnSuccess(ignored -> log.info("fire-and-forget sent to route : collect-unit-name , data {}", unitName))
                .doOnError(throwable -> log.error("Error in sending data to route : collect-unit-name", throwable));
    }
}

Here you can see how the RSocketRequester is used to communicate with the RSocket endpoints that we created in user-service, using matching route names. The “retrieveMono()” method is used for request-response messaging, the “retrieveFlux()” method for request-stream, and the “send()” method for fire-and-forget.

The source code is available at https://github.com/yasas1/Questions/tree/main/rsocket-demo

To test the implementation, you can call each REST endpoint declared in the unit-service. You can debug the implementation and also see the logs.

Request-response

curl --request GET 'http://localhost:8002/units/1'

Request-stream

curl -N --request GET 'http://localhost:8002/units/stream/users'

Fire-and-forget

curl --request POST 'http://localhost:8002/units/name/Global'
