A reactive stack with Spring Boot, Kafka and Angular

This post demonstrates how to set up a reactive stack with Spring Boot Webflux, Apache Kafka and Angular 8.

What we are building


The stack consists of the following components:

  1. Spring Boot/Webflux for implementing reactive RESTful web services
  2. Kafka as the message broker
  3. Angular frontend for receiving and handling server-sent events.

As an example, we will implement a hypothetical solution where a number of weather stations collect temperature data from various locations and send regular updates to the backend server via the message broker (Kafka). The server in turn updates the frontend with the information in real time.

Spring Boot/Webflux + Kafka

Project Setup & Configuration


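Add the Webflux and Kafka dependencies (spring-boot-starter-webflux and spring-kafka) to the Maven pom.xml. The producer code later in this post also uses RandomUtils from Apache Commons Lang (commons-lang3).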


Webflux Configuration

Since we are using a separate frontend, we need to set up CORS filters in Spring Boot:

@Configuration
public class CorsConfig implements WebFluxConfigurer {
     @Override
     public void addCorsMappings(CorsRegistry corsRegistry) { /* allowed-origin mappings not shown */ }
}

Spring Boot Kafka Configuration

Spring Boot provides auto configuration for Kafka. However, in order to send and receive messages with object payload, we need to configure Kafka to use JSON serializer and deserializer.

The post listed as [1] under Useful Resources describes in detail how to do that. I include the Java config code below for completeness:

package au.com.rleeblog.reactive.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;

@Configuration
public class KafkaConfig {

     // The property name is an assumption; use whichever key holds your broker list.
     @Value("${spring.kafka.bootstrap-servers}")
     private String bootstrapServers;

     @Bean
     public Map<String, Object> producerConfigs() {
          Map<String, Object> props = new HashMap<>();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
          return props;
     }

     @Bean
     public ProducerFactory<String, WeatherInfoEvent> producerFactory() {
          return new DefaultKafkaProducerFactory<>(producerConfigs());
     }

     @Bean
     public KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory());
     }

     @Bean
     public Map<String, Object> consumerConfigs() {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
          return props;
     }

     @Bean
     public ConsumerFactory<String, WeatherInfoEvent> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                 new JsonDeserializer<>(WeatherInfoEvent.class));
     }

     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> factory =
               new ConcurrentKafkaListenerContainerFactory<>();
           // Without a consumer factory the listener container cannot create consumers.
           factory.setConsumerFactory(consumerFactory());
           return factory;
     }
}


Event model


The class WeatherInfoEvent below is used to model the weather data sent by the weather stations. For simplicity, there is only one data field (temperature)

public class WeatherInfoEvent {
     private long stationId;
     private int temperature;

     // Constructors, getter and setter methods
}


We define this interface for handling WeatherInfoEvent

public interface WeatherInfoEventListener {
    void onData(WeatherInfoEvent event);
    void processComplete();
}

Kafka Message Producer

To simulate the messages Kafka receives from weather stations, we set up Spring Boot as a Kafka producer and add a scheduled job that sends a new WeatherInfoEvent message to Kafka every 5 seconds:

@Service
public class WeatherInfoService {
     private final Logger logger = LoggerFactory.getLogger(getClass());

     @Autowired
     private KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate;

     public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
          logger.info(String.format("#### -> Producing message -> %s", message));
          return this.kafkaTemplate.send(topic, message);
     }

     // Runs 5 seconds after the previous run finishes; needs @EnableScheduling on a configuration class.
     @Scheduled(fixedDelay = 5000)
     public void getWeatherInfoJob() throws IOException {
          logger.info("generate fake weather event");
          // fake event: stationId in [0, 100), temperature in [16, 30)
          WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
          sendMessage("weather", event);
     }
}

Kafka Message Consumer

We define the class WeatherInfoEventProcessor to consume messages from the Kafka topic:

@Component
public class WeatherInfoEventProcessor {

     private final Logger logger = LoggerFactory.getLogger(getClass());
     private WeatherInfoEventListener listener;

     public void register(WeatherInfoEventListener listener) {
          this.listener = listener;
     }

     public void onEvent(WeatherInfoEvent event) {
          if (listener != null) {
               listener.onData(event);
          }
     }

     public void onComplete() {
         if (listener != null) {
               listener.processComplete();
         }
     }

     @KafkaListener(topics = "weather", groupId = "group_id")
     public void consume(WeatherInfoEvent message) throws IOException {
          logger.info(String.format("#### -> Consumed message -> %s", message));
          // Hand the message to whoever registered as listener (the Webflux bridge).
          onEvent(message);
     }
}

Note that consume() is annotated with @KafkaListener, which marks it as the method to invoke when a message arrives from Kafka. The method in turn invokes the onData() method of the registered WeatherInfoEventListener to pass the message on to the Webflux stream.

Webflux REST Web Service

Finally, we are ready to implement a reactive endpoint with Webflux. I use the familiar annotation model here; Webflux also supports a functional programming model.

@RestController  // (1) Spring MVC annotation
public class WeatherInfoController {

     @Autowired
     private WeatherInfoEventProcessor processor;

     private Flux<WeatherInfoEvent> bridge;

     public WeatherInfoController() {
          // (3) Broadcast to several subscribers.
          // The create() callback only runs on first subscription, after the processor is injected.
          this.bridge = createBridge().publish().autoConnect().cache(10).log();
     }

     // (1) Spring MVC annotation
     @GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
     public Flux<WeatherInfoEvent> getWeatherInfo() {
          return bridge;
     }

     private Flux<WeatherInfoEvent> createBridge() {
          Flux<WeatherInfoEvent> bridge = Flux.create(sink -> { // (2)
               processor.register(new WeatherInfoEventListener() {

                   @Override
                   public void processComplete() {
                        sink.complete();
                   }

                   @Override
                   public void onData(WeatherInfoEvent data) {
                        sink.next(data);
                   }
               });
          });
          return bridge;
     }
}


  1. The use of annotations @RestController and @GetMapping as in Spring MVC
  2. The use of Flux.create method to bridge the event listener into a Flux<WeatherInfoEvent>
  3. The use of Flux.publish to multicast a Flux to several subscribers.
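
To make notes (2) and (3) a little more concrete, here is a rough, framework-free sketch of the same idea in TypeScript (the language used for the frontend later in this post): a listener callback is bridged into a stream that fans out to several subscribers and replays the most recent items to late joiners. This is not Reactor and only loosely mirrors create + publish + cache; ReplayBridge, subscribe, capacity and demoReplayBridge are hypothetical names chosen for illustration.

```typescript
// Hypothetical sketch of "bridge + multicast + bounded replay"; not the Reactor API.
interface WeatherReading {
  stationId: number;
  temperature: number;
}

type Subscriber<T> = (value: T) => void;

class ReplayBridge<T> {
  private subscribers: Array<Subscriber<T>> = [];
  private buffer: T[] = [];
  private capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Listener side: the message consumer calls this for every incoming event.
  onData(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.capacity) {
      this.buffer.shift(); // drop the oldest item once the buffer is full
    }
    // Iterate over a copy so a subscriber may unsubscribe during delivery.
    for (const subscriber of this.subscribers.slice()) {
      subscriber(value);
    }
  }

  // Stream side: a late subscriber first gets the buffered items, then live ones.
  subscribe(subscriber: Subscriber<T>): () => void {
    for (const value of this.buffer.slice()) {
      subscriber(value);
    }
    this.subscribers.push(subscriber);
    return () => {
      this.subscribers = this.subscribers.filter((s) => s !== subscriber);
    };
  }
}

function demoReplayBridge(): { early: number[]; late: number[] } {
  const bridge = new ReplayBridge<WeatherReading>(2);
  const early: number[] = [];
  const late: number[] = [];

  const unsubscribeEarly = bridge.subscribe((e) => { early.push(e.temperature); });
  bridge.onData({ stationId: 1, temperature: 20 });
  bridge.onData({ stationId: 2, temperature: 21 });
  bridge.onData({ stationId: 3, temperature: 22 });

  // The late subscriber is replayed the last two events, then receives live data.
  bridge.subscribe((e) => { late.push(e.temperature); });
  unsubscribeEarly();
  bridge.onData({ stationId: 4, temperature: 23 });

  return { early, late };
}
```

With a capacity of 2, the early subscriber sees all three events it was connected for, while the late one starts from the two most recent events. The controller above uses a replay size of 10 for the same purpose.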

Angular Frontend

Now that the backend is done, we can implement a component in Angular to consume the event stream from the reactive endpoint and display the information in the browser.

Message Model

// models.ts
export interface IWeatherInfo {
    stationId: number;
    temperature: number;
}
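
The service in the next section turns each event's data into an IWeatherInfo with JSON.parse, which returns an untyped value. It can therefore help to check the payload before trusting it. The following is a minimal sketch of such a check; isWeatherInfo and parseWeatherInfo are hypothetical helpers that are not part of the original project, and the sample payloads assume the backend serializes WeatherInfoEvent with its field names unchanged.

```typescript
// Mirrors the IWeatherInfo interface above so the sketch is self-contained.
interface IWeatherInfo {
  stationId: number;
  temperature: number;
}

// Type guard (hypothetical helper): true only for objects with both numeric fields.
function isWeatherInfo(value: unknown): value is IWeatherInfo {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return typeof candidate['stationId'] === 'number'
    && typeof candidate['temperature'] === 'number';
}

// Parses one JSON payload; returns undefined instead of throwing on bad input.
function parseWeatherInfo(json: string): IWeatherInfo | undefined {
  try {
    const parsed: unknown = JSON.parse(json);
    return isWeatherInfo(parsed) ? parsed : undefined;
  } catch {
    return undefined;
  }
}
```

For example, parseWeatherInfo('{"stationId":42,"temperature":21}') yields a typed object, while a payload whose stationId is a string, or text that is not JSON at all, yields undefined.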

Weather Service

import { Injectable, NgZone } from '@angular/core';
import { Observable, BehaviorSubject } from 'rxjs';
import { environment } from 'src/environments/environment';
import { IWeatherInfo } from './models';

@Injectable({
  providedIn: 'root'
})
export class WeatherInfoService {
    private backendUrl: string;
    private eventSource: EventSource;
    private weatherDataSource: BehaviorSubject<Array<IWeatherInfo>> = new BehaviorSubject([]);

    weatherData = this.weatherDataSource.asObservable();

    constructor(private zone: NgZone) {
      this.backendUrl = environment.backendUrl;
    }

    public startWeatherInfoEventSource(): void {
      const url = [this.backendUrl, 'weather'].join('/');

      this.eventSource = new EventSource(url);
      this.eventSource.onmessage = (event) => {

        console.log('got event data', event['data']);
        const newArrays = [...this.weatherDataSource.value, JSON.parse(event['data'])];

        // Emit inside the Angular zone so that change detection picks up the update.
        this.zone.run(() => {
          this.weatherDataSource.next(newArrays);
        });
      };

      this.eventSource.onerror = (error) => {

        this.zone.run( () => {
          // readyState === 0 is EventSource.CONNECTING: after the remote source closes the connection,
          // the browser drops back to this state and would try to reconnect. We treat that as the normal
          // end of the stream and close the source ourselves. Another way of detecting the end of the stream
          // is to insert a special element in the stream of events, which the client can identify as the last one.
          if(this.eventSource.readyState === 0) {
            this.eventSource.close();
            this.weatherDataSource.complete();
          } else {
            this.weatherDataSource.error('EventSource error: ' + error);
          }
        });
      };
    }

    public onClose() {
      // Stop receiving events; the guard covers the case where the source was never started.
      if (this.eventSource) {
        this.eventSource.close();
      }
    }
}

I use EventSource here to handle the server-sent events. As I understand it, EventSource is not supported by Internet Explorer or by the legacy (pre-Chromium) Edge, so those browsers need a polyfill.
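
For context, what EventSource receives on the wire is plain text: each event is a group of "field: value" lines terminated by a blank line, and the data field is what arrives as event.data in onmessage. The sketch below is a deliberately simplified parser for that framing, meant for understanding the stream or for unit tests rather than as a replacement for EventSource. parseSseData is a hypothetical helper; it ignores the event, id and retry fields and assumes the whole text is available at once.

```typescript
// Simplified parser for text/event-stream framing (hypothetical helper).
// Returns the data payload of each complete event found in the given text.
function parseSseData(text: string): string[] {
  const events: string[] = [];
  let dataLines: string[] = [];

  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === '') {
      // A blank line dispatches the event collected so far, if it has data.
      if (dataLines.length > 0) {
        events.push(dataLines.join('\n'));
        dataLines = [];
      }
      continue;
    }
    if (line.charAt(0) === ':') {
      continue; // comment line, often used as a keep-alive
    }
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.charAt(0) === ' ') {
      value = value.slice(1); // a single leading space is not part of the value
    }
    if (field === 'data') {
      dataLines.push(value);
    }
  }
  // An event that is not yet terminated by a blank line is incomplete and dropped.
  return events;
}
```

Each returned string is what JSON.parse would then receive in the onmessage handler above.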

Weather Component

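import { Component, HostListener, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
// Imports of IWeatherInfo and WeatherInfoService omitted; the paths depend on the project layout.

@Component({
  selector: 'app-weather',
  templateUrl: './weather.component.html',
  styleUrls: ['./weather.component.css']
})
export class WeatherComponent implements OnInit, OnDestroy {

  weatherInfo$: Observable<IWeatherInfo[]>;
  all: IWeatherInfo[];

  constructor(private service: WeatherInfoService) { }

  ngOnInit() {
    this.weatherInfo$ = this.service.weatherData;
    this.service.startWeatherInfoEventSource();
  }

  ngOnDestroy() {
    this.service.onClose();
  }

  @HostListener('window:beforeunload', [ '$event' ])
  unloadHandler(event) {
    this.service.onClose();
  }
}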

The component kick-starts the event source in the service layer when it is initialized, so that it starts receiving messages from the server. It is also important to close the event source when done, as in the ngOnDestroy() method.
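
Since both ngOnDestroy() and the beforeunload handler are places where the stream may be shut down, it can help to make starting and stopping idempotent. The sketch below shows one way to do that without any Angular or browser dependency. StreamLifecycle, ClosableSource and demoLifecycle are hypothetical names, and the factory passed to the constructor stands in for something like creating the EventSource.

```typescript
// Minimal stand-in for the part of EventSource this sketch needs.
interface ClosableSource {
  close(): void;
}

// Hypothetical wrapper: opens at most one source and closes it at most once.
class StreamLifecycle {
  private source: ClosableSource | undefined = undefined;
  private open: () => ClosableSource;

  constructor(open: () => ClosableSource) {
    this.open = open;
  }

  // Intended for the init hook; a second call while running is a no-op.
  start(): void {
    if (this.source === undefined) {
      this.source = this.open();
    }
  }

  // Intended for the destroy and unload hooks; safe to call more than once.
  stop(): void {
    if (this.source !== undefined) {
      this.source.close();
      this.source = undefined;
    }
  }
}

function demoLifecycle(): { opened: number; closed: number } {
  const counts = { opened: 0, closed: 0 };
  const lifecycle = new StreamLifecycle(() => {
    counts.opened += 1;
    return { close: () => { counts.closed += 1; } };
  });
  lifecycle.start();
  lifecycle.start(); // ignored: already running
  lifecycle.stop();
  lifecycle.stop();  // ignored: already stopped
  return counts;
}
```

In this demo the source is opened once and closed once, even though start and stop are each called twice.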

Weather Component Template

Finally, the HTML template. Note the use of the async pipe.

<!-- weather.component.html -->
<mat-list>
  <mat-list-item *ngFor="let info of weatherInfo$ | async">
      Station Id: {{info.stationId}}, temperature: {{info.temperature}}C
  </mat-list-item>
</mat-list>

With all the servers started up, you should see the streaming of weather events in the browser like the screenshot below


It may also be useful to open up the dev console and see the event stream from the endpoint (/weather)


Useful Resources

  1. Spring Kafka – JSON Serializer Deserializer Example
  2. Reactor 3 Reference Guide
  3. Spring Reference Documentation – Web Reactive Stack