RxJS is a popular library for composing asynchronous and event-based programs using observable sequences in JavaScript.
Vert.x integrates naturally with RxJS, allowing you to use observables wherever you can use streams or asynchronous results.
Vert.x for RxJS comes as an extension for RxJS:
var Rx = require("rx.vertx");
It provides the Rx object we need for creating Observable or other kinds of Rx objects.
If you are using Maven or Gradle, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>{maven-groupId}</groupId>
<artifactId>{maven-artifactId}</artifactId>
<version>{maven-version}</version>
</dependency>
Gradle (in your build.gradle file):
compile '{maven-groupId}:{maven-artifactId}:{maven-version}'
An RxJS observable is a perfect match for Vert.x read streams: both provide a flow of items.
A read stream can be adapted to an observable with the Rx.Observable.fromReadStream function.
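To make the adaptation concrete, here is a minimal, self-contained sketch of the idea. It does not use rx.vertx: `fakeReadStream`, its `emitAll` helper and the hand-written `fromReadStream` are hypothetical stand-ins. The sketch assumes only that a read stream exposes `handler`, `exceptionHandler` and `endHandler`, and that observers follow the `onNext` / `onError` / `onCompleted` convention.

```javascript
// Hypothetical in-memory stand-in for a Vert.x read stream. It only mimics
// the handler / exceptionHandler / endHandler registration methods.
function fakeReadStream(items) {
  var handlers = {};
  return {
    handler: function (h) { handlers.data = h; return this; },
    exceptionHandler: function (h) { handlers.error = h; return this; },
    endHandler: function (h) { handlers.end = h; return this; },
    // Helper for this sketch only (not a Vert.x method): push every item,
    // then signal the end of the stream.
    emitAll: function () {
      items.forEach(function (item) {
        if (handlers.data) { handlers.data(item); }
      });
      if (handlers.end) { handlers.end(); }
    }
  };
}

// Hand-written adapter (an assumption, not the rx.vertx source): wires the
// three stream callbacks to the three observer callbacks on subscription.
function fromReadStream(stream) {
  return {
    subscribe: function (observer) {
      stream.exceptionHandler(function (err) { observer.onError(err); });
      stream.endHandler(function () { observer.onCompleted(); });
      stream.handler(function (item) { observer.onNext(item); });
    }
  };
}

var readStreamEvents = [];
var sketchStream = fakeReadStream(["a", "b", "c"]);

fromReadStream(sketchStream).subscribe({
  onNext: function (item) { readStreamEvents.push("next:" + item); },
  onError: function (err) { readStreamEvents.push("error:" + err); },
  onCompleted: function () { readStreamEvents.push("completed"); }
});

sketchStream.emitAll();
console.log(readStreamEvents.join(", "));
```

In this sketch each stream item becomes one `onNext` call and the end of the stream becomes `onCompleted`, which is the correspondence the paragraph above describes.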
The rx.vertx module provides an observableHandler function.
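As a rough illustration of the concept, the following self-contained sketch builds an observable that can hand out a plain handler function. It is not the rx.vertx implementation: the `observableHandler` defined here is hand-written, the `toHandler` method name is an assumption, and `fireOnce` is a hypothetical stand-in for any API that accepts a handler (a timer, for instance).

```javascript
// Hand-written stand-in, NOT the rx.vertx implementation: an observable
// whose events are fed by a handler function it hands out.
function observableHandler() {
  var observers = [];
  return {
    subscribe: function (onNext) { observers.push(onNext); },
    // Assumed method name: returns a function that can be passed wherever
    // a plain handler is expected.
    toHandler: function () {
      return function (event) {
        observers.forEach(function (onNext) { onNext(event); });
      };
    }
  };
}

// Hypothetical API taking a handler; it invokes the handler once.
function fireOnce(handler) {
  handler("tick");
}

var handlerEvents = [];
var handlerObservable = observableHandler();

// Subscribe first, then pass the handler to the callback-based API.
handlerObservable.subscribe(function (event) { handlerEvents.push(event); });
fireOnce(handlerObservable.toHandler());

console.log(handlerEvents);
```

The design point is that the subscriber never sees the callback-based API: it only observes events, while the handler is passed to whatever method expects one.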
Rx can also turn an existing Observer into a handler.
In Vert.x, future objects are modelled as async result handlers and occur as the last parameter of asynchronous methods.
The rx.vertx module provides an observableFuture function.
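The sketch below illustrates how an async result handler can map onto an observable that emits a single result or a failure. It is self-contained and does not use rx.vertx: this `observableFuture` is hand-written, the `toHandler` name is an assumption, the `(result, error)` handler signature is assumed, and `listenOk` / `listenFail` are hypothetical asynchronous methods that call their last parameter.

```javascript
// Hand-written stand-in, NOT the rx.vertx implementation: an observable fed
// by an async result handler, assumed here to be called as (result, error).
function observableFuture() {
  var observers = [];
  return {
    subscribe: function (observer) { observers.push(observer); },
    // Assumed method name: returns the async result handler.
    toHandler: function () {
      return function (result, err) {
        observers.forEach(function (observer) {
          if (err) {
            observer.onError(err);
          } else {
            // A successful result is emitted once, then the sequence ends.
            observer.onNext(result);
            observer.onCompleted();
          }
        });
      };
    }
  };
}

// Hypothetical asynchronous methods taking the handler as last parameter.
function listenOk(handler) { handler("server", null); }
function listenFail(handler) { handler(null, new Error("port in use")); }

// Builds an observer that records what it receives into the given array.
function recordingObserver(events) {
  return {
    onNext: function (value) { events.push("next:" + value); },
    onError: function (err) { events.push("error:" + err.message); },
    onCompleted: function () { events.push("completed"); }
  };
}

var successEvents = [];
var okFuture = observableFuture();
okFuture.subscribe(recordingObserver(successEvents));
listenOk(okFuture.toHandler());

var failureEvents = [];
var failedFuture = observableFuture();
failedFuture.subscribe(recordingObserver(failureEvents));
listenFail(failedFuture.toHandler());

console.log(successEvents, failureEvents);
```

A success yields one item followed by completion, while a failure yields only an error notification, mirroring the two outcomes of an async result.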
Rx can also turn an existing Observer into a future.
RxJS relies on the timeout and interval functions of the default context to schedule operations. The vertx-js integration implements these functions, providing out-of-the-box scheduler support.
Let’s now study a few examples of using Vert.x with RxJS.
The event bus message consumer naturally provides a stream of messages.
The message consumer provides a stream of messages. The Message#body() method gives access to a new stream of message bodies if needed.
The RxJS map/reduce composition style can then be used.
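To show what the map/reduce style looks like on a stream of messages, here is a minimal, self-contained sketch. It uses a tiny hand-written `SketchObservable` with only `map` and `reduce` rather than RxJS itself, and the `fakeMessages` array is a hypothetical stand-in for event bus messages, assuming only that each message exposes a `body()` method. With rx.vertx the equivalent operators would come from RxJS.

```javascript
// Tiny hand-written observable for illustration only (not RxJS).
function SketchObservable(subscribe) {
  this.subscribe = subscribe;
}

// Emits every array item, then completes.
SketchObservable.fromArray = function (items) {
  return new SketchObservable(function (observer) {
    items.forEach(function (item) { observer.onNext(item); });
    observer.onCompleted();
  });
};

// map: transforms each item with fn.
SketchObservable.prototype.map = function (fn) {
  var source = this;
  return new SketchObservable(function (observer) {
    source.subscribe({
      onNext: function (value) { observer.onNext(fn(value)); },
      onError: function (err) { observer.onError(err); },
      onCompleted: function () { observer.onCompleted(); }
    });
  });
};

// reduce: accumulates all items and emits one value on completion.
SketchObservable.prototype.reduce = function (fn, seed) {
  var source = this;
  return new SketchObservable(function (observer) {
    var acc = seed;
    source.subscribe({
      onNext: function (value) { acc = fn(acc, value); },
      onError: function (err) { observer.onError(err); },
      onCompleted: function () {
        observer.onNext(acc);
        observer.onCompleted();
      }
    });
  });
};

// Hypothetical messages: each one exposes its payload through body().
var fakeMessages = [10, 20, 30].map(function (value) {
  return { body: function () { return value; } };
});

var reducedTotals = [];
SketchObservable.fromArray(fakeMessages)
  .map(function (message) { return message.body(); })
  .reduce(function (sum, value) { return sum + value; }, 0)
  .subscribe({
    onNext: function (total) { reducedTotals.push(total); },
    onError: function (err) { throw err; },
    onCompleted: function () {}
  });

console.log(reducedTotals);
```

Here `map` extracts each message body and `reduce` folds the bodies into a single total that is emitted once the source completes.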
A timer task can be created with Vertx#timerStream(long).
A periodic task can be created with Vertx#periodicStream(long).
The HttpClientRequest provides a one-shot callback with the http.HttpClientResponse object. The observable reports a request failure.
The response can be processed as a stream of buffers.
The HttpServer#requestStream() provides a callback for each incoming request.
The HttpServerRequest can then be adapted to a buffer observable.
The HttpClient#websocketStream provides a single callback when the websocket connects, otherwise a failure.
The WebSocket can then easily be turned into an observable of buffers.
The HttpServer#websocketStream() provides a callback for each incoming connection.
The ServerWebSocket can easily be turned into a buffer observable.