RxJava is a popular library for composing asynchronous and event
based programs using observable sequences for the Java VM. RxGroovy
is the Reactive Extensions for Groovy. This adaptor allows groovy.lang.Closure functions to be used
and RxJava will know how to invoke them.
Vert.x integrates naturally with RxGroovy, allowing to use observable wherever you can use streams or asynchronous results.
To use vert.x API for RxGroovy, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>{maven-groupId}</groupId>
<artifactId>{maven-artifactId}</artifactId>
<version>{maven-version}</version>
</dependency>
Gradle (in your build.gradle file):
compile {maven-groupId}:{maven-artifactId}:{maven-version}
RxJava observable is a perfect match for Vert.x ReadStream class : both provide a flow of items.
Vert.x API for Groovy provides io.vertx.groovy.core.stream.ReadStream objects, the RxGroovy provides a
Groovy extension module that adds the toObservable method to the read stream class.
Unresolved directive in index.adoc - include::readStream.groovy[tags=example]
The RxJava io.vertx.ext.rx.java.RxHelper should be used to:
- create an io.vertx.ext.rx.java.ObservableHandler,
- transform actions to an handler
The RxGroovy extension module adds the toHandler method on the rx.Observer class:
Unresolved directive in index.adoc - include::toHandler.groovy[tags=example]
In Vert.x future objects are modelled as async result handlers and occur as last parameter of asynchronous methods.
The RxGroovy extension module adds the toFuture method on the rx.Observer class:
Unresolved directive in index.adoc - include::toFuture.groovy[tags=example]
The reactive extension sometimes needs to schedule actions, for instance Observable#timer create and returns
a timer that emit periodic events. By default, scheduled actions are managed by RxJava, it means that the
timer thread are not Vert.x threads and therefore not executing in a Vert.x event loop.
When an RxJava method deals with a scheduler, it accepts an overloaded method accepting an extra Rx.Scheduler,
the RxGroovy extension module adds to the Vertx class the scheduler() method will return a scheduler that can be used in such places.
Unresolved directive in index.adoc - include::scheduler.groovy[tags=example]
For blocking scheduled actions, a scheduler can be created with the blockingScheduler method:
Unresolved directive in index.adoc - include::blockingScheduler.groovy[tags=example]
RxJava can also be configured to use a scheduler by default, the returned scheduler hook uses a blocking scheduler for IO actions:
Unresolved directive in index.adoc - include::defaultScheduler.groovy[tags=example]
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class)} creates an rx.Observable.Operator that
transforms an Observable<Buffer> in json format into an object observable:
Unresolved directive in index.adoc - include::unmarshaller.groovy[tags=example]
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class, ObjectMapper)} creates an rx.Observable.Operator that
transforms an Observable<Buffer> in a format the mapper can parse into an object observable.
Include the appropriate dependency from here: jackson-dataformats-text
In order to use it to parse YAML you can do the following:
Unresolved directive in index.adoc - include::unmarhsallYaml.groovy[tags=example]
Let’s study now a few examples of using Vert.x with RxJava.
The event bus MessageConsumer provides naturally an Observable<Message<T>>:
Unresolved directive in index.adoc - include::eventBusMessages.groovy[tags=example]
The MessageConsumer provides a stream of Message.
The Message#body() gives access to a new stream of message bodies if needed:
Unresolved directive in index.adoc - include::eventBusBodies.groovy[tags=example]
RxJava map/reduce composition style can be then be used:
Unresolved directive in index.adoc - include::eventBusMapReduce.groovy[tags=example]
Timer task can be created with Vertx#timerStream(long):
Unresolved directive in index.adoc - include::timer.groovy[tags=example]
Periodic task can be created with Vertx#periodicStream(long):
Unresolved directive in index.adoc - include::periodic.groovy[tags=example]
The observable can be cancelled with an unsubscription:
Unresolved directive in index.adoc - include::periodicUnsubscribe.groovy[tags=example]
HttpClientRequest#toObservable() provides a one shot callback with the HttpClientResponse}
object. The observable reports a request failure.
Unresolved directive in index.adoc - include::httpClientRequest.groovy[tags=example]
The response can be processed as an `Observable<Buffer>` with the `HttpClientResponse#toObservable()` method:
Unresolved directive in index.adoc - include::httpClientResponse.groovy[tags=example]
The same flow can be achieved with the flatMap operation:
Unresolved directive in index.adoc - include::httpClientResponseFlatMap.groovy[tags=example]
We can also unmarshall the Observable<Buffer> into an object using the {@link io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)}
static method. This method creates an Rx.Observable.Operator unmarshalling buffers to an object:
Unresolved directive in index.adoc - include::httpClientResponseFlatMapUnmarshall.groovy[tags=example]
The HttpServer#requestStream() provides a callback for each incoming request:
Unresolved directive in index.adoc - include::httpServerRequest.groovy[tags=example]
The HttpServerRequest can then be adapted to an Observable<Buffer>:
Unresolved directive in index.adoc - include::httpServerRequestObservable.groovy[tags=example]
The io.vertx.rx.groovy.RxHelpe.RxHelper#unmarshaller(java.lang.Class)} can be used to parse and map a json request to an object:
Unresolved directive in index.adoc - include::httpServerRequestObservableUnmarshall.groovy[tags=example]
The`HttpClient#websocketStream`} provides a single callback when the websocket connects, otherwise a failure:
Unresolved directive in index.adoc - include::websocketClient.groovy[tags=example]
The WebSocket can then be turned into an Observable<Buffer> easily
Unresolved directive in index.adoc - include::websocketClientBuffer.groovy[tags=example]
The HttpServer#websocketStream() provides a callback for each incoming connection:
Unresolved directive in index.adoc - include::websocketServer.groovy[tags=example]
The ServerWebSocket can be turned into an Observable<Buffer> easily:
Unresolved directive in index.adoc - include::websocketServerBuffer.groovy[tags=example]