Skip to content
Fullstack Reactive Server Sent Events

Fullstack Reactive Server Sent Events

Reactive programming is a programming paradigm aimed at maintaining overall coherence by propagating changes from a reactive source (modification of a variable, user input, etc.) to elements dependent on this source.

Reactive applications are more and more important today as they are:

  • Highly Available: the system must respond quickly no matter what,
  • Resilient: the system must always remain available, even in case of error or even if it is overloaded,
  • Message Oriented: the system uses asynchronous messages.

Which meets the requirements of today's users.

Elevate your Load Testing!
Request a Demo

Fullstack Reactive with Spring Webflux and Angular

Technically speaking, meeting such requirements implies using a set of frameworks to ensure a smooth user experience. This stack of reactive-compliant web frameworks goes from the backend with Spring WebFlux to the frontend with Angular.

Built on the Reactive Streams API (a standard for asynchronous stream processing with non-blocking back pressure), Spring WebFlux allows to run non-blocking servers such as Netty. It uses the reactive library Reactor which provides the Mono and Flux API types to work on single data (Mono) and sequences (Flux).

Angular extensively uses the RxJs library for user events propagation through UI components and services, but also for connections with the backend: The HTTP service returns Observables.

SSE Propagation

This allows us to use the same mix of asynchronous and functional programming in both our Java server code and in our TypeScript UI code, the APIs for Reactor and RxJS being quite similar. More importantly, with a fully reactive stack, changes can be propagated seamlessly everywhere in our application. When a user interacts with the UI, an HTTP request is sent to the server and the change is made on the backend. But how to propagate independent changes occurring on the server to the UI? By using Server Sent Events.

Server Sent Event Introduction

Server-Sent-Events (aka SSE), is an HTTP standard that allows a web application to receive events emitted by the server in a uni-directional stream.

Before using SSE, you should be aware of a few limitations:

Code Example

SSE and Spring Webflux

Please check this Getting started with WebFlux guide if you're new to it.

Let's start by seeing how to handle Server-Sent-Events in Spring WebFlux to dispatch StorageWatcherEvents. A StorageWatcherEvent is a simple bean that contains a StorageNode (the representation of a File in a tree) and an event: created, modified or deleted. Directory watching is made using the directory-watcher project and exposed by a Spring service called StorageWatcherService.

text/event-stream

This first version of an SSE compliant Spring REST Controller is pretty simple. We just have to return a Flux<StorageWatcherEvent>:

import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

[...]

@RestController()
@RequestMapping("/files")
class StorageController {

  StorageWatcherService watcher;

  @GetMapping(value = "/watch")
  public Flux<StorageWatcherEvent> watch() {
    return this.watcher.watch();
  }
}

Spring will automatically handle all requests with the Accept header to text/event-stream as an SSE request an answer accordingly:

text/event-stream

Note: You can force it with the produces annotation configuration : @GetMapping(path = "/watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE).

There is issue with this solution through: if no event is generated by the watcher.watch() for a while, the connection may be closed if it goes through an HTTP proxy.

The result is that you must set custom timeouts on your proxy: - timeout tunnel and timeout server for HAProxy, - proxy_read_timeout for NGINX.

Also, SSE Polyfill client libraries request keep-alive events to make SSE work on Microsoft IE and Edge.

Flux of ServerSentEvents

Let's see how to improve our server by making it send Keep-alive events. Keep-alive are ServerSentEvent that do not contain data but only a comment.

Start by creating a utility service called SSEService that transforms a Flux<T> into a Flux<ServerSentEvent<T>>;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

[...]

@Component
final class SSEKeepAliveService implements SSEService {

  long delay;

  @Autowired
  SSEKeepAliveService(@Value("${kraken.sse.keep-alive:#{environment.KRAKEN_SSE_KEEP_ALIVE_DELAY ?: 15}}") final Long delay) {
    this.delay = delay;
  }

  @Override
  public <T> Flux<ServerSentEvent<T>> wrap(Flux<T> flux) {
    return Flux.merge(flux.map(t -> ServerSentEvent.builder(t).build()), Flux.interval(Duration.ofSeconds(this.delay)).map(aLong -> ServerSentEvent.<T>builder().comment("keep alive").build()));
  }

It allows us to create ServerSentEvent with only a comment ServerSentEvent.<T>builder().comment("keep alive").build() and inject them in the Flux periodically.

Then we can simply use this service in our REST controller and return directly the Flux<ServerSentEvent<StorageWatcherEvent>>;

import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

[...]

@RestController()
@RequestMapping("/files")
class StorageController {

  StorageWatcherService watcher;
  SSEService sse;

  @GetMapping(value = "/watch")
  public Flux<ServerSentEvent<StorageWatcherEvent>> watch() {
    return this.sse.wrap(this.watcher.watch());
  }

}

The /files/watch endpoint now returns a :keep-alive comment every 15 second, along with the normal events that contain the JSON of a StorageWatcherEvent object:

Keep Alive

To call this endpoint from Angular you need an EventSource, let's have a look at it!

SSE and Angular

On the client side, we use the EventSource to receive the stream events. This object has 3 EventHandler methods:

  • EventSource.onerror called when an error occurs,
  • EventSource.onmessage called when a message event is received.
  • EventSource.onopen called when an open event is received.

EventSource Observable Wrapper

To make it reactive and fit in an Angular application, it's better to wrap it in an RxJS Observable.

Here is the event-source.service.ts file:

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs';
import * as _ from 'lodash';

@Injectable({
  providedIn: 'root'
})
export class EventSourceService {

  newEventSource(path: string): EventSource {
    return new EventSource(path);
  }

  newObservable<R>(path: string, converter: (data: string) => R = _.identity): Observable<R> {
    return new Observable(observer => {
      const eventSource = this.newEventSource(path);
      eventSource.onmessage = event => {
        observer.next(converter(event.data));
      };
      eventSource.onerror = () => {
        if (eventSource.readyState !== eventSource.CONNECTING) {
          observer.error('An error occurred.');
        }
        eventSource.close();
        observer.complete();
      };
      return () => {
        eventSource.close();
      };
    });
  }
}

This service method newObservable() can then be subscribed like any other HTTP method:

this.eventSourceService.newObservable('/files/watch').subscribe((watcherEvent: StorageWatcherEvent) => console.log(watcherEvent));

Using PolyFills

To make SSE work on both Microsoft IE and Edge browser we need to install a Polyfills client library (and send Keep-alive comments from the server): A good library for this is Yaffle/EventSource.

The installation is straightforward:

npm install event-source-polyfill --save

Then update the polyfills.ts file at the root of your application to add a reference to this library:

import 'event-source-polyfill/src/eventsource.min.js';

Finally, use it in the event-source.service.ts instead of the native EventSource object:

import {NativeEventSource, EventSourcePolyfill} from 'event-source-polyfill';

const EventSource = NativeEventSource || EventSourcePolyfill;

@Injectable({
  providedIn: 'root'
})
export class EventSourceService {

  newEventSource(path: string): EventSource {
    return new EventSource(path);
  }

[...]

}
Want to become a super load tester?
Request a Demo