This post demonstrates how to set up a reactive stack with Spring Boot Webflux, Apache Kafka and Angular 8.
What we are building
The stack consists of the following components:
- Spring Boot/Webflux for implementing reactive RESTful web services
- Kafka as the message broker
- Angular frontend for receiving and handling server-sent events.
As an example, we will implement a hypothetical solution where a number of weather stations collect temperature data from various locations and send regular updates to the backend server via the message broker (Kafka). The server in turn updates the frontend with the information in real time.
Spring Boot/Webflux + Kafka
Project Setup & Configuration
Maven
Add the following dependencies to your pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Webflux Configuration
Since we are using a separate frontend, we need to set up CORS filters in Spring Boot:
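import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;

@Configuration
public class CorsConfig implements WebFluxConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry corsRegistry) {
        // Allow any origin to call any endpoint; cache preflight responses for an hour
        corsRegistry.addMapping("/**")
                .allowedOrigins("*")
                .maxAge(3600);
    }
}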
Spring Boot Kafka Configuration
Spring Boot provides auto-configuration for Kafka. However, in order to send and receive messages with object payloads, we need to configure Kafka to use a JSON serializer and deserializer.
This post (see Resource [1]) describes in detail how to do that. I include the Java config code below for completeness:
package au.com.rleeblog.reactive.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, WeatherInfoEvent> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        return props;
    }

    @Bean
    public ConsumerFactory<String, WeatherInfoEvent> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                new JsonDeserializer<>(WeatherInfoEvent.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Event model
WeatherInfoEvent
The class WeatherInfoEvent below models the weather data sent by the weather stations. For simplicity, it carries only one data field (temperature) besides the station id.
public class WeatherInfoEvent {
    private long stationId;
    private int temperature;

    // Getter and Setter methods ...
}
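The producer shown later calls `new WeatherInfoEvent(stationId, temperature)`, and the JSON deserializer has to be able to instantiate the class, so the elided part of the listing matters. Here is a minimal sketch of how the full class might look, assuming Jackson-style deserialization through a no-argument constructor and setters. The `toString()` method is an addition for readable log output and is not part of the original listing.

```java
// Sketch of the complete event class. The two constructors and toString()
// are assumptions added for illustration; the original listing elides them.
// Declared package-private so the snippet compiles in any file; in the
// project it would be a public class in its own file.
class WeatherInfoEvent {
    private long stationId;
    private int temperature;

    // No-argument constructor: lets a Jackson-style deserializer create the
    // object first and then populate it through the setters.
    public WeatherInfoEvent() {
    }

    // Convenience constructor for code that produces events.
    public WeatherInfoEvent(long stationId, int temperature) {
        this.stationId = stationId;
        this.temperature = temperature;
    }

    public long getStationId() {
        return stationId;
    }

    public void setStationId(long stationId) {
        this.stationId = stationId;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "WeatherInfoEvent{stationId=" + stationId + ", temperature=" + temperature + "}";
    }
}
```

With a `toString()` like this, the `%s` placeholders in the log statements of the producer and consumer print the field values rather than an object hash.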
WeatherInfoEventListener
We define this interface for handling WeatherInfoEvent:
public interface WeatherInfoEventListener {
    void onData(WeatherInfoEvent event);
    void processComplete();
}
Kafka Message Producer
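To simulate the messages Kafka receives from the weather stations, we set up Spring Boot as a Kafka producer and add a scheduled job that sends a new WeatherInfoEvent message to Kafka every 5 seconds. Note that @Scheduled only takes effect when scheduling is enabled, for example with @EnableScheduling on a configuration class.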
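import java.io.IOException;

import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
public class WeatherInfoService {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate;

    public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        return this.kafkaTemplate.send(topic, message);
    }

    // Runs again 5 seconds after the previous run has finished
    @Scheduled(fixedDelay = 5000)
    public void getWeatherInfoJob() throws IOException {
        logger.info("generate fake weather event");
        // fake event
        WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
        sendMessage("weather", event);
    }
}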
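RandomUtils in the listing above comes from Apache Commons Lang (commons-lang3), which does not appear among the Maven dependencies listed earlier. If you would rather not add it, the fake readings can be generated with the JDK alone. Below is a small sketch; the class and method names are made up for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper that produces the same value ranges as the
// RandomUtils calls in the scheduled job, using only the JDK.
class FakeReadings {

    // Station id in [0, 100): lower bound inclusive, upper bound exclusive.
    static long nextStationId() {
        return ThreadLocalRandom.current().nextLong(0, 100);
    }

    // Temperature in [16, 30): lower bound inclusive, upper bound exclusive.
    static int nextTemperature() {
        return ThreadLocalRandom.current().nextInt(16, 30);
    }
}
```

In the scheduled job, the event would then be created with `new WeatherInfoEvent(FakeReadings.nextStationId(), FakeReadings.nextTemperature())`.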
Kafka Message Consumer
We define the class WeatherInfoEventProcessor to consume messages from the Kafka topic:
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class WeatherInfoEventProcessor {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Only one listener is kept; a later register() call replaces the earlier one
    private WeatherInfoEventListener listener;

    public void register(WeatherInfoEventListener listener) {
        this.listener = listener;
    }

    public void onEvent(WeatherInfoEvent event) {
        if (listener != null) {
            listener.onData(event);
        }
    }

    public void onComplete() {
        if (listener != null) {
            listener.processComplete();
        }
    }

    @KafkaListener(topics = "weather", groupId = "group_id")
    public void consume(WeatherInfoEvent message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
        onEvent(message);
    }
}
Note that the method consume() is annotated with @KafkaListener, which marks it as the method to invoke when a message arrives from Kafka. The method in turn invokes the onData() method of its registered WeatherInfoEventListener to pass the message on to the Webflux stream.
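To make the hand-off easier to see in isolation, here is a rough sketch of the same register-then-forward logic with the Kafka wiring removed. The `ListenerHandOff` and `ListenerHandOffDemo` names are hypothetical. The event and listener types are cut-down stand-ins for the ones defined earlier, included only so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins so the snippet compiles on its own; in the project these
// are the WeatherInfoEvent and WeatherInfoEventListener types defined earlier.
class WeatherInfoEvent {
    final long stationId;
    final int temperature;

    WeatherInfoEvent(long stationId, int temperature) {
        this.stationId = stationId;
        this.temperature = temperature;
    }
}

interface WeatherInfoEventListener {
    void onData(WeatherInfoEvent event);
    void processComplete();
}

// Same hand-off logic as WeatherInfoEventProcessor, minus the Kafka wiring.
class ListenerHandOff {
    private WeatherInfoEventListener listener;

    void register(WeatherInfoEventListener listener) {
        this.listener = listener;
    }

    // Stands in for the @KafkaListener method: forwards the event if a
    // listener is registered and silently drops it otherwise.
    void consume(WeatherInfoEvent event) {
        if (listener != null) {
            listener.onData(event);
        }
    }
}

class ListenerHandOffDemo {

    // Feeds three events through the hand-off and returns the temperatures
    // that actually reached the listener.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        ListenerHandOff handOff = new ListenerHandOff();

        handOff.consume(new WeatherInfoEvent(1, 20)); // no listener yet: dropped

        handOff.register(new WeatherInfoEventListener() {
            @Override
            public void onData(WeatherInfoEvent event) {
                received.add(event.temperature);
            }

            @Override
            public void processComplete() {
                // nothing to clean up in this sketch
            }
        });
        handOff.consume(new WeatherInfoEvent(1, 21));
        handOff.consume(new WeatherInfoEvent(2, 25));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [21, 25]
    }
}
```

The first event is lost because nothing is registered yet, which mirrors what happens to Kafka messages consumed before the first browser subscribes. In the real service the field is written by the subscribing thread and read by the Kafka listener thread, so declaring it `volatile` would be a reasonable precaution.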
Webflux REST Web Service
Finally, we are ready to implement a reactive endpoint with Webflux. I use the familiar annotation-based model here; Webflux also supports a functional programming model.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

@RestController // (1) Spring MVC annotation
public class WeatherInfoController {

    @Autowired
    private WeatherInfoEventProcessor processor;

    private Flux<WeatherInfoEvent> bridge;

    public WeatherInfoController() {
        // (3) Broadcast to several subscribers
        this.bridge = createBridge().publish().autoConnect().cache(10).log();
    }

    // (1) Spring MVC annotation
    @GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
    public Flux<WeatherInfoEvent> getWeatherInfo() {
        return bridge;
    }

    private Flux<WeatherInfoEvent> createBridge() {
        // (2) The lambda only runs on subscription, by which time the processor has been injected
        return Flux.create(sink -> {
            processor.register(new WeatherInfoEventListener() {

                @Override
                public void processComplete() {
                    sink.complete();
                }

                @Override
                public void onData(WeatherInfoEvent data) {
                    sink.next(data);
                }
            });
        });
    }
}
Note
- The use of annotations @RestController and @GetMapping as in Spring MVC
- The use of Flux.create method to bridge the event listener into a Flux<WeatherInfoEvent>
- The use of Flux.publish to multicast a Flux to several subscribers.
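The bridge-and-multicast idea from the notes above can be tried out without Reactor. The sketch below swaps in the JDK's `java.util.concurrent.SubmissionPublisher` for `Flux.create(...).publish()`, so it is an analogy rather than the controller's actual mechanism. `TemperatureListener` and `MulticastBridgeDemo` are hypothetical names, and plain integers stand in for WeatherInfoEvent.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Callback-style source, shaped like WeatherInfoEventListener but carrying
// plain temperatures to keep the sketch short (hypothetical name).
interface TemperatureListener {
    void onData(int temperature);
    void processComplete();
}

class MulticastBridgeDemo {

    // Publishes three readings through a listener-to-publisher bridge and
    // returns what each of the two subscribers received.
    static List<List<Integer>> run() {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();

        // The bridge: every callback is forwarded into the publisher, much
        // like sink.next(...) and sink.complete() inside Flux.create.
        TemperatureListener bridge = new TemperatureListener() {
            @Override
            public void onData(int temperature) {
                publisher.submit(temperature);
            }

            @Override
            public void processComplete() {
                publisher.close();
            }
        };

        // Two independent subscribers attached to the same source.
        List<Integer> first = new CopyOnWriteArrayList<>();
        List<Integer> second = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> firstDone = publisher.consume(first::add);
        CompletableFuture<Void> secondDone = publisher.consume(second::add);

        bridge.onData(20);
        bridge.onData(21);
        bridge.onData(22);
        bridge.processComplete();

        // Delivery is asynchronous, so wait until both subscribers are done.
        firstDone.orTimeout(5, TimeUnit.SECONDS).join();
        secondDone.orTimeout(5, TimeUnit.SECONDS).join();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Both lists end up with all three readings in submission order. One difference worth noting: a subscriber that attaches to a SubmissionPublisher late sees only subsequent items, whereas the controller's `cache(10)` replays up to the last ten events to late subscribers.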
Angular Frontend
Now that the backend is done, we can implement a component in Angular to consume the event stream from the reactive endpoint and display the information in the browser.
Message Model
// models.ts
export interface IWeatherInfo {
  stationId: number;
  temperature: number;
}
Weather Service
import { Injectable, NgZone } from '@angular/core';
import { Observable, BehaviorSubject } from 'rxjs';
import { environment } from 'src/environments/environment';
import { IWeatherInfo } from './models';

@Injectable({ providedIn: 'root' })
export class WeatherInfoService {
  private backendUrl: string;
  private eventSource: EventSource;
  private weatherDataSource: BehaviorSubject<Array<IWeatherInfo>> = new BehaviorSubject([]);
  weatherData = this.weatherDataSource.asObservable();

  constructor(private zone: NgZone) {
    this.backendUrl = environment.backendUrl;
  }

  public startWeatherInfoEventSource(): void {
    const url = [this.backendUrl, 'weather'].join('/');
    this.eventSource = new EventSource(url);

    this.eventSource.onmessage = (event) => {
      console.log('got event data', event['data']);
      // Emit a new array holding all events received so far plus the new one
      const newArrays = [...this.weatherDataSource.value, JSON.parse(event['data'])];
      // Re-enter Angular's zone so that change detection picks up the new value
      this.zone.run(() => {
        this.weatherDataSource.next(newArrays);
      });
    };

    this.eventSource.onerror = (error) => {
      this.zone.run(() => {
        // readyState === 0 (CONNECTING) means the connection was dropped and the browser is about
        // to reconnect, which is what happens when the remote source closes the stream, so we can
        // safely treat it as a normal situation. Another way of detecting the end of the stream
        // is to insert a special element in the stream of events, which the client can identify as the last one.
        if (this.eventSource.readyState === 0) {
          this.eventSource.close();
          this.weatherDataSource.complete();
        } else {
          this.weatherDataSource.error('EventSource error: ' + error);
        }
      });
    };
  }

  public onClose() {
    this.eventSource.close();
    this.weatherDataSource.complete();
  }
}
I use EventSource here to handle the server-sent events. EventSource is not supported in Internet Explorer or in legacy (pre-Chromium) Edge, so those browsers need a polyfill.
Weather Component
import { Component, HostListener, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
// Also import IWeatherInfo and WeatherInfoService from wherever they live in your project

@Component({
  selector: 'app-weather',
  templateUrl: './weather.component.html',
  styleUrls: ['./weather.component.css']
})
export class WeatherComponent implements OnInit, OnDestroy {
  weatherInfo$: Observable<IWeatherInfo[]>;
  all: IWeatherInfo[];

  constructor(private service: WeatherInfoService) { }

  ngOnInit() {
    this.service.startWeatherInfoEventSource();
    this.weatherInfo$ = this.service.weatherData;
  }

  ngOnDestroy() {
    this.service.onClose();
  }

  // Also close the connection when the tab is closed or reloaded
  @HostListener('window:beforeunload', [ '$event' ])
  unloadHandler(event) {
    console.log('unloadHandler');
    this.service.onClose();
  }
}
When it is initialized, the component kick-starts the event source in the service layer to begin receiving messages from the server. Note that it is also important to close the event source when done, as in the ngOnDestroy() method.
Weather Component Template
Finally, the HTML template. Note the use of the async pipe.
<!-- weather.component.html -->
<mat-nav-list>
  <mat-list-item *ngFor="let info of weatherInfo$ | async">
    Station Id: {{info.stationId}}, temperature: {{info.temperature}}C
  </mat-list-item>
</mat-nav-list>
With all the servers started up, you should see the weather events streaming into the browser, as in the screenshot below.
It may also be useful to open the dev console and watch the event stream from the endpoint (/weather).
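For orientation when reading the raw stream: a `text/event-stream` response is a sequence of frames, each made of `field:value` lines and terminated by a blank line. Below is a rough sketch of how the `data` payloads can be pulled out of such text. The class name is hypothetical and the sample frames in the usage note are made up. The parser is deliberately simplified: it only understands `\n` line endings and ignores every field other than `data`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified reader for text/event-stream content (hypothetical helper).
class SseFrames {

    // Returns the data payload of every complete frame in the given text.
    static List<String> parseData(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the current frame.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single space after the colon is not part of the value.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                // Several data lines in one frame are joined with newlines.
                if (hasData) {
                    data.append('\n');
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) and ":" comment lines are skipped.
        }
        // A trailing frame without its terminating blank line is discarded.
        return events;
    }
}
```

For example, the text `data:{"stationId":42,"temperature":21}` followed by a blank line yields one payload, the JSON string itself. That string is what the Angular service receives as `event.data` and passes to `JSON.parse`.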
Useful Resources
- [1] Spring Kafka – JSON Serializer Deserializer Example
- [2] Reactor 3 Reference Guide
- [3] Spring Reference Documentation – Web Reactive Stack