Skip to main content
编辑本页

RxJS is a popular library for composing asynchronous and event based programs using observable sequences for the JavaScript.

Vert.x integrates naturally with RxJS, allowing to use observable wherever you can use streams or asynchronous results.

Vert.x for RxJS comes as an extension for RxJS:

var Rx = require("rx.vertx");

It provides the Rx object we need for creating Observable, or other kind of Rx objects.

If you are using Maven or Gradle, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>{maven-groupId}</groupId>
  <artifactId>{maven-artifactId}</artifactId>
  <version>{maven-version}</version>
</dependency>
  • Gradle (in your build.gradle file):

compile {maven-groupId}:{maven-artifactId}:{maven-version}

Read stream support

RxJS observable is a perfect match for Vert.x read streams : both provide a flow of items. A read stream can be adapted to an observable with the Rx.Observable.fromReadStream function:

Unresolved directive in index.adoc - include::read_stream.js[tags=example]

Handler support

The rx.vertx module provides an observableHandler function:

Unresolved directive in index.adoc - include::handler.js[tags=example]

Rx can also turn an existing Observer into an handler:

Unresolved directive in index.adoc - include::observer_to_handler.js[tags=example]

Future support

In Vert.x future objects are modelled as async result handlers and occur as last parameter of asynchronous methods.

The rx.vertx module provides an observableFuture function:

Unresolved directive in index.adoc - include::future.js[tags=example]

Rx can also turn an existing Observer into an future:

Unresolved directive in index.adoc - include::observer_to_future.js[tags=example]

Scheduler support

RxJS relies on the default context method timeout and interval functions to schedule operations. The vertx-js integration implements such functions providing an out of the box scheduler support.

Examples

Let’s study now a few examples of using Vert.x with RxJava.

EventBus message stream

The event bus message consumer provides naturally an stream of messages:

Unresolved directive in index.adoc - include::event_bus_messages.js[tags=example]

The message consumer provides a stream of messages. The Message#body() method gives access to a new stream of message bodies if needed:

Unresolved directive in index.adoc - include::event_bus_bodies.js[tags=example]

RxJS map/reduce composition style can be then be used:

Unresolved directive in index.adoc - include::event_bus_map_reduce.js[tags=example]

Timers

Timer task can be created with Vertx#timerStream(long):

Unresolved directive in index.adoc - include::timer.js[tags=example]

Periodic task can be created with Vertx#periodicStream(long):

Http client requests

The HttpClientRequest provides a one shot callback with the http.HttpClientResponse object. The observable reports a request failure.

Unresolved directive in index.adoc - include::http_client_request.js[tags=example]

The response can be processed as an stream of buffer:

Unresolved directive in index.adoc - include::http_client_response.js[tags=example]

Http server requests

The HttpServer#requestStream() provides a callback for each incoming request:

Unresolved directive in index.adoc - include::http_server_request.js[tags=example]

The HttpServerRequest can then be adapted to a buffer observable:

Unresolved directive in index.adoc - include::http_server_request_observable.js[tags=example]

Websocket client

The HttpClient#websocketStream provides a single callback when the websocket connects, otherwise a failure:

Unresolved directive in index.adoc - include::websocket_client.js[tags=example]

The WebSocket can then be turned into an observable of buffer easily

Unresolved directive in index.adoc - include::websocket_client_buffer.js[tags=example]

Websocket server

The HttpServer#websocketStream() provides a callback for each incoming connection:

Unresolved directive in index.adoc - include::websocket_server.js[tags=example]

The ServerWebSocket can be turned into a buffer observable easily:

Unresolved directive in index.adoc - include::websocket_server_buffer.js[tags=example]