Java Reactive with Project Reactor: Essential Operators
If you are a beginner with Project Reactor, an important step is to get your hands dirty with some of its operators. This article walks through 10 essential operators in reactive Java, with examples.
These are the operators that will be reviewed:
- map
- flatMap
- flatMapSequential
- switchIfEmpty
- defaultIfEmpty
- concat/concatWith
- merge/mergeWith
- mergeSequential
- zip
- Mono defer
To try these, create a reactive Spring Boot application with Spring WebFlux (or just add the reactor-core dependency to a plain Java application). The test examples also use StepVerifier, which comes from the reactor-test dependency. Then you can try these operators and learn when to use them.
01) map Operator
If you look at the implementation of the map operator in the Mono or Flux class, you can see that the method expects a Function parameter. It applies that function synchronously to transform each item emitted by a Mono or Flux into a new item. The important keywords here are “transform” and “synchronous”.
Refer to the example below.
@Test
void testMapOperator() {
    Flux<String> mapExample = Flux.just("one", "two", "three")
            .map(string -> string.toUpperCase().concat(" EXAMPLE"))
            .log();
    StepVerifier.create(mapExample)
            .expectNext("ONE EXAMPLE", "TWO EXAMPLE", "THREE EXAMPLE")
            .expectComplete()
            .verify();
}
02) flatMap Operator
Like map, the flatMap operator also expects a Function parameter, but here the function returns a Publisher rather than a plain value, which makes it suitable for asynchronous work.
In the Flux class, the flatMap mapper function transforms each item asynchronously into a Publisher. The operator then flattens these inner publishers into a single Flux by merging them, so the original order of the items is not guaranteed to be preserved.
Function<? super T, ? extends Publisher<? extends R>> mapper
This is the Function parameter that the Flux flatMap method expects. Now consider the examples below for the parameter.
Function<String, Publisher<String>> example1 = s -> Mono.just(s.toUpperCase().concat(" EXAMPLE"));
Function<String, Publisher<String>> example2 = s -> Flux.just(s.split(""));
The marble diagram for flatMap in the Project Reactor documentation illustrates what was explained above.
In the Mono class, the function passed to the flatMap method transforms the item asynchronously into another Mono, and the result emits the value of that inner Mono. If you’re working with reactive programming, you’ll know that a Mono is a publisher that emits at most one item.
Function<? super T, ? extends Mono<? extends R>> transformer
This is the Function parameter that the Mono flatMap method expects. The Function below shows the difference.
Function<String, Mono<String>> flatMap = s -> Mono.just(s.toUpperCase());
Here, the important keywords are “transform”, “asynchronous” and “Publisher”.
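As a small illustration of the Mono variant, here is a minimal sketch. The class name and the greetingFor helper are hypothetical and only stand in for an asynchronous call such as a database or HTTP lookup; block() is used purely to make the result visible in a demo.

```java
import reactor.core.publisher.Mono;

final class MonoFlatMapSketch {

    // Hypothetical helper: stands in for an asynchronous lookup that returns a Mono.
    static Mono<String> greetingFor(String name) {
        return Mono.just("Hello, " + name.toUpperCase());
    }

    static String run() {
        return Mono.just("reactor")
                // flatMap subscribes to the inner Mono and emits its value,
                // so the result is Mono<String> rather than Mono<Mono<String>>
                .flatMap(MonoFlatMapSketch::greetingFor)
                .block(); // blocking only for demonstration purposes
    }

    public static void main(String[] args) {
        System.out.println(run()); // Hello, REACTOR
    }
}
```

Using map with the same helper would produce a Mono<Mono<String>>, which is usually the sign that flatMap was the operator you wanted.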
Now consider the example below for the flatMap operator.
@Test
void testFlatMapOperator() {
    Flux<String> flatMapExample = Flux.just("one", "two")
            .flatMap(s -> Flux.just(s.toUpperCase().split("")))
            .log();
    StepVerifier.create(flatMapExample)
            .expectNext("O", "N", "E", "T", "W", "O")
            .expectComplete()
            .verify();
}
In this example the items happened to be emitted in the order we expected, because the inner publishers emit synchronously, but flatMap does not guarantee that the original order is preserved.
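To make the reordering visible, the inner publishers need to finish at different times. The sketch below is one way to do that, assuming artificial delays of n * 100 ms per element (the class name and the delays are illustrative choices, not part of the original example).

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapOrderSketch {

    static List<Integer> run() {
        return Flux.just(3, 1, 2)
                // each inner Mono emits after n * 100 ms, so smaller numbers finish first
                .flatMap(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100L)))
                .collectList()
                .block(); // blocking only for demonstration purposes
    }

    public static void main(String[] args) {
        // Source order was 3, 1, 2, but items arrive in completion order
        System.out.println(run()); // [1, 2, 3]
    }
}
```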
03) flatMapSequential Operator
As the name implies, it maintains the original order in the final result.
Like flatMap, it transforms the items asynchronously into Publishers and flattens these publishers into a single Flux, but it preserves the order of the source elements by queuing items that arrive out of order. Hence, compared to flatMap, the key point of flatMapSequential is the order.
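A minimal sketch of that behavior follows, assuming artificial delays of n * 100 ms so that the inner publishers complete out of order (the class name and the delays are illustrative).

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FlatMapSequentialSketch {

    static List<Integer> run() {
        return Flux.just(3, 1, 2)
                // inner Monos are subscribed eagerly and complete as 1, 2, 3,
                // but results are queued and released in source order
                .flatMapSequential(n -> Mono.just(n).delayElement(Duration.ofMillis(n * 100L)))
                .collectList()
                .block(); // blocking only for demonstration purposes
    }

    public static void main(String[] args) {
        System.out.println(run()); // [3, 1, 2]
    }
}
```

The work still runs concurrently; only the emission order is constrained, so the first result is not released until the slowest leading element has finished.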
04) switchIfEmpty Operator
This operator is needed when you want to fall back to an alternative Publisher if the source publisher completes empty. Hence, it expects a Publisher as its parameter.
Let’s see the example below and its results.
@Test
void testSwitchIfEmptyOperator() {
    Flux<Object> switched = Flux.empty()
            .switchIfEmpty(Mono.just("switched"))
            .log();
    StepVerifier.create(switched)
            .expectSubscription()
            .expectNext("switched")
            .expectComplete()
            .verify();
}
05) defaultIfEmpty Operator
This operator provides a default value if the source publisher completes without emitting any data (empty).
So, what is the difference between switchIfEmpty and defaultIfEmpty?
The switchIfEmpty operator switches to an alternative publisher, which can emit any number of items, if the source is empty, whereas the defaultIfEmpty operator emits a single default item in place of the empty source.
Now look at the example below and its results for the defaultIfEmpty operator.
@Test
void testDefaultIfEmptyOperator() {
    Flux<Object> defaultIfEmpty = Flux.empty()
            .defaultIfEmpty("default")
            .log();
    StepVerifier.create(defaultIfEmpty)
            .expectSubscription()
            .expectNext("default")
            .expectComplete()
            .verify();
}
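The two operators can also be compared side by side. The sketch below is illustrative only (the class name, method names, and fallback values are made up for the demo); it also shows that neither operator has any effect when the source emits at least one item.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class EmptyFallbackSketch {

    // switchIfEmpty replaces the empty source with another publisher (any number of items)
    static List<String> withSwitchIfEmpty() {
        return Flux.<String>empty()
                .switchIfEmpty(Flux.just("fallback-1", "fallback-2"))
                .collectList()
                .block();
    }

    // defaultIfEmpty emits exactly one default value
    static List<String> withDefaultIfEmpty() {
        return Flux.<String>empty()
                .defaultIfEmpty("default")
                .collectList()
                .block();
    }

    // a non-empty source passes through both operators untouched
    static List<String> nonEmptySource() {
        return Flux.just("value")
                .switchIfEmpty(Flux.just("fallback-1"))
                .defaultIfEmpty("default")
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withSwitchIfEmpty());  // [fallback-1, fallback-2]
        System.out.println(withDefaultIfEmpty()); // [default]
        System.out.println(nonEmptySource());     // [value]
    }
}
```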
Now let’s consider the rest of the mentioned operators.
- concatWith/concat
- merge/mergeWith
- mergeSequential
- zip
These operators are used to combine publishers in different ways. Let’s consider them one by one.
06) concatWith/concat Operator
The concatWith operator is an instance method, while concat is a static method of Flux. With concatWith we can concatenate one publisher with another, while with the static concat method we can concatenate multiple publishers. Since both behave the same way, let’s use the concat operator for the example.
This operator subscribes to the sources sequentially and waits for each source to complete before subscribing to the next one. The marble diagram for concat in the Project Reactor documentation illustrates this clearly.
Consider the example below and its results.
@Test
void testConcatOperator() {
    Flux<String> flux1 = Flux.just("a", "b", "c");
    Flux<String> flux2 = Flux.just("d", "e");
    Flux<String> concatFlux = Flux.concat(flux1, flux2).log();
    StepVerifier.create(concatFlux)
            .expectSubscription()
            .expectNext("a", "b", "c", "d", "e")
            .expectComplete()
            .verify();
}
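The instance form can be sketched in the same way. The example below assumes an artificial 100 ms delay on the first source (an illustrative choice) to show that concatenation still waits for it to complete before subscribing to the second one.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class ConcatWithSketch {

    static List<String> run() {
        Flux<String> slow = Flux.just("a", "b", "c").delayElements(Duration.ofMillis(100));
        Flux<String> fast = Flux.just("d", "e");
        // fast is not subscribed to until slow has completed
        return slow.concatWith(fast)
                .collectList()
                .block(); // blocking only for demonstration purposes
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c, d, e]
    }
}
```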
07) merge/mergeWith Operator
Unlike the concat operator, this operator subscribes to inner sources eagerly and it combines source publishers into an interleaved merged sequence.
Let’s consider the example below and its results.
@Test
void testMergeOperator() {
    Flux<String> flux1 = Flux.just("a", "b", "c");
    Flux<String> flux2 = Flux.just("d", "e");
    Flux<String> mergedFlux = Flux.merge(flux1, flux2).log();
    StepVerifier.create(mergedFlux)
            .expectSubscription()
            .expectNext("a", "b", "c", "d", "e")
            .expectComplete()
            .verify();
}
To see the actual difference between the concat and merge operators, we can add a delay to one of the publishers, as in the example below.
@Test
void testMergeOperatorWithDelay() {
    Flux<String> flux1 = Flux.just("a", "b", "c");
    Flux<String> flux2 = Flux.just("d", "e");
    // flux1 is delayed, so the eagerly subscribed flux2 emits first
    Flux<String> mergedFlux = Flux.merge(flux1.delayElements(Duration.ofSeconds(2)), flux2).log();
    StepVerifier.create(mergedFlux)
            .expectSubscription()
            .expectNext("d", "e", "a", "b", "c")
            .expectComplete()
            .verify();
}
After the delay was added, the items of flux2 (“d”, “e”) are emitted first, because merge subscribes to both sources eagerly. If the step verifier still expected item “a” first, as in the previous test, the verification would fail because item “d” is emitted instead.
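The instance method mergeWith behaves the same way. Here is a minimal sketch, assuming an artificial 100 ms delay on the first source (the class name and the delay are illustrative).

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

final class MergeWithSketch {

    static List<String> run() {
        Flux<String> slow = Flux.just("a", "b", "c").delayElements(Duration.ofMillis(100));
        Flux<String> fast = Flux.just("d", "e");
        // both sources are subscribed to eagerly; fast emits immediately,
        // while the first element of slow only arrives after 100 ms
        return slow.mergeWith(fast)
                .collectList()
                .block(); // blocking only for demonstration purposes
    }

    public static void main(String[] args) {
        System.out.println(run()); // [d, e, a, b, c]
    }
}
```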
08) mergeSequential Operator
This operator merges source publishers into an ordered merged sequence and, in the end, like concat, it preserves the original order. But unlike concat, the inner sources are subscribed to eagerly and, unlike merge, the emitted items are merged into the final sequence in subscription order.
@Test
void testMergeSeqOperator() {
    Flux<String> flux1 = Flux.just("a", "b", "c");
    Flux<String> flux2 = Flux.just("d", "e");
    Flux<String> mergedFlux = Flux.mergeSequential(flux1.delayElements(Duration.ofSeconds(2)), flux2).log();
    StepVerifier.create(mergedFlux)
            .expectSubscription()
            .expectNext("a", "b", "c", "d", "e")
            .expectComplete()
            .verify();
}
As we can see in the example, even if we added a delay duration for the first publisher, we still have the expected order in the results.
09) zip Operator
With this operator, we can zip multiple sources together: it waits for each source to emit one element, combines these elements into a single output item, and repeats this until any one of the sources completes.
It has several overloads that can be used in various situations.
Check out the line of code below, where flux1 and flux2 are the same publishers as in the previous examples.
Flux<Tuple2<String, String>> zip1 = Flux.zip(flux1, flux2);
It returns a Flux of Tuple2 objects. Then we can work with the zipped elements.
Flux<String> fluxString = zip1.map(tuple -> tuple.getT1() + "+" + tuple.getT2());
fluxString.subscribe(System.out::println);
//--output
// a+d
// b+e
Then check out the line of code below, which takes a BiFunction as its third parameter.
Flux<String> zip2 = Flux.zip(flux1, flux2, (a, b) -> a + "-" + b);
zip2.subscribe(System.out::println);
//---output
// a-d
// b-e
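Two further variants are sketched below: the instance method zipWith, and the three-source overload of zip, which yields Tuple3 objects. The class name and sample values are illustrative. In both cases the shortest source decides how many items are produced.

```java
import java.util.List;
import reactor.core.publisher.Flux;

final class ZipSketch {

    // zipWith pairs elements by position and stops when the shorter source completes
    static List<String> pairs() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        return letters.zipWith(numbers, (letter, number) -> letter + number)
                .collectList()
                .block();
    }

    // zipping three sources produces Tuple3 values
    static List<String> triples() {
        Flux<String> letters = Flux.just("a", "b", "c");
        Flux<Integer> numbers = Flux.just(1, 2);
        Flux<String> marks = Flux.just("!", "?", "#");
        return Flux.zip(letters, numbers, marks)
                .map(t -> t.getT1() + t.getT2() + t.getT3())
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(pairs());   // [a1, b2]
        System.out.println(triples()); // [a1!, b2?]
    }
}
```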
10) Mono defer
Normally, we use the Mono.just() method to create a simple Mono. But Mono.just() creates a Mono that emits the specified item, and that item is captured at instantiation time. Let’s refer to the example below.
Mono<Long> just = Mono.just(System.currentTimeMillis());
just.subscribe(System.out::println);
Thread.sleep(1000);
just.subscribe(System.out::println);
This will print the same value both times.
If we use the Mono.defer() method instead, the supplier is invoked each time a subscriber subscribes, so the emitted item is captured at subscription time. Let’s refer to the example below.
Mono<Long> defer = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
defer.subscribe(System.out::println);
Thread.sleep(1000);
defer.subscribe(System.out::println);
Now this will print a different value each time the subscriber subscribes to it.
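The same contrast can be shown without relying on the clock. The sketch below uses a counter instead of System.currentTimeMillis() (an illustrative substitution, as are the class and method names), so the difference between eager and deferred evaluation is deterministic.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

final class DeferSketch {

    // Mono.just evaluates its argument once, when the Mono is created
    static List<Integer> eager() {
        AtomicInteger counter = new AtomicInteger();
        Mono<Integer> just = Mono.just(counter.incrementAndGet());
        return List.of(just.block(), just.block());
    }

    // Mono.defer runs the supplier again for every subscription
    static List<Integer> deferred() {
        AtomicInteger counter = new AtomicInteger();
        Mono<Integer> defer = Mono.defer(() -> Mono.just(counter.incrementAndGet()));
        return List.of(defer.block(), defer.block());
    }

    public static void main(String[] args) {
        System.out.println(eager());    // [1, 1]
        System.out.println(deferred()); // [1, 2]
    }
}
```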
I hope you now understand these operators and when to use each one. While trying them out, dig into the documentation and the method implementations to learn more details.
Yasas Ranawaka