Fullstack Reactive Server Sent Events
Reactive programming is a programming paradigm aimed at maintaining overall coherence by propagating changes from a reactive source (modification of a variable, user input, etc.) to elements dependent on this source.
Reactive applications are more and more important today as they are:
- Highly Available: the system must respond quickly no matter what,
- Resilient: the system must always remain available, even in case of error or even if it is overloaded,
- Message Oriented: the system uses asynchronous messages.
Which meets the requirements of today's users.
Fullstack Reactive with Spring Webflux and Angular¶
Technically speaking, meeting such requirements implies using a set of frameworks to ensure a smooth user experience. This stack of reactive-compliant web frameworks goes from the backend with Spring WebFlux to the frontend with Angular.
Built on the Reactive Streams API (a standard for asynchronous stream processing with non-blocking back pressure), Spring WebFlux allows to run non-blocking servers such as Netty. It uses the reactive library Reactor which provides the Mono and Flux API types to work on single data (Mono) and sequences (Flux).
Angular extensively uses the RxJs library for user events propagation through UI components and services, but also for connections with the backend: The HTTP service returns Observables.
This allows us to use the same mix of asynchronous and functional programming in both our Java server code and in our TypeScript UI code, the APIs for Reactor and RxJS being quite similar. More importantly, with a fully reactive stack, changes can be propagated seamlessly everywhere in our application. When a user interacts with the UI, an HTTP request is sent to the server and the change is made on the backend. But how to propagate independent changes occurring on the server to the UI? By using Server Sent Events.
Server Sent Event Introduction¶
Server-Sent-Events (aka SSE), is an HTTP standard that allows a web application to receive events emitted by the server in a uni-directional stream.
Before using SSE, you should be aware of a few limitations:
- First and foremost, Microsoft IE and Edge browser are not compatible: you will need to use polyfills to make them work with SSE,
- The EventSource API has no support for custom headers,
- Web browser have a maximum number of connections per hostname (usually 6, for the whole browser), and since SSE keeps an open connection to the server, you should not open too many SSE streams at the same time,
- Data transmitted in events is encoded in UTF-8, unlike WebSocket which support binary data.
Code Example¶
SSE and Spring Webflux¶
Please check this Getting started with WebFlux guide if you're new to it.
Let's start by seeing how to handle Server-Sent-Events in Spring WebFlux to dispatch StorageWatcherEvents.
A StorageWatcherEvent
is a simple bean that contains a StorageNode
(the representation of a File in a tree) and an event: created, modified or deleted.
Directory watching is made using the directory-watcher project and exposed by a Spring service called StorageWatcherService.
text/event-stream¶
This first version of an SSE compliant Spring REST Controller is pretty simple.
We just have to return a Flux<StorageWatcherEvent>
:
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
[...]
@RestController()
@RequestMapping("/files")
class StorageController {
StorageWatcherService watcher;
@GetMapping(value = "/watch")
public Flux<StorageWatcherEvent> watch() {
return this.watcher.watch();
}
}
Spring will automatically handle all requests with the Accept
header to text/event-stream
as an SSE request an answer accordingly:
Note: You can force it with the produces annotation configuration :
@GetMapping(path = "/watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
.
There is issue with this solution through: if no event is generated by the watcher.watch()
for a while, the connection may be closed if it goes through an HTTP proxy.
The result is that you must set custom timeouts on your proxy: - timeout tunnel and timeout server for HAProxy, - proxy_read_timeout for NGINX.
Also, SSE Polyfill client libraries request keep-alive events to make SSE work on Microsoft IE and Edge.
Flux of ServerSentEvents¶
Let's see how to improve our server by making it send Keep-alive events.
Keep-alive are ServerSentEvent that do not contain data
but only a comment
.
Start by creating a utility service called SSEService that transforms a Flux<T>
into a Flux<ServerSentEvent<T>>
;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
[...]
@Component
final class SSEKeepAliveService implements SSEService {
long delay;
@Autowired
SSEKeepAliveService(@Value("${kraken.sse.keep-alive:#{environment.KRAKEN_SSE_KEEP_ALIVE_DELAY ?: 15}}") final Long delay) {
this.delay = delay;
}
@Override
public <T> Flux<ServerSentEvent<T>> wrap(Flux<T> flux) {
return Flux.merge(flux.map(t -> ServerSentEvent.builder(t).build()), Flux.interval(Duration.ofSeconds(this.delay)).map(aLong -> ServerSentEvent.<T>builder().comment("keep alive").build()));
}
It allows us to create ServerSentEvent with only a comment ServerSentEvent.<T>builder().comment("keep alive").build()
and inject them in the Flux periodically.
Then we can simply use this service in our REST controller and return directly the Flux<ServerSentEvent<StorageWatcherEvent>>
;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
[...]
@RestController()
@RequestMapping("/files")
class StorageController {
StorageWatcherService watcher;
SSEService sse;
@GetMapping(value = "/watch")
public Flux<ServerSentEvent<StorageWatcherEvent>> watch() {
return this.sse.wrap(this.watcher.watch());
}
}
The /files/watch
endpoint now returns a :keep-alive
comment every 15 second, along with the normal events that contain the JSON of a StorageWatcherEvent object:
To call this endpoint from Angular you need an EventSource, let's have a look at it!
SSE and Angular¶
On the client side, we use the EventSource to receive the stream events. This object has 3 EventHandler methods:
EventSource.onerror
called when an error occurs,EventSource.onmessage
called when a message event is received.EventSource.onopen
called when an open event is received.
EventSource Observable Wrapper¶
To make it reactive and fit in an Angular application, it's better to wrap it in an RxJS Observable.
Here is the event-source.service.ts
file:
import {Injectable} from '@angular/core';
import {Observable} from 'rxjs';
import * as _ from 'lodash';
@Injectable({
providedIn: 'root'
})
export class EventSourceService {
newEventSource(path: string): EventSource {
return new EventSource(path);
}
newObservable<R>(path: string, converter: (data: string) => R = _.identity): Observable<R> {
return new Observable(observer => {
const eventSource = this.newEventSource(path);
eventSource.onmessage = event => {
observer.next(converter(event.data));
};
eventSource.onerror = () => {
if (eventSource.readyState !== eventSource.CONNECTING) {
observer.error('An error occurred.');
}
eventSource.close();
observer.complete();
};
return () => {
eventSource.close();
};
});
}
}
This service method newObservable()
can then be subscribed like any other HTTP method:
this.eventSourceService.newObservable('/files/watch').subscribe((watcherEvent: StorageWatcherEvent) => console.log(watcherEvent));
Using PolyFills¶
To make SSE work on both Microsoft IE and Edge browser we need to install a Polyfills client library (and send Keep-alive comments from the server): A good library for this is Yaffle/EventSource.
The installation is straightforward:
npm install event-source-polyfill --save
Then update the polyfills.ts
file at the root of your application to add a reference to this library:
import 'event-source-polyfill/src/eventsource.min.js';
Finally, use it in the event-source.service.ts
instead of the native EventSource object:
import {NativeEventSource, EventSourcePolyfill} from 'event-source-polyfill';
const EventSource = NativeEventSource || EventSourcePolyfill;
@Injectable({
providedIn: 'root'
})
export class EventSourceService {
newEventSource(path: string): EventSource {
return new EventSource(path);
}
[...]
}