A reactive stack with Spring Boot, Kafka and Angular

This post demonstrates how to set up a reactive stack with Spring Boot WebFlux, Apache Kafka and Angular 8.

What we are building

[Figure: springBootKafka]

The stack consists of the following components:

  1. Spring Boot/WebFlux for implementing reactive RESTful web services
  2. Kafka as the message broker
  3. Angular frontend for receiving and handling server-sent events

As an example, we will implement a hypothetical solution where a number of weather stations collect temperature data from various locations and send regular updates to the backend server via the message broker (Kafka). The server in turn updates the frontend with the information in real time.

Spring Boot/WebFlux + Kafka

Project Setup & Configuration

Maven

Add the following dependencies to the Maven pom.xml:

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
</dependency>

WebFlux Configuration

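Since the Angular frontend is served from a different origin, we need to configure CORS in Spring Boot. The mapping below allows every origin, which is convenient for a demo; a real deployment would normally list the allowed origins explicitly: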

@Configuration
public class CorsConfig implements WebFluxConfigurer {
     @Override
     public void addCorsMappings(CorsRegistry corsRegistry) {
          corsRegistry.addMapping("/**")
          .allowedOrigins("*")
          .maxAge(3600);
     }
}

Spring Boot Kafka Configuration

Spring Boot provides auto-configuration for Kafka. However, to send and receive messages with an object payload, we need to configure Kafka to use a JSON serializer and deserializer.

This post (see Resource [1]) describes in detail how to do that. I include the Java configuration code below for completeness:

package au.com.rleeblog.reactive.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;

@Configuration
public class KafkaConfig {

     @Value("${spring.kafka.bootstrap-servers}")
     private String bootstrapServers;

     @Bean
     public Map<String, Object> producerConfigs() {
          Map<String, Object> props = new HashMap<>();
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
          return props;
     }

     @Bean
     public ProducerFactory<String, WeatherInfoEvent> producerFactory() {
          return new DefaultKafkaProducerFactory<>(producerConfigs());
     }

     @Bean
     public KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate() {
          return new KafkaTemplate<>(producerFactory());
     }

     @Bean
     public Map<String, Object> consumerConfigs() {
          Map<String, Object> props = new HashMap<>();
          props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
          props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
          props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
          return props;
     }

     @Bean
     public ConsumerFactory<String, WeatherInfoEvent> consumerFactory() {
          return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
                 new JsonDeserializer<>(WeatherInfoEvent.class));
     }

     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> kafkaListenerContainerFactory() {
           ConcurrentKafkaListenerContainerFactory<String, WeatherInfoEvent> factory =
               new ConcurrentKafkaListenerContainerFactory<>();
           factory.setConsumerFactory(consumerFactory());
           return factory;
     }

}

Event model

WeatherInfoEvent

The class WeatherInfoEvent below models the weather data sent by the weather stations. For simplicity, there is only one data field (temperature).

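public class WeatherInfoEvent {
     private long stationId;
     private int temperature;

     // Jackson's default deserialization uses a no-argument constructor plus setters
     public WeatherInfoEvent() { }
     // Used by the producer to build events
     public WeatherInfoEvent(long stationId, int temperature) {
          this.stationId = stationId;
          this.temperature = temperature;
     }
     // Getter and Setter methods
     ...
}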

WeatherInfoEventListener

We define this interface for handling WeatherInfoEvent messages:

public interface WeatherInfoEventListener {
    void onData(WeatherInfoEvent event);
    void processComplete();
}

Kafka Message Producer

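To simulate the messages Kafka receives from weather stations, we set up Spring Boot as a Kafka producer and add a scheduled job that sends a new WeatherInfoEvent message to Kafka every 5 seconds. Note that @Scheduled only takes effect when scheduling is enabled, for example with @EnableScheduling on a configuration class.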

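import org.apache.commons.lang3.RandomUtils; // assuming RandomUtils is the Apache Commons Lang 3 class
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;

@Service
public class WeatherInfoService {
     private final Logger logger = LoggerFactory.getLogger(getClass());

     @Autowired
     private KafkaTemplate<String, WeatherInfoEvent> kafkaTemplate;

     // Sends the event to the given topic and returns the future of the send result
     public ListenableFuture<SendResult<String, WeatherInfoEvent>> sendMessage(String topic, WeatherInfoEvent message) {
          logger.info("#### -> Producing message -> {}", message);
          return this.kafkaTemplate.send(topic, message);
     }

     // Runs again 5 seconds after the previous run has finished
     @Scheduled(fixedDelay = 5000)
     public void getWeatherInfoJob() {
          logger.info("generate fake weather event");
          // fake event: station id in [0, 100), temperature in [16, 30)
          WeatherInfoEvent event = new WeatherInfoEvent(RandomUtils.nextLong(0, 100), RandomUtils.nextInt(16, 30));
          sendMessage("weather", event);
     }
}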

Kafka Message Consumer

We define the class WeatherInfoEventProcessor to consume messages from the Kafka topic:

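import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;

@Service
public class WeatherInfoEventProcessor {

     private final Logger logger = LoggerFactory.getLogger(getClass());
     // volatile: registered from a web request thread, read from the Kafka listener thread
     private volatile WeatherInfoEventListener listener;

     // Registers the listener; a later call replaces the previous listener
     public void register(WeatherInfoEventListener listener) {
          this.listener = listener;
     }

     public void onEvent(WeatherInfoEvent event) {
          if (listener != null) {
               listener.onData(event);
          }
     }

     public void onComplete() {
          if (listener != null) {
               listener.processComplete();
          }
     }

     // Invoked by Spring Kafka for every message on the "weather" topic
     @KafkaListener(topics = "weather", groupId = "group_id")
     public void consume(WeatherInfoEvent message) {
          logger.info("#### -> Consumed message -> {}", message);
          onEvent(message);
     }
}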

Note that the method consume() is annotated with @KafkaListener to mark it as the method to invoke on receiving messages from Kafka. The method in turn invokes the onData() method of its registered WeatherInfoEventListener to pass the message on to the WebFlux stream. Only one listener can be registered at a time; a later call to register() replaces the earlier one.

WebFlux REST Web Service

Finally, we are ready to implement a reactive endpoint with WebFlux. I use the familiar annotation model here; WebFlux also supports a functional programming model.

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import au.com.rleeblog.reactive.event.WeatherInfoEvent;
import reactor.core.publisher.Flux;

@RestController  // (1) Spring MVC annotation
public class WeatherInfoController {

     private final WeatherInfoEventProcessor processor;
     private final Flux<WeatherInfoEvent> bridge;

     // Constructor injection: the processor is available before the bridge is built
     public WeatherInfoController(WeatherInfoEventProcessor processor) {
          this.processor = processor;
          // (3) Broadcast to several subscribers
          this.bridge = createBridge().publish().autoConnect().cache(10).log();
     }

     // (1) Spring MVC annotation
     @GetMapping(value = "/weather", produces = "text/event-stream;charset=UTF-8")
     public Flux<WeatherInfoEvent> getWeatherInfo() {
          return bridge;
     }

     // The lambda runs on first subscription and registers a listener that feeds the sink
     private Flux<WeatherInfoEvent> createBridge() {
          return Flux.create(sink -> { // (2)
               processor.register(new WeatherInfoEventListener() {

                    @Override
                    public void processComplete() {
                         sink.complete();
                    }

                    @Override
                    public void onData(WeatherInfoEvent data) {
                         sink.next(data);
                    }
               });
          });
     }
}

Note

  1. The use of annotations @RestController and @GetMapping as in Spring MVC
  2. The use of Flux.create method to bridge the event listener into a Flux<WeatherInfoEvent>
  3. The use of Flux.publish to multicast a Flux to several subscribers.
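
To make note 3 a little more concrete: publish().autoConnect() shares one upstream subscription among all subscribers, and cache(10) replays up to the last 10 events to subscribers that arrive late. The sketch below is not Reactor code. It is a plain TypeScript illustration of that behaviour, and the class name ReplayMulticast and its methods are made up for this example.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical class (not part of Reactor or of this project): a hot source that
// delivers each value to every current subscriber and replays the last `history`
// values to subscribers that join late.
class ReplayMulticast<T> {
  private listeners: Listener<T>[] = [];
  private buffer: T[] = [];

  constructor(private readonly history: number) {}

  // Producer side: the role the Kafka listener plays in the post.
  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.history) {
      this.buffer.shift(); // forget the oldest value
    }
    // Iterate over a copy so that unsubscribing during delivery is safe.
    this.listeners.slice().forEach(listener => listener(value));
  }

  // Consumer side: replays the buffered values first, then delivers live ones.
  // Returns a function that removes the subscription.
  subscribe(listener: Listener<T>): () => void {
    this.buffer.forEach(value => listener(value));
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }
}
```

With a history of 10, a browser tab opened later would first receive up to the 10 most recent events and then the live ones, which roughly matches what the controller above does.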

Angular Frontend

Now that the backend is done, we can implement a component in Angular to consume the event stream from the reactive endpoint and display the information in the browser.

Message Model

// models.ts
export interface IWeatherInfo {
    stationId: number;
    temperature: number;
}
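
TypeScript interfaces exist only at compile time, so a payload produced by JSON.parse is not checked against IWeatherInfo at runtime. A minimal sketch of a guard is shown here; the helper names isWeatherInfo and parseWeatherInfo are my own and not part of the original project.

```typescript
// Mirrors the IWeatherInfo interface above so the sketch is self-contained.
interface IWeatherInfo {
  stationId: number;
  temperature: number;
}

// Hypothetical helper: runtime check that an unknown value has the IWeatherInfo shape.
function isWeatherInfo(value: unknown): value is IWeatherInfo {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const candidate = value as { stationId?: unknown; temperature?: unknown };
  return typeof candidate.stationId === 'number'
    && typeof candidate.temperature === 'number';
}

// Hypothetical helper: parse the data field of one event.
// Returns null instead of throwing when the text is not valid JSON
// or does not have the expected shape.
function parseWeatherInfo(data: string): IWeatherInfo | null {
  try {
    const parsed: unknown = JSON.parse(data);
    return isWeatherInfo(parsed) ? parsed : null;
  } catch (err) {
    return null;
  }
}
```

A caller can then skip events for which parseWeatherInfo returns null rather than pushing malformed data to the view.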

Weather Service

import { Injectable, NgZone } from '@angular/core';
import { Observable, BehaviorSubject } from 'rxjs';
import { environment } from 'src/environments/environment';
import { IWeatherInfo } from './models';

@Injectable({
  providedIn: 'root'
})
export class WeatherInfoService {
    private backendUrl: string;
    private eventSource: EventSource;
    private weatherDataSource: BehaviorSubject<Array<IWeatherInfo>> = new BehaviorSubject([]);

    weatherData = this.weatherDataSource.asObservable();  

    constructor(private zone: NgZone) {
      this.backendUrl = environment.backendUrl;
    } 

    public startWeatherInfoEventSource(): void {
      const url = [this.backendUrl, 'weather'].join('/');

      this.eventSource = new EventSource(url);
      this.eventSource.onmessage = (event) => {

        console.log('got event data', event['data']);
        const newArrays = [...this.weatherDataSource.value, JSON.parse(event['data'])];

        this.zone.run(() => {
          this.weatherDataSource.next(newArrays);
        })

      }

      this.eventSource.onerror = (error) => {

        this.zone.run( () => {
          // When the server closes the connection, the browser fires an error event and moves to
          // readyState 0 (EventSource.CONNECTING) while it prepares to reconnect. We treat that as
          // the normal end of the stream and close the source. Another way of detecting the end of
          // the stream is to insert a special element in the stream of events, which the client
          // can identify as the last one.
          if (this.eventSource.readyState === EventSource.CONNECTING) {
            this.eventSource.close();
            this.weatherDataSource.complete();
          } else {
            this.weatherDataSource.error('EventSource error: ' + error);
          }
        });
      }
    }

    public onClose() {
      this.eventSource.close();
      this.weatherDataSource.complete();

    }

  }

I use EventSource here to handle the server-sent events. As far as I know, EventSource is not supported by Internet Explorer or by the legacy (pre-Chromium) versions of Edge, so those browsers would need a polyfill.
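
One thing to be aware of is that the service above appends every incoming event to the array, so the list keeps growing for as long as the stream stays open. A small sketch of a capped append is shown here; the helper name appendCapped and the idea of a fixed limit are my own additions, not part of the original code.

```typescript
// Hypothetical helper: return a new array with `item` appended,
// keeping at most `limit` of the most recent entries.
// The input array is never modified.
function appendCapped<T>(items: ReadonlyArray<T>, item: T, limit: number): T[] {
  if (limit <= 0) {
    return [];
  }
  const next = items.concat([item]);
  // Drop the oldest entries once the cap is exceeded.
  return next.length > limit ? next.slice(next.length - limit) : next;
}
```

In the service, the spread expression in onmessage could then become appendCapped(this.weatherDataSource.value, JSON.parse(event['data']), 50), where 50 is an arbitrary limit.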

Weather Component

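import { Component, HostListener, OnDestroy, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Component({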
  selector: 'app-weather',
  templateUrl: './weather.component.html',
  styleUrls: ['./weather.component.css']
})
export class WeatherComponent implements OnInit, OnDestroy {

  weatherInfo$: Observable<IWeatherInfo[]>;
  all: IWeatherInfo[];

  constructor(private service: WeatherInfoService) { }
  
  ngOnInit() {
    this.service.startWeatherInfoEventSource();
    this.weatherInfo$ = this.service.weatherData;
  }

  ngOnDestroy() {
    this.service.onClose();
  }

  @HostListener('window:beforeunload', [ '$event' ])
  unloadHandler(event) {
    console.log('unloadHandler');
    this.service.onClose();
  }

}

The component kick-starts the event source in the service layer when it is initialized, so that it starts receiving messages from the server. Note that it is also important to close the event source when done, as in the ngOnDestroy() method.

Weather Component Template

Finally, the HTML template. Note the use of the async pipe.

<!-- weather.component.html -->
<mat-nav-list>
  <mat-list-item *ngFor="let info of weatherInfo$ | async">
      Station Id: {{info.stationId}}, temperature: {{info.temperature}}C
  </mat-list-item>
</mat-nav-list>

With all the servers started up, you should see the weather events streaming into the browser, as in the screenshot below:

[Figure: weatherInfoBrowser]

It may also be useful to open the dev console and inspect the event stream from the endpoint (/weather):

[Figure: weatherInfoDevtool]
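
For reference, what the dev console shows is the text/event-stream wire format: each event is one or more "data:" lines followed by a blank line, and lines starting with a colon are comments. The sketch below is a simplified parser for that format, only to illustrate what EventSource does for us. The function name parseSseChunk is hypothetical, and the sketch assumes "\n" line endings and ignores the event, id and retry fields.

```typescript
// One parsed server-sent event: only the field this sketch cares about.
interface SseEvent {
  data: string;
}

// Hypothetical helper: split a chunk of text/event-stream text into events.
// Simplified on purpose; a trailing frame without its blank line is dropped.
function parseSseChunk(chunk: string): SseEvent[] {
  const events: SseEvent[] = [];
  let dataLines: string[] = [];

  for (const line of chunk.split('\n')) {
    if (line === '') {
      // A blank line ends the current event.
      if (dataLines.length > 0) {
        events.push({ data: dataLines.join('\n') });
        dataLines = [];
      }
      continue;
    }
    if (line.charAt(0) === ':') {
      continue; // comment line, often used as a keep-alive
    }
    if (line.slice(0, 5) === 'data:') {
      let value = line.slice(5);
      if (value.charAt(0) === ' ') {
        value = value.slice(1); // one optional space after the colon is not part of the data
      }
      dataLines.push(value);
    }
  }
  return events;
}
```

Each resulting data string is what arrives as event.data in the onmessage handler of the service, ready to be passed to JSON.parse.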

Useful Resources

  1. Spring Kafka – JSON Serializer Deserializer Example
  2. Reactor 3 Reference Guide
  3. Spring Reference Documentation – Web Reactive Stack