Request-Stream API with Reactive Spring

Yasas Ranawaka
Sep 5, 2022

In this article, I explore reactive Java with Spring WebFlux, with the main goal of building a request-stream API.

If you haven't read my previous articles, you can find them here:

https://www.linkedin.com/in/yasasr/details/featured/

As we know, reactive programming in Project Reactor is built on the Reactive Streams specification, which mainly consists of four interfaces: Publisher, Subscriber, Subscription, and Processor.

For this article, it helps to have some idea of the Processor interface. It extends both the Publisher and Subscriber interfaces, which means that a processor is both a producer and a consumer.
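To make the producer-and-consumer role concrete, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow API, which mirrors the four Reactive Streams interfaces. It is not Reactor code and not part of this project: MappingProcessor and ProcessorDemo are hypothetical names chosen for illustration, and the JDK's SubmissionPublisher stands in for the publishing half.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo: source -> processor -> subscriber.
final class ProcessorDemo {

    static List<String> run(List<Integer> input) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            MappingProcessor<Integer, String> processor = new MappingProcessor<>(i -> "item-" + i);
            source.subscribe(processor);        // the processor acts as a Subscriber here
            processor.subscribe(new Flow.Subscriber<String>() {  // ...and as a Publisher here
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            input.forEach(source::submit);
        } // closing the source sends onComplete down the chain

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}

// Hypothetical processor: consumes items of type T and republishes them as R.
final class MappingProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MappingProcessor(Function<? super T, ? extends R> mapper) {
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1);              // ask upstream for the first item
    }

    @Override public void onNext(T item) {
        submit(mapper.apply(item));           // publish the transformed item downstream
        upstream.request(1);                  // then ask for the next one
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }

    @Override public void onComplete() { close(); }
}
```

The same object receives items through onNext and hands them on through submit, which is the dual role the Processor interface describes.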

What is a request-stream API?

In the request-stream model, a single request receives multiple responses. We can think of it as a one-to-many interaction, as illustrated in the diagram below.

Request-Stream

Let’s start by creating a new project.

For demonstration purposes, I'm going to use a scenario with a single entity called Notification, which has id and message as fields. MongoDB will be used as the database.

01) Initialize the Project

Go to the Spring Initializr and select the WebFlux, Reactive MongoDB, and Lombok dependencies, plus anything else you would like. Then generate the project.

Add dependencies

The generated pom.xml will then contain dependencies like these.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Next, we can provide the database connection properties. On my local machine, I have created a database called “demo”, and MongoDB listens on port 27017. The YAML configuration file then looks like this.

spring:
  application:
    name: demo-service
  data:
    mongodb:
      database: demo
      host: localhost
      port: 27017
server:
  port: 9000

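Since I am going to use reactive MongoDB in this project, I add the @EnableReactiveMongoRepositories annotation to the application main class. Spring Boot usually auto-configures reactive repositories when the starter is on the classpath, so the annotation mainly makes the intent explicit.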

@EnableReactiveMongoRepositories
@SpringBootApplication
public class StreamDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamDemoApplication.class, args);
    }
}

02) Entity and the Repository

For this example, we create an entity called Notification, which has an id and a message as fields.

@Setter
@Getter
@Builder
@Document(collection = "notification")
public class Notification {
    @Id
    private String id;
    private String message;
}

It is also useful to have a DTO class for the request body of the notification-creation API.

@Getter
@Setter
public class NotificationDto {
    private String message;
}

Then we can create a repository interface by extending the ReactiveMongoRepository interface.

@Repository
public interface NotificationRepository extends ReactiveMongoRepository<Notification,String> {
}

03) Create Util classes

In this example, I am going to create two helper classes.

The first one, NotificationHelper, is used to map objects and validate fields.

public class NotificationHelper {

    public static void validateNotificationRequest(NotificationDto notificationRequest) {
        if (notificationRequest.getMessage() == null || notificationRequest.getMessage().isBlank()) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Notification message is mandatory");
        }
    }

    public static Notification notificationDtoToNotificationEntity(NotificationDto notificationDto) {
        return Notification.builder().message(notificationDto.getMessage()).build();
    }
}

The second one, HeartBeat, is used in the request-stream API.

@Getter
public class HeartBeat {
    public static final HeartBeat INSTANCE = new HeartBeat();
    private final String type = "HeartBeat";

    private HeartBeat() {
    }
}

04) Processor

Now I am going to create a generic processor class, which is the most important part of this example. I will call it SinkProcessor. This class uses the Sinks.Many interface.

We could have used EmitterProcessor or DirectProcessor, but both were deprecated in Reactor 3.4 in favor of the Sinks API and removed in 3.5.

Image Source: projectreactor.io

As recommended in the Project Reactor documentation, I am going to use Sinks.many().multicast().onBackpressureBuffer().

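import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class SinkProcessor<T> {

    // Multicast sink: every emitted item is delivered to all current subscribers.
    private final Sinks.Many<T> sink;

    public SinkProcessor() {
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
    }

    // Pushes an item into the sink. tryEmitNext returns an EmitResult;
    // it is ignored here, so a failed emission is silently dropped.
    public void add(T t) {
        this.sink.tryEmitNext(t);
    }

    // Exposes the sink as a Flux that clients can subscribe to.
    public Flux<T> flux() {
        return this.sink.asFlux();
    }
}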

Here I have created two simple methods, add and flux. The add method emits an object of the generic type T into the sink, and the flux method calls the sink's asFlux() method, which returns a Flux of T. Note that tryEmitNext returns an EmitResult, which this simple example ignores.

04) Service

This example needs methods for creating and streaming notifications, so let's create a service interface and its implementation class with those two methods.

public interface NotificationService {
    Mono<Notification> createNotification(NotificationDto notificationDto);
    Flux<Notification> streamNotifications();
}

The service implementation class injects the NotificationRepository and initializes a SinkProcessor typed to the Notification class.

private final NotificationRepository notificationRepository;

private static final SinkProcessor<Notification> notificationSinkProcessor = new SinkProcessor<>();

The createNotification method is implemented as follows.

@Override
public Mono<Notification> createNotification(NotificationDto notificationDto) {
    NotificationHelper.validateNotificationRequest(notificationDto);
    return Mono.just(NotificationHelper.notificationDtoToNotificationEntity(notificationDto))
            .flatMap(this.notificationRepository::save)
            .doOnNext(notificationSinkProcessor::add)
            .doOnError(throwable -> log.error("Error on saving notification, ", throwable));
}

As you can see, it accepts a NotificationDto object as its parameter. The NotificationHelper methods validate the request and map the DTO to a Notification entity. Next, the entity is saved to the database with the repository's save method.

Adding the saved entity to the sink is the important part for the stream API, because it is how subscribers later receive that data in real time. That's all we need in the create method.

The streamNotifications method is simple, as shown below.

@Override
public Flux<Notification> streamNotifications() {
    return this.notificationRepository.findAll()
            .mergeWith(notificationSinkProcessor.flux());
}

It reads the existing data from the database and merges it with the live items coming from the processor. The complete service class looks like this.

@Slf4j
@Service
public class NotificationServiceImpl implements NotificationService {

    private final NotificationRepository notificationRepository;

    private static final SinkProcessor<Notification> notificationSinkProcessor = new SinkProcessor<>();

    public NotificationServiceImpl(NotificationRepository notificationRepository) {
        this.notificationRepository = notificationRepository;
    }

    @Override
    public Mono<Notification> createNotification(NotificationDto notificationDto) {
        NotificationHelper.validateNotificationRequest(notificationDto);
        return Mono.just(NotificationHelper.notificationDtoToNotificationEntity(notificationDto))
                .flatMap(this.notificationRepository::save)
                .doOnNext(notificationSinkProcessor::add)
                .doOnError(throwable -> log.error("Error on saving notification, ", throwable));
    }

    @Override
    public Flux<Notification> streamNotifications() {
        return this.notificationRepository.findAll()
                .mergeWith(notificationSinkProcessor.flux());
    }
}
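Why merge findAll() with the sink at all? A live stream only carries what is published while a client is subscribed, so stored data has to come from somewhere else. The sketch below illustrates that idea with the JDK's SubmissionPublisher rather than Reactor, so it is a loose analogy only: Reactor's multicast buffering sink additionally holds items emitted before its first subscriber, and mergeWith interleaves sources instead of concatenating them. LateSubscriberDemo and Collector are hypothetical names, and the stored list merely stands in for the repository.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class LateSubscriberDemo {

    // Collects everything one subscriber sees; the latch opens on termination.
    static final class Collector implements Flow.Subscriber<String> {
        final List<String> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { seen.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // A subscriber that joins late only sees items published after it subscribed.
    static List<String> liveOnly() {
        Collector late = new Collector();
        try (SubmissionPublisher<String> live = new SubmissionPublisher<>()) {
            live.submit("first");      // nobody is subscribed yet, so this item is lost
            live.subscribe(late);
            live.submit("second");
        }
        await(late);
        return List.copyOf(late.seen);
    }

    // Stored items first (stand-in for the database), then whatever arrives live.
    static List<String> storedThenLive(List<String> stored) {
        Collector late = new Collector();
        try (SubmissionPublisher<String> live = new SubmissionPublisher<>()) {
            live.subscribe(late);
            live.submit("second");
        }
        await(late);
        List<String> result = new ArrayList<>(stored);
        result.addAll(late.seen);
        return List.copyOf(result);
    }

    private static void await(Collector collector) {
        try {
            if (!collector.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("live only:        " + liveOnly());
        System.out.println("stored then live: " + storedThenLive(List.of("first")));
    }
}
```

In the service above, findAll() plays the role of the stored list and the sink plays the role of the live publisher.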

05) Controller

The controller needs endpoints for creating and streaming notifications.

The important part is to declare the media type that the streaming API produces. It should be text/event-stream.

@RestController
@RequestMapping("/notifications")
public class NotificationController {

    private final NotificationService notificationService;

    public NotificationController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @PostMapping(path = "/create", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<Notification> createNotification(@RequestBody NotificationDto notificationDto) {
        return this.notificationService.createNotification(notificationDto);
    }

    @GetMapping(path = "/streams", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> streamNotification() {
        return Flux.interval(Duration.ofSeconds(10))
                .map(i -> (Object) HeartBeat.INSTANCE)
                .mergeWith(this.notificationService.streamNotifications());
    }
}

As you can see, the stream method uses Flux.interval with a 10-second period and maps every tick to the HeartBeat instance. That heartbeat flux is then merged with the notification stream from the service. The testing section shows how it behaves.

I have followed a project structure like this.

project structure

06) Testing

First, let’s create some data using the create API.

create-data-using-postman

Then you can open your browser and call the stream API at http://localhost:9000/notifications/streams.

stream-API-test-in-browser

Or you can test it on your command line.

curl -N --location --request GET 'http://localhost:9000/notifications/streams'
Stream-API-Test-In-cmd

Now we can keep the stream API open while adding data through the create API and check whether the newly added data appears in the stream in real time. Watching this, you should see why the HeartBeat class exists: it sends an event every 10 seconds, so the stream keeps emitting even when no notifications are created.
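If you would rather consume the stream from Java than from a browser or curl, the following is a minimal client sketch using the JDK's built-in HttpClient (Java 11+). It assumes the service from this article is running on localhost:9000. StreamClient and dataOf are hypothetical names, and the parsing only handles the "data" lines of the server-sent events format, not event names, ids, or multi-line payloads.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;

final class StreamClient {

    // Returns the payload of an SSE "data" line, or empty for any other line.
    static Optional<String> dataOf(String line) {
        if (!line.startsWith("data:")) {
            return Optional.empty();
        }
        String payload = line.substring("data:".length());
        // The SSE format allows a single optional space after the colon.
        return Optional.of(payload.startsWith(" ") ? payload.substring(1) : payload);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("http://localhost:9000/notifications/streams"))
                .header("Accept", "text/event-stream")
                .GET()
                .build();

        // ofLines() exposes the body as a lazy stream of lines, so this loop
        // keeps printing events until the server closes the connection.
        client.send(request, HttpResponse.BodyHandlers.ofLines())
                .body()
                .forEach(line -> dataOf(line).ifPresent(System.out::println));
    }
}
```

Run it in one terminal and call the create API from another; each new notification and each heartbeat should show up as one printed payload.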

Test-while-adding-data-in-real-time

I hope this will help you to understand and create Request-Stream APIs for your domain.
