RxJS is a popular JavaScript library for composing asynchronous and event-based programs using observable sequences.
Vert.x integrates naturally with RxJS, allowing you to use observables wherever you can use streams or asynchronous results.
Vert.x for RxJS comes as an extension for RxJS:
var Rx = require("rx.vertx");
It provides the Rx object needed to create observables and other kinds of Rx objects.
If you are using Maven or Gradle, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
  <groupId>{maven-groupId}</groupId>
  <artifactId>{maven-artifactId}</artifactId>
  <version>{maven-version}</version>
</dependency>
Gradle (in your build.gradle file):
compile "{maven-groupId}:{maven-artifactId}:{maven-version}"
An RxJS observable is a perfect match for a Vert.x read stream: both provide a flow of items.
A read stream can be adapted to an observable with the Rx.Observable.fromReadStream function.
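The idea of adapting a read stream to an observable can be pictured with a small sketch that runs outside Vert.x. Everything in it is an assumption made for illustration: fakeReadStream is a hypothetical stand-in that only mimics the handler, exceptionHandler and endHandler registration methods of a read stream, and fromReadStreamSketch is not the rx.vertx implementation of Rx.Observable.fromReadStream.

```javascript
// Hypothetical stand-in for a Vert.x read stream: it only mimics the
// handler / exceptionHandler / endHandler registration methods.
function fakeReadStream(items) {
  var handlers = {};
  return {
    handler: function (h) { handlers.data = h; return this; },
    exceptionHandler: function (h) { handlers.error = h; return this; },
    endHandler: function (h) { handlers.end = h; return this; },
    // Helper for this sketch only: pushes every item, then ends the stream.
    emitAll: function () {
      items.forEach(function (item) { handlers.data(item); });
      handlers.end();
    }
  };
}

// Illustrative adapter, NOT the rx.vertx implementation: on subscription it
// wires the three stream callbacks to the three observer callbacks.
function fromReadStreamSketch(stream) {
  return {
    subscribe: function (onNext, onError, onCompleted) {
      stream.exceptionHandler(onError);
      stream.endHandler(onCompleted);
      stream.handler(onNext);
    }
  };
}

var received = [];
var completed = false;
var stream = fakeReadStream(["a", "b", "c"]);

fromReadStreamSketch(stream).subscribe(
  function (item) { received.push(item); },   // one call per item
  function (err) { throw err; },              // stream failure
  function () { completed = true; }           // end of stream
);

stream.emitAll();
```

With the real module, the stream would typically be something like an open file or an HTTP response, and the returned object would be a full RxJS observable rather than this reduced one.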
The rx.vertx module provides an observableHandler function.
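As a rough picture of what an observable handler does, the sketch below builds an object that is both subscribable and convertible to a plain callback. It is a stand-in written for this page, not the rx.vertx code: observableHandlerSketch is a hypothetical name, and the toHandler method is an assumption about how the handler is obtained.

```javascript
// Illustrative stand-in, NOT the rx.vertx implementation.
function observableHandlerSketch() {
  var subscribers = [];
  return {
    // Registers a callback that receives every event.
    subscribe: function (onNext) { subscribers.push(onNext); },
    // Returns a plain function that can be passed wherever a handler is expected.
    toHandler: function () {
      return function (event) {
        subscribers.forEach(function (onNext) { onNext(event); });
      };
    }
  };
}

var events = [];
var observable = observableHandlerSketch();
observable.subscribe(function (evt) { events.push(evt); });

// Inside Vert.x the handler would be handed to an API that takes a callback
// (a timer, for instance); here it is invoked directly.
var handler = observable.toHandler();
handler("timer fired");
```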
Rx can also turn an existing Observer into a handler.
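The reverse direction, from an observer to a handler, can be sketched in a few lines. The helper observerToHandlerSketch is hypothetical and only forwards each event to onNext; whether the real conversion also completes the observer after an event is not specified here.

```javascript
// Hypothetical helper, not part of rx.vertx: wraps an observer in a callback.
function observerToHandlerSketch(observer) {
  return function (event) {
    observer.onNext(event);
  };
}

var seen = [];
var observer = {
  onNext: function (evt) { seen.push(evt); },
  onError: function (err) { seen.push("error: " + err.message); },
  onCompleted: function () { seen.push("completed"); }
};

// The resulting function can be called like any other handler.
var eventHandler = observerToHandlerSketch(observer);
eventHandler(1);
eventHandler(2);
```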
In Vert.x, future objects are modelled as async result handlers and appear as the last parameter of asynchronous methods.
The rx.vertx module provides an observableFuture function.
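The mapping between an async result and an observable can be sketched as follows. This is a stand-in, not the rx.vertx implementation: observableFutureSketch and run are hypothetical names, toHandler is an assumed method, and the handler is assumed to follow a (result, err) callback convention. A success becomes one item followed by completion, and a failure becomes an error.

```javascript
// Illustrative stand-in, NOT the rx.vertx implementation.
function observableFutureSketch() {
  var observers = [];
  return {
    subscribe: function (onNext, onError, onCompleted) {
      observers.push({ onNext: onNext, onError: onError, onCompleted: onCompleted });
    },
    // Assumed callback shape: function (result, err).
    toHandler: function () {
      return function (result, err) {
        observers.forEach(function (o) {
          if (err) {
            o.onError(err);          // failure: report the error only
          } else {
            o.onNext(result);        // success: emit the single result...
            o.onCompleted();         // ...then complete
          }
        });
      };
    }
  };
}

// Drives the sketch once and records what the subscriber observed.
function run(result, err) {
  var log = [];
  var future = observableFutureSketch();
  future.subscribe(
    function (value) { log.push("next:" + value); },
    function (e) { log.push("error:" + e.message); },
    function () { log.push("completed"); }
  );
  future.toHandler()(result, err);
  return log;
}

var successLog = run("server started", null);
var failureLog = run(null, new Error("bind failed"));
```

In a Vert.x application the handler would be passed as the last argument of an asynchronous method instead of being called by hand.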
Rx can also turn an existing Observer into a future.
RxJS relies on the timeout and interval functions of the default context to schedule operations. The vertx-js integration implements these functions, providing out-of-the-box scheduler support.
Let’s now study a few examples of using Vert.x with RxJS.
The event bus message consumer naturally provides a stream of messages.
The message consumer provides a stream of messages. The MessageConsumer#bodyStream() method gives access to a new stream of message bodies if needed.
The RxJS map/reduce composition style can then be used.
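To make the map/reduce composition concrete without an event bus, the sketch below uses a tiny hand-written observable. makeObservable is a hypothetical stand-in for an RxJS Observable that supports only subscribe, map and reduce, and the messages are plain objects that merely mimic Message#body(). It maps each message to its body and then sums the bodies.

```javascript
// Minimal synchronous observable, a stand-in for an RxJS Observable.
function makeObservable(subscribe) {
  return {
    subscribe: subscribe,
    // Transforms every item with fn.
    map: function (fn) {
      return makeObservable(function (onNext, onError, onCompleted) {
        subscribe(function (x) { onNext(fn(x)); }, onError, onCompleted);
      });
    },
    // Folds all items into one value, emitted only when the source completes.
    reduce: function (fn, seed) {
      return makeObservable(function (onNext, onError, onCompleted) {
        var acc = seed;
        subscribe(
          function (x) { acc = fn(acc, x); },
          onError,
          function () { onNext(acc); onCompleted(); }
        );
      });
    }
  };
}

// Hypothetical messages: each one only mimics Message#body().
var messages = [10, 20, 12].map(function (n) {
  return { body: function () { return n; } };
});

var messageObservable = makeObservable(function (onNext, onError, onCompleted) {
  messages.forEach(onNext);
  onCompleted();
});

var total = null;
messageObservable
  .map(function (msg) { return msg.body(); })
  .reduce(function (acc, body) { return acc + body; }, 0)
  .subscribe(
    function (sum) { total = sum; },
    function (err) { throw err; },
    function () {}
  );
```

Because a reduce step emits only once its source completes, a pipeline like this produces a value for a real consumer only after the underlying stream has ended.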
A timer task can be created with Vertx#timerStream(long).
A periodic task can be created with Vertx#periodicStream(long).
The HttpClientRequest provides a one-shot callback with the
http.HttpClientResponse object. The observable also reports a request failure.
The response can be processed as a stream of buffers.
The HttpServer#requestStream() provides a callback for each incoming request.
The HttpServerRequest can then be adapted to a buffer observable.
The HttpClient#websocketStream provides a single callback when the websocket connects, or a failure otherwise.
The WebSocket can then easily be turned into an observable of buffers.
The HttpServer#websocketStream() provides a callback for each incoming connection.
The ServerWebSocket can easily be turned into a buffer observable.