Start Spring Boot Microservices with Kafka, PostgreSQL, and Mongo with Reactive

Yasas Ranawaka
15 min read · Aug 14, 2022


As a beginner, let’s try to design a small microservice architecture with Spring Boot and a few commonly used technologies.

If you haven’t read my previous articles and stories, you can find them here: https://www.linkedin.com/in/yasasr/details/featured/

Before reading this, you should already know what microservice architecture is and what its pros and cons are; discussing those is not the goal here.

The main goal is to get some hands-on experience with the technologies around Spring Boot and microservices, and to motivate beginners who want to learn while enjoying what they are doing. Along the way you will have to figure out some details on your own, because in reality we learn while doing the implementation. So if you are a beginner who is excited to build something and see what happens, try this.

These are the technologies and design patterns we are going to use:

  • Spring Boot
  • Project Reactor
  • Spring Cloud Eureka for service registry
  • Spring Cloud Gateway for API gateway
  • Resilience4J for Circuit Breaker
  • Kafka
  • PostgreSQL
  • Mongo

The diagram below shows the overall idea of what we are going to implement with the above technologies and patterns.

main example diagram scenario

This is an example architecture that anyone who is excited to learn can easily follow. From here you can create your own design according to the required domain, and it will help you work with distributed and event-driven systems.

Let’s begin.

As shown in the diagram, the building blocks are:

I). Service Registry

If you don’t know what a service registry is and why it’s needed, see https://www.baeldung.com/cs/service-discovery-microservices

II). API Gateway that has a Circuit-breaker.

If you don’t know what a circuit breaker is and why it’s needed, see https://techblog.constantcontact.com/software-development/circuit-breakers-and-microservices/

III). Kafka

IV). Order Connector service

V). Order service

VI). PostgreSQL database

VII). Order event service

VIII). Mongo database

We can create Spring Boot applications in Spring Initializr by specifying the related dependencies. I’m going to use spring-boot-starter-webflux, which lets us use Project Reactor for reactive Java.

So, this webflux dependency will be used in all services.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

Let’s begin with the service-registry.

01) Service Registry

For the service registry, the eureka-server provided by Netflix will be used.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

pom.xml

<artifactId>order-service-registry</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>order-service-registry</name>
<description>Demo project for Spring Boot</description>
<properties>
  <java.version>15</java.version>
  <spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

In the application.yml file, the basic Eureka settings should be specified like below.

spring:
  application:
    name: order-service-registry
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    wait-time-in-ms-when-sync-empty: 0

In the spring boot application class, @EnableEurekaServer annotation should be specified.

@EnableEurekaServer
@SpringBootApplication
public class OrderServiceRegistryApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceRegistryApplication.class, args);
    }
}

Now you can start the service and open localhost:8761 in the browser. (8761 is also the default port; here it is set explicitly in the properties file.)

service-registry-1

02) API Gateway

For the API gateway, the spring-cloud-starter-gateway dependency is mainly used. Apart from that, we need a Eureka client, because the API gateway is also a microservice that needs to be registered, and a Resilience4j dependency for the circuit breaker.

<properties>
  <java.version>15</java.version>
  <spring-cloud.version>2021.0.3</spring-cloud.version>
  <maven.compiler.source>15</maven.compiler.source>
  <maven.compiler.target>15</maven.compiler.target>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

API gateway configuration can be defined in the application.yml file.

spring:
  application:
    name: order-service-api-gateway
  cloud:
    gateway:
      routes:
        - id: order-service-route
          uri: lb://order-service
          predicates:
            - Path=/orders/*
            - Method=GET,POST,PUT,DELETE
        - id: order-event-service-route
          uri: lb://order-event-service
          predicates:
            - Path=/order-events/*
            - Method=GET,POST,PUT,DELETE
        - id: order-connector-service-route
          uri: lb://order-connector-service
          predicates:
            - Path=/order-connector/*
            - Method=GET,POST,PUT,DELETE
          filters:
            - name: CircuitBreaker
              args:
                name: order-connector-service-fallback
                fallbackUri: forward:/fallback/order-connector-service

server:
  port: 8222

Here, three routes are defined for the three microservices discussed today. Furthermore, a circuit breaker with a fallback URI is specified for the order-connector-service, because in today’s example order-connector-service is the one connected to the outside and there is no inter-service communication. If there were more communication between microservices, we could implement circuit breakers at the microservice level as well.
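The CircuitBreaker filter above runs with Resilience4j defaults. If you want to tune the breaker, Resilience4j also supports per-instance settings in application.yml. The property names below are standard Resilience4j Spring Boot properties; the threshold values are just illustrative assumptions, not what the article uses:

```yaml
resilience4j:
  circuitbreaker:
    instances:
      order-connector-service-fallback:
        slidingWindowSize: 10          # evaluate the last 10 calls
        failureRateThreshold: 50       # open the circuit at a 50% failure rate
        waitDurationInOpenState: 10s   # stay open for 10s before half-open probes
```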

So there is a need to implement a fallback mapping.

@RestController
@RequestMapping("/fallback")
public class FallBackController {

    @GetMapping("/order-connector-service")
    public Mono<String> orderServiceFallBack(ServerHttpResponse response) {
        response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
        return Mono.just("Services are facing a problem. Please Contact System Admin");
    }
}

Then, in the Spring Boot application class, the @EnableDiscoveryClient annotation should be specified.

@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceApiGatewayApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApiGatewayApplication.class, args);
    }
}

Now, if you run the application together with the service registry, the API gateway service should be displayed in the application table of the Eureka server.

03) Kafka

To install and run Kafka on your local machine, you can follow the Kafka quickstart, which explains how to do it: https://kafka.apache.org/quickstart

Note: if you are using a Windows machine, you have to edit the properties below.

  • the dataDir path in zookeeper.properties
  • the log.dirs path in server.properties

For this example, I will create a Kafka topic called “order-listener”. So what is the plan?

The three main microservices

So in this example, I am going to create three microservices.

Let’s assume a scenario in which we receive orders from the client side and have to maintain the order details and their events. There can then be requirements to store orders, generate reports for the past few months, and feed a live category-wise or country-wise analysis into a dashboard.

I) order-connector-service

Its main responsibility is to receive order details from the outside (the client side) and produce them into the Kafka topic.

Here we can protect the order data by implementing validations and business logic if needed. Apart from that, we can extract only the order details that actually need to be processed.

II) order-service

Its main responsibility is to store the order details needed for other or future work.

In this example, an order status or any other field can be updated at any time, so the order entity will be updated by its business id. Apart from that, we can maintain special business logic or validations here as well. A PostgreSQL database will be used to store the details.

III) order-event-service

The main responsibility is to store the events of orders. To store the details, a Mongo database will be used.

To demonstrate, here is a sample order payload. Let’s assume there can be a few order statuses like START, READY, SENT, and CANCELED, and a few item categories like ELECTRIC, WOOD, and TOOL.

{
  "orderStatus": "START",
  "userName": "yasasr",
  "email": "ranawaka.y@gmail.com",
  "itemCategory": "ELECTRIC",
  "itemCode": "ELC10001",
  "price": 15,
  "orderedDateTime": 1659855715000,
  "countryCode": "LK",
  "countryName": "Sri Lanka",
  "state": "Colombo",
  "city": "Colombo"
}
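The services in this article carry the status and category as plain strings. As a small standalone sketch (the class and method names below are my own, not part of the article's code), the fixed value sets above could be modeled as enums so invalid values fail fast:

```java
// Hypothetical enums for the sample payload's fixed value sets.
public class OrderVocabulary {

    enum OrderStatus { START, READY, SENT, CANCELED }

    enum ItemCategory { ELECTRIC, WOOD, TOOL }

    // Returns true only when both strings are members of the enums above.
    static boolean isValid(String status, String category) {
        try {
            OrderStatus.valueOf(status);
            ItemCategory.valueOf(category);
            return true;
        } catch (IllegalArgumentException | NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("START", "ELECTRIC")); // true
        System.out.println(isValid("DONE", "ELECTRIC"));  // false: DONE is not a status
    }
}
```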

04) Order connector service

We will need the webflux, eureka client, gson, lombok, and spring kafka dependencies.

<properties>
  <java.version>11</java.version>
  <spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <excludes>
          <exclude>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </exclude>
        </excludes>
      </configuration>
    </plugin>
  </plugins>
</build>

In the application.yml file, we have to mention configurations related to kafka.

spring:
  application:
    name: order-connector-service
  kafka:
    producer:
      bootstrap-servers: DESKTOP-7JUA3TN:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

server:
  port: 8003

kafka:
  topic: order-listener

In this microservice, I’m going to maintain the below project structure.

Then, to create the Kafka topic, we can define a configuration bean class.

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("order-listener")
                .partitions(3)
                .build();
    }
}

We can also create the basic domain and service implementations in a proper way. OrderSyncRequest is the class that carries the data synced from the client side.

@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor // setters + a no-args constructor let Jackson bind the JSON request body; the all-args constructor keeps @Builder working
public class OrderSyncRequest {
    private String orderStatus;
    private String userName;
    private String email;
    private String itemCategory;
    private String itemCode;
    private double price;
    private long orderedDateTime;
    private String countryCode;
    private String countryName;
    private String state;
    private String city;
}

Order sync service,

public interface OrderSyncService {
    void syncAndProduceOrderRequest(OrderSyncRequest orderSyncRequest);
}

and the implementation,

@Slf4j
@Service
public class OrderSyncServiceImpl implements OrderSyncService {

    @Value("${kafka.topic:order-listener}")
    private String topic;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderSyncServiceImpl(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void syncAndProduceOrderRequest(OrderSyncRequest orderSyncRequest) {
        Gson gson = new Gson();
        String order = gson.toJson(orderSyncRequest);
        if (validateOrderSyncRequest(orderSyncRequest)) {
            kafkaTemplate.send(topic, order);
        }
    }

    private boolean validateOrderSyncRequest(OrderSyncRequest orderSyncRequest) {
        if (orderSyncRequest.getUserName() == null || orderSyncRequest.getUserName().isBlank()) {
            return false;
        }
        if (orderSyncRequest.getEmail() == null || orderSyncRequest.getEmail().isBlank()) {
            return false;
        }
        if (orderSyncRequest.getItemCode() == null || orderSyncRequest.getItemCode().isBlank()) {
            return false;
        }
        if (orderSyncRequest.getOrderStatus() == null || orderSyncRequest.getOrderStatus().isBlank()) {
            return false;
        }
        return true;
    }
}

The syncAndProduceOrderRequest method simply serializes the incoming request to JSON with Gson and, once validation passes, produces it to the topic.

Order connector controller class,

@RestController
@RequestMapping("/order-connector")
public class OrderConnectorController {

    @Autowired
    OrderSyncService orderSyncService;

    @GetMapping("/welcome")
    public Mono<String> welcomeOrderEvents() {
        return Mono.just("Welcome To Order Connector Service");
    }

    @PostMapping("/sync")
    public Mono<String> orderSync(@RequestBody OrderSyncRequest orderSyncRequest, ServerHttpResponse response) {
        response.setStatusCode(HttpStatus.OK);
        orderSyncService.syncAndProduceOrderRequest(orderSyncRequest);
        return Mono.just("Ok");
    }
}

The @EnableDiscoveryClient annotation should be specified in the main class, as before.

@EnableDiscoveryClient
@SpringBootApplication
public class OrderConnectorServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderConnectorServiceApplication.class, args);
    }
}

Now you can run order-connector-service as well and check the Eureka server to make sure the new service is registered.

Up to now, the implementation can be tested by pushing the data into the “/order-connector/sync” path, and we can just listen to the topic by using a command prompt.

Note: we can listen to a topic by using the command below.

.\bin\windows\kafka-console-consumer.bat --topic order-listener --from-beginning --bootstrap-server localhost:9092

This command is also covered in https://kafka.apache.org/quickstart
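To push test data through the gateway, any HTTP client will do. Here is a sketch using only the JDK's built-in java.net.http client; the port and path match the gateway configuration above, and the class and method names are my own. The actual send only succeeds once the services are running, so it is left commented out:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OrderSyncClient {

    // Builds a POST against the gateway route that forwards to order-connector-service.
    static HttpRequest buildSyncRequest(String json) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8222/order-connector/sync"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String json = """
                {"orderStatus":"START","userName":"yasasr","email":"ranawaka.y@gmail.com",
                 "itemCategory":"ELECTRIC","itemCode":"ELC10001","price":15,
                 "orderedDateTime":1659855715000,"countryCode":"LK",
                 "countryName":"Sri Lanka","state":"Colombo","city":"Colombo"}""";
        HttpRequest request = buildSyncRequest(json);
        System.out.println(request.method() + " " + request.uri());
        // Uncomment when the gateway and services are up:
        // HttpResponse<String> response = HttpClient.newHttpClient()
        //         .send(request, HttpResponse.BodyHandlers.ofString());
        // System.out.println(response.body());
    }
}
```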

05) Order service with PostgreSQL

We will need the webflux, eureka client, gson, lombok, spring kafka, postgresql, r2dbc-postgresql, and data-r2dbc dependencies.

Since we are using reactive with webflux, we need to use R2DBC with PostgreSQL.

<properties>
  <java.version>15</java.version>
  <spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.11.RELEASE</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-r2dbc</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <excludes>
          <exclude>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </exclude>
        </excludes>
      </configuration>
    </plugin>
  </plugins>
</build>

In the application.yml file, we have to mention configurations related to kafka and the database.

spring:
  application:
    name: order-service
  kafka:
    consumer:
      bootstrap-servers: DESKTOP-7JUA3TN:9092
      group-id: orderGroup1
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  r2dbc:
    url: r2dbc:postgresql://localhost/order-service?schema=public
    username: postgres
    password: 12345

server:
  port: 8001

In order-service, I’m going to maintain the below project structure.

order-service project structure

Let’s create OrderDto

@Builder
@Data
public class OrderDto {
    private String orderStatus;
    private String userName;
    private String email;
    private String itemCode;
    private String itemCategory;
    private double price;
    private long orderedDateTime;
    private String countryCode;
    private String countryName;
    private String state;
    private String city;
}

and OrderEntity classes

@Builder
@Data
@Table(name = "OrderEntity")
public class OrderEntity {
    @Id
    private UUID tid;
    private String bid;
    private String orderStatus;
    private String userName;
    private String email;
    private String itemCode;
    private String itemCategory;
    private double price;
    private long orderedDateTime;
    private String countryCode;
    private String countryName;
    private String state;
    private String city;
    private long createdDateTime;
    private long lastUpdatedDateTime;
}

In this case, the table must already exist in the database; we can create it according to OrderEntity. Note that Spring Data R2DBC maps camelCase property names to snake_case column names by default, which is why the columns below are snake_case. Here is an example.

CREATE TABLE IF NOT EXISTS public.orderentity
(
    tid uuid NOT NULL DEFAULT gen_random_uuid(),
    bid character varying(200) COLLATE pg_catalog."default" NOT NULL,
    order_status character varying(200) COLLATE pg_catalog."default",
    user_name character varying(200) COLLATE pg_catalog."default",
    email character varying(200) COLLATE pg_catalog."default",
    item_code character varying(200) COLLATE pg_catalog."default",
    item_category character varying(200) COLLATE pg_catalog."default",
    price double precision,
    ordered_date_time bigint,
    country_code character varying(200) COLLATE pg_catalog."default",
    country_name character varying(200) COLLATE pg_catalog."default",
    state character varying(200) COLLATE pg_catalog."default",
    city character varying COLLATE pg_catalog."default",
    created_date_time bigint,
    last_updated_date_time bigint,
    CONSTRAINT orderentity_pkey PRIMARY KEY (tid)
)

The repository interface,

@Repository
public interface OrderRepository extends ReactiveCrudRepository<OrderEntity, UUID> {

    @Query("SELECT * FROM orderentity WHERE bid = :bId")
    Mono<OrderEntity> findByBId(String bId);
}

I have added the findByBId method to find an order by its business id according to my entity design.

In a util class, we specify the object-mapping implementation for the order entity.

public class OrderObjectUtil {

    public static OrderEntity mapOrderDataToOrderEntity(OrderDto orderData) {
        return OrderEntity.builder()
                .bid(getBidToOrderEntity(orderData))
                .orderStatus(orderData.getOrderStatus())
                .userName(orderData.getUserName())
                .email(orderData.getEmail())
                .itemCode(orderData.getItemCode())
                .itemCategory(orderData.getItemCategory())
                .price(orderData.getPrice())
                .orderedDateTime(orderData.getOrderedDateTime())
                .countryCode(orderData.getCountryCode())
                .countryName(orderData.getCountryName())
                .state(orderData.getState())
                .city(orderData.getCity())
                .createdDateTime(System.currentTimeMillis())
                .lastUpdatedDateTime(System.currentTimeMillis())
                .build();
    }

    private static String getBidToOrderEntity(OrderDto orderDto) {
        return "id_" + orderDto.getItemCode() + "_" + orderDto.getUserName() + "_" + orderDto.getOrderedDateTime();
    }
}
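A quick way to see why this business id works as an upsert key: the same itemCode, userName, and orderedDateTime always produce the same bid, so a resent order is found by findByBId and updated rather than duplicated. A standalone sketch repeating the format above (the class name BidDemo is my own):

```java
public class BidDemo {

    // Same format as getBidToOrderEntity in OrderObjectUtil.
    static String bid(String itemCode, String userName, long orderedDateTime) {
        return "id_" + itemCode + "_" + userName + "_" + orderedDateTime;
    }

    public static void main(String[] args) {
        String first  = bid("ELC10001", "yasasr", 1659855715000L);
        String second = bid("ELC10001", "yasasr", 1659855715000L);
        System.out.println(first);                // id_ELC10001_yasasr_1659855715000
        System.out.println(first.equals(second)); // true -> findByBId hits, so we update
    }
}
```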

Then let’s create Service interface and implementation,

public interface OrderService {

    Mono<OrderEntity> createOrUpdateOrder(OrderEntity orderEntity);

    Flux<OrderEntity> streamOrders();
}

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {

    private static final String TOPIC = "order-listener";
    private static final String GROUP_ID = "orderGroup1";

    private final OrderRepository orderRepository;

    public OrderServiceImpl(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @KafkaListener(topics = TOPIC, groupId = GROUP_ID)
    public void consumeOrder(String eventMessage) {
        log.info("Order Message in order-service: " + eventMessage);
        Gson gson = new Gson();
        OrderDto orderData = gson.fromJson(eventMessage, OrderDto.class);
        log.info("OrderData in order-service: " + orderData);
        if (validateOrderData(orderData)) {
            this.createOrUpdateOrder(OrderObjectUtil.mapOrderDataToOrderEntity(orderData)).subscribe();
        }
    }

    @Override
    public Mono<OrderEntity> createOrUpdateOrder(OrderEntity orderEntity) {
        return orderRepository.findByBId(orderEntity.getBid())
                .flatMap(exist -> {
                    log.info("Order found : " + exist);
                    orderEntity.setTid(exist.getTid());
                    orderEntity.setCreatedDateTime(exist.getCreatedDateTime());
                    orderEntity.setLastUpdatedDateTime(System.currentTimeMillis());
                    return orderRepository.save(orderEntity)
                            .doOnError(throwable -> log.error("Error in update order : ", throwable));
                })
                .switchIfEmpty(Mono.defer(() -> orderRepository.save(orderEntity)))
                .doOnError(throwable -> log.error("Error in save order : ", throwable));
    }

    @Override
    public Flux<OrderEntity> streamOrders() {
        return orderRepository.findAll();
    }

    private boolean validateOrderData(OrderDto orderDto) {
        if (orderDto.getUserName() == null || orderDto.getUserName().isBlank()) {
            return false;
        }
        if (orderDto.getEmail() == null || orderDto.getEmail().isBlank()) {
            return false;
        }
        if (orderDto.getItemCode() == null || orderDto.getItemCode().isBlank()) {
            return false;
        }
        if (orderDto.getOrderStatus() == null || orderDto.getOrderStatus().isBlank()) {
            return false;
        }
        if (orderDto.getOrderedDateTime() == 0) {
            orderDto.setOrderedDateTime(System.currentTimeMillis());
        }
        return true;
    }
}

Here, there is a Kafka listener annotated with @KafkaListener which consumes data from the “order-listener” topic. Also, there is a method to create or update order data in the database.

We can create a controller with endpoints that can be used to test the flow.

@RestController
@RequestMapping("/orders")
public class OrderServiceController {

    @Autowired
    private OrderService orderService;

    @GetMapping("/welcome")
    public Mono<String> welcomeOrderEvents() {
        return Mono.just("Welcome To Order Service");
    }

    @GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Object> streamOrders() {
        return Flux.interval(Duration.ofSeconds(5))
                .map(i -> (Object) HBM.INSTANCE)
                .mergeWith(orderService.streamOrders())
                .doOnError(Throwable::printStackTrace);
    }
}
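The HBM marker merged into the stream acts as a periodic keep-alive so the SSE connection doesn't go idle between real order events, but its definition isn't shown in the article. A minimal sketch could be a single-value enum like this (the name HBM and this shape are assumptions):

```java
// Hypothetical heartbeat marker: a single shared instance merged into the
// /orders/stream flux every five seconds to keep the SSE connection alive.
public enum HBM {
    INSTANCE
}
```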

The @EnableDiscoveryClient annotation should be specified in the main class as previously. Apart from that, it is good to add the @EnableR2dbcRepositories annotation to activate reactive relational repositories using R2DBC.

@EnableR2dbcRepositories
@EnableDiscoveryClient
@SpringBootApplication
public class OrderServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

At this point, we can test the order-service by checking data in the table after we push the data into the order-connector-service.

order-service db tested

06) Order event service with Mongo

Order-event-service is much like order-service, except that it stores every event, and a different database is used.

We will need the webflux, eureka client, gson, lombok, spring kafka, and mongodb-reactive dependencies.

Since we are using reactive with webflux, we need to use mongodb-reactive.

<properties>
  <java.version>15</java.version>
  <spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
  </dependency>
  <dependency>
    <groupId>de.flapdoodle.embed</groupId>
    <artifactId>de.flapdoodle.embed.mongo</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <excludes>
          <exclude>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
          </exclude>
        </excludes>
      </configuration>
    </plugin>
  </plugins>
</build>

In the application.yml file, we have to mention configurations related to kafka and the database.

spring:
  application:
    name: order-event-service
  kafka:
    consumer:
      bootstrap-servers: DESKTOP-7JUA3TN:9092
      group-id: orderGroup2
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  data:
    mongodb:
      database: order-event-service
      host: localhost
      port: 27017

server:
  port: 8002

In order-event-service, I’m going to maintain the same project structure as order-service.

order-event-service project structure

OrderEventDto and OrderEventEntity classes can be created like this.

@Builder
@Setter
@Getter
public class OrderEventDto {
    private String orderStatus;
    private String userName;
    private String email;
    private String itemCode;
    private String itemCategory;
    private double price;
    private long orderedDateTime;
    private String countryCode;
    private String countryName;
    private String state;
    private String city;
}

@Builder
@Document(collection = "OrderEvent")
public class OrderEvent {
    @Id
    private String id;
    private String bid;
    private String orderStatus;
    private String userName;
    private String email;
    private String itemCode;
    private String itemCategory;
    private double price;
    private long orderedDateTime;
    private String countryCode;
    private String countryName;
    private String state;
    private String city;
    private long createdDateTime;
}

OrderEventRepository interface,

@Repository
public interface OrderEventRepository extends ReactiveMongoRepository<OrderEvent, String> {
}

Order event service

public interface OrderEventService {
    Mono<OrderEvent> createOrderEvent(OrderEvent orderEvent);
}

and the implementation. (OrderEventObjectUtil maps the DTO to the entity, analogous to OrderObjectUtil in order-service; its code is omitted here.)

@Slf4j
@Service
public class OrderEventServiceImpl implements OrderEventService {

    private static final String TOPIC = "order-listener";
    private static final String GROUP_ID = "orderGroup2";

    private final OrderEventRepository orderEventRepository;

    public OrderEventServiceImpl(OrderEventRepository orderEventRepository) {
        this.orderEventRepository = orderEventRepository;
    }

    @KafkaListener(topics = TOPIC, groupId = GROUP_ID)
    public void consumeOrder(String eventMessage) {
        log.info("Order Message in order-event-service: " + eventMessage);
        Gson gson = new Gson();
        OrderEventDto orderData = gson.fromJson(eventMessage, OrderEventDto.class);
        log.info("OrderEventData in order-event-service: " + orderData);
        if (validateOrderData(orderData)) {
            this.createOrderEvent(OrderEventObjectUtil.mapOrderEventToOrderEventEntity(orderData)).subscribe();
        }
    }

    @Override
    public Mono<OrderEvent> createOrderEvent(OrderEvent orderEvent) {
        log.info("Order event to be saved : " + orderEvent);
        return orderEventRepository.save(orderEvent)
                .doOnError(throwable -> log.error("Error in saving : ", throwable));
    }

    private boolean validateOrderData(OrderEventDto orderDto) {
        if (orderDto.getUserName() == null || orderDto.getUserName().isBlank()) {
            return false;
        }
        if (orderDto.getEmail() == null || orderDto.getEmail().isBlank()) {
            return false;
        }
        if (orderDto.getItemCode() == null || orderDto.getItemCode().isBlank()) {
            return false;
        }
        if (orderDto.getOrderStatus() == null || orderDto.getOrderStatus().isBlank()) {
            return false;
        }
        if (orderDto.getOrderedDateTime() == 0) {
            orderDto.setOrderedDateTime(System.currentTimeMillis());
        }
        return true;
    }
}

Here there is a Kafka listener annotated with @KafkaListener, which consumes data from the “order-listener” topic, the same as in order-service.

The @EnableDiscoveryClient annotation should be specified in the main class as previously. Apart from that, it is good to add the @EnableReactiveMongoRepositories annotation to enable reactive MongoDB repositories.

@EnableReactiveMongoRepositories
@EnableDiscoveryClient
@SpringBootApplication
public class OrderEventServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderEventServiceApplication.class, args);
    }
}

Then we can run order-event-service as well.

order-event-service db tested

Now, referring to the diagram again:

main example diagram scenario

We have implemented the main three microservices (order-connector-service, order-service, and order-event-service) with an API-gateway and service-registry.

Furthermore, you can design and implement REST APIs on top of these services to generate reports and to stream event values.

Yasas Ranawaka.
