Skip to main content
编辑本页

RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM. RxGroovy is the Reactive Extensions for Groovy. This adaptor allows groovy.lang.Closure functions to be used and RxJava will know how to invoke them.

Vert.x integrates naturally with RxGroovy, allowing to use observable wherever you can use streams or asynchronous results.

To use vert.x API for RxGroovy, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>{maven-groupId}</groupId>
  <artifactId>{maven-artifactId}</artifactId>
  <version>{maven-version}</version>
</dependency>
  • Gradle (in your build.gradle file):

compile {maven-groupId}:{maven-artifactId}:{maven-version}

Read stream support

RxJava observable is a perfect match for Vert.x ReadStream class : both provide a flow of items.

Vert.x API for Groovy provides io.vertx.groovy.core.stream.ReadStream objects, the RxGroovy provides a Groovy extension module that adds the toObservable method to the read stream class.

Unresolved directive in index.adoc - include::readStream.groovy[tags=example]

Handler support

The RxJava io.vertx.ext.rx.java.RxHelper should be used to: - create an io.vertx.ext.rx.java.ObservableHandler, - transform actions to an handler

The RxGroovy extension module adds the toHandler method on the rx.Observer class:

Unresolved directive in index.adoc - include::toHandler.groovy[tags=example]

Async result support

In Vert.x future objects are modelled as async result handlers and occur as last parameter of asynchronous methods.

The RxGroovy extension module adds the toFuture method on the rx.Observer class:

Unresolved directive in index.adoc - include::toFuture.groovy[tags=example]

Scheduler support

The reactive extension sometimes needs to schedule actions, for instance Observable#timer create and returns a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the timer thread are not Vert.x threads and therefore not executing in a Vert.x event loop.

When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra Rx.Scheduler, the RxGroovy extension module adds to the Vertx class the scheduler() method will return a scheduler that can be used in such places.

Unresolved directive in index.adoc - include::scheduler.groovy[tags=example]

For blocking scheduled actions, a scheduler can be created with the blockingScheduler method:

Unresolved directive in index.adoc - include::blockingScheduler.groovy[tags=example]

RxJava can also be configured to use a scheduler by default, the returned scheduler hook uses a blocking scheduler for IO actions:

Unresolved directive in index.adoc - include::defaultScheduler.groovy[tags=example]

Json unmarshalling

The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class)} creates an rx.Observable.Operator that transforms an Observable<Buffer> in json format into an object observable:

Unresolved directive in index.adoc - include::unmarshaller.groovy[tags=example]

Marshall other dataformats

The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class, ObjectMapper)} creates an rx.Observable.Operator that transforms an Observable<Buffer> in a format the mapper can parse into an object observable.

Include the appropriate dependency from here: jackson-dataformats-text

In order to use it to parse YAML you can do the following:

Unresolved directive in index.adoc - include::unmarhsallYaml.groovy[tags=example]

Api examples

Let’s study now a few examples of using Vert.x with RxJava.

EventBus message stream

The event bus MessageConsumer provides naturally an Observable<Message<T>>:

Unresolved directive in index.adoc - include::eventBusMessages.groovy[tags=example]

The MessageConsumer provides a stream of Message. The Message#body() gives access to a new stream of message bodies if needed:

Unresolved directive in index.adoc - include::eventBusBodies.groovy[tags=example]

RxJava map/reduce composition style can be then be used:

Unresolved directive in index.adoc - include::eventBusMapReduce.groovy[tags=example]

Timers

Timer task can be created with Vertx#timerStream(long):

Unresolved directive in index.adoc - include::timer.groovy[tags=example]

Periodic task can be created with Vertx#periodicStream(long):

Unresolved directive in index.adoc - include::periodic.groovy[tags=example]

The observable can be cancelled with an unsubscription:

Unresolved directive in index.adoc - include::periodicUnsubscribe.groovy[tags=example]

Http client requests

HttpClientRequest#toObservable() provides a one shot callback with the HttpClientResponse} object. The observable reports a request failure.

Unresolved directive in index.adoc - include::httpClientRequest.groovy[tags=example]
 The response can be processed as an `Observable<Buffer>` with the
`HttpClientResponse#toObservable()` method:
Unresolved directive in index.adoc - include::httpClientResponse.groovy[tags=example]

The same flow can be achieved with the flatMap operation:

Unresolved directive in index.adoc - include::httpClientResponseFlatMap.groovy[tags=example]

We can also unmarshall the Observable<Buffer> into an object using the {@link io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)} static method. This method creates an Rx.Observable.Operator unmarshalling buffers to an object:

Unresolved directive in index.adoc - include::httpClientResponseFlatMapUnmarshall.groovy[tags=example]

Http server requests

The HttpServer#requestStream() provides a callback for each incoming request:

Unresolved directive in index.adoc - include::httpServerRequest.groovy[tags=example]

The HttpServerRequest can then be adapted to an Observable<Buffer>:

Unresolved directive in index.adoc - include::httpServerRequestObservable.groovy[tags=example]

The io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)} can be used to parse and map a json request to an object:

Unresolved directive in index.adoc - include::httpServerRequestObservableUnmarshall.groovy[tags=example]

Websocket client

The`HttpClient#websocketStream`} provides a single callback when the websocket connects, otherwise a failure:

Unresolved directive in index.adoc - include::websocketClient.groovy[tags=example]

The WebSocket can then be turned into an Observable<Buffer> easily

Unresolved directive in index.adoc - include::websocketClientBuffer.groovy[tags=example]

Websocket server

The HttpServer#websocketStream() provides a callback for each incoming connection:

Unresolved directive in index.adoc - include::websocketServer.groovy[tags=example]

The ServerWebSocket can be turned into an Observable<Buffer> easily:

Unresolved directive in index.adoc - include::websocketServerBuffer.groovy[tags=example]