RxJava is a popular library for composing asynchronous and event-based programs using observable sequences for the Java VM. RxGroovy is the Reactive Extensions adaptor for Groovy: it allows groovy.lang.Closure functions to be passed to RxJava, which then knows how to invoke them.
Vert.x integrates naturally with RxGroovy, so you can use observables wherever you can use streams or asynchronous results.
To use the Vert.x API for RxGroovy, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>{maven-groupId}</groupId>
<artifactId>{maven-artifactId}</artifactId>
<version>{maven-version}</version>
</dependency>
Gradle (in your build.gradle file):
compile '{maven-groupId}:{maven-artifactId}:{maven-version}'
An RxJava Observable is a perfect match for the Vert.x ReadStream class: both provide a flow of items.
The Vert.x API for Groovy provides io.vertx.groovy.core.stream.ReadStream objects, and RxGroovy provides a Groovy extension module that adds the toObservable method to the read stream class.
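As a minimal sketch of the extension method in use, the following script opens a file and subscribes to its content. It assumes the Vert.x 3 Groovy API (io.vertx.groovy.core.Vertx) and the RxGroovy extension module are on the classpath; the path /data.txt is a placeholder.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()

// "/data.txt" is a placeholder path; [:] requests the default open options
vertx.fileSystem().open("/data.txt", [:], { result ->
  if (result.succeeded()) {
    def file = result.result()
    // toObservable() is contributed by the RxGroovy extension module
    def observable = file.toObservable()
    observable.subscribe(
      { buffer -> println("Read data: " + buffer.toString("UTF-8")) },
      { err -> err.printStackTrace() })
  } else {
    result.cause().printStackTrace()
  }
})
```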
The RxJava io.vertx.ext.rx.java.RxHelper should be used to:
- create an io.vertx.ext.rx.java.ObservableHandler,
- transform actions to a handler.
The RxGroovy extension module adds the toHandler method to the rx.Observer class.
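A short sketch of toHandler, assuming the Vert.x 3 Groovy API and RxJava 1.x (rx.observers.Observers): an observer is adapted to a plain Handler and passed to a timer. The one second delay is arbitrary.

```groovy
import io.vertx.core.Handler
import io.vertx.groovy.core.Vertx
import rx.Observer
import rx.observers.Observers

def vertx = Vertx.vertx()

// The closure is used as the observer's onNext callback
Observer<Long> observer = Observers.create({ id -> println("Timer " + id + " fired") })

// toHandler() is contributed by the RxGroovy extension module
Handler<Long> handler = observer.toHandler()
vertx.setTimer(1000, handler)
```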
In Vert.x, future objects are modelled as async result handlers and occur as the last parameter of asynchronous methods.
The RxGroovy extension module adds the toFuture method to the rx.Observer class.
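The sketch below shows how toFuture might be used with an asynchronous method, here HttpServer#listen. It assumes the Vert.x 3 Groovy API; port 8080 and the trivial request handler are illustrative only.

```groovy
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.groovy.core.Vertx
import io.vertx.groovy.core.http.HttpServer
import rx.Observer
import rx.observers.Observers

def vertx = Vertx.vertx()

// First closure receives the result, second one receives the failure
Observer<HttpServer> observer = Observers.create(
  { server -> println("Server started") },
  { err -> println("Could not start server: " + err.message) })

// toFuture() is contributed by the RxGroovy extension module
Handler<AsyncResult<HttpServer>> handler = observer.toFuture()

def server = vertx.createHttpServer()
server.requestHandler({ req -> req.response().end("ok") })
server.listen(8080, handler)
```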
The reactive extension sometimes needs to schedule actions; for instance, Observable#timer creates and returns a timer that emits periodic events. By default, scheduled actions are managed by RxJava, which means that the timer threads are not Vert.x threads and therefore do not execute in a Vert.x event loop.
When an RxJava method deals with a scheduler, it has an overloaded variant accepting an extra rx.Scheduler. The RxGroovy extension module adds the scheduler() method to the Vertx class; it returns a scheduler that can be used in such places.
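As an illustration, the sketch below passes the Vert.x scheduler to a periodic source. It uses Observable.interval from RxJava 1.x rather than a timer overload, which is a substitution made here for the example; the 100 ms period is arbitrary.

```groovy
import io.vertx.groovy.core.Vertx
import java.util.concurrent.TimeUnit
import rx.Observable
import rx.Scheduler

def vertx = Vertx.vertx()

// scheduler() is contributed by the RxGroovy extension module
Scheduler scheduler = vertx.scheduler()

// Emissions are scheduled through the Vert.x scheduler instead of RxJava's default one
Observable<Long> ticks = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
ticks.take(3).subscribe({ n ->
  println("tick " + n + " on " + Thread.currentThread().name)
})
```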
For blocking scheduled actions, a scheduler can be created with the blockingScheduler method.
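A possible use of the blocking scheduler is sketched below. The deferred observable and its Thread.sleep call are hypothetical stand-ins for real blocking work; the Vert.x 3 Groovy API and RxJava 1.x are assumed.

```groovy
import io.vertx.groovy.core.Vertx
import rx.Observable
import rx.Scheduler

def vertx = Vertx.vertx()

Scheduler scheduler = vertx.blockingScheduler()

// Hypothetical blocking source: the sleep stands in for a blocking call
Observable<String> slow = Observable.defer({
  Thread.sleep(200)
  Observable.just("done")
})

// Run the blocking subscription work on the blocking scheduler
slow.subscribeOn(scheduler).subscribe({ value -> println(value) })
```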
RxJava can also be configured to use the Vert.x scheduler by default; the scheduler hook returned for this purpose uses a blocking scheduler for IO actions.
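The registration could look like the sketch below. It assumes that io.vertx.rx.groovy.RxHelper exposes a schedulerHook(vertx) factory (an assumption, since the method name is not given in the text) and relies on the RxJava 1.x plugin registry. RxJava accepts a hook only once, so this has to run before any scheduler is first used.

```groovy
import io.vertx.groovy.core.Vertx
import io.vertx.rx.groovy.RxHelper
import rx.plugins.RxJavaPlugins
import rx.plugins.RxJavaSchedulersHook

def vertx = Vertx.vertx()

// Assumed factory method: returns a hook backed by the given Vertx instance
RxJavaSchedulersHook hook = RxHelper.schedulerHook(vertx)

// Registers the hook globally; throws IllegalStateException if one is already registered
RxJavaPlugins.getInstance().registerSchedulersHook(hook)
```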
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) method creates an rx.Observable.Operator that transforms an Observable<Buffer> in JSON format into an object observable.
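A minimal sketch of the operator applied with lift follows. MyPojo and the path /data.json are hypothetical, and the file is assumed to contain a JSON object with a name field.

```groovy
import io.vertx.groovy.core.Vertx
import io.vertx.rx.groovy.RxHelper

// Hypothetical target type for the unmarshalled JSON
class MyPojo {
  String name
}

def vertx = Vertx.vertx()

vertx.fileSystem().open("/data.json", [:], { result ->
  if (result.succeeded()) {
    def buffers = result.result().toObservable()
    // lift() applies the operator that turns buffers into MyPojo instances
    def pojos = buffers.lift(RxHelper.unmarshaller(MyPojo))
    pojos.subscribe(
      { pojo -> println("Got " + pojo.name) },
      { err -> err.printStackTrace() })
  } else {
    result.cause().printStackTrace()
  }
})
```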
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class, ObjectMapper) method creates an rx.Observable.Operator that transforms an Observable<Buffer> in a format the mapper can parse into an object observable.
It can be used, for instance, to parse YAML, provided the appropriate dependency from jackson-dataformats-text is included.
Let’s now study a few examples of using Vert.x with RxJava.
The event bus MessageConsumer naturally provides an Observable<Message<T>>.
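For instance, a consumer can be observed as sketched below, assuming the Vert.x 3 Groovy API. The address the-address and the ten second timeout are placeholders.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()

def consumer = vertx.eventBus().consumer("the-address")
def observable = consumer.toObservable()

def subscription = observable.subscribe({ msg ->
  println("Got message: " + msg.body())
})

// Stop observing the address after 10 seconds
vertx.setTimer(10000, { timerId -> subscription.unsubscribe() })
```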
The MessageConsumer provides a stream of Message. The MessageConsumer#bodyStream() method gives access to a new stream of message bodies if needed.
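A sketch of observing bodies directly, assuming MessageConsumer#bodyStream() from the Vert.x 3 Groovy API; the address is a placeholder.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()

// bodyStream() yields a read stream of bodies rather than of Message objects
def bodies = vertx.eventBus().consumer("the-address").bodyStream().toObservable()

bodies.subscribe({ body -> println("Got body: " + body) })
```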
RxJava map/reduce composition style can then be used.
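As an illustration of this style, the sketch below sums the numeric samples received each second and republishes the total. The addresses heat-sensor and news-feed are hypothetical, and the Vert.x 3 Groovy API with RxJava 1.x is assumed.

```groovy
import io.vertx.groovy.core.Vertx
import java.util.concurrent.TimeUnit

def vertx = Vertx.vertx()

def samples = vertx.eventBus().consumer("heat-sensor").bodyStream().toObservable()

samples.
  buffer(1, TimeUnit.SECONDS).                        // group samples into one second windows
  filter({ window -> !window.isEmpty() }).            // skip windows without any sample
  map({ window -> window.sum() }).                    // reduce each window to its sum
  subscribe({ heat ->
    vertx.eventBus().send("news-feed", "Current heat is " + heat)
  })
```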
A timer task can be created with Vertx#timerStream(long).
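A minimal sketch, assuming the Vert.x 3 Groovy API; the one second delay is arbitrary.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()

// The stream emits a single timer id once the delay has elapsed
def timer = vertx.timerStream(1000).toObservable()
timer.subscribe({ id -> println("Callback after 1 second") })
```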
A periodic task can be created with Vertx#periodicStream(long).
The observable can be cancelled with an unsubscription.
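The sketch below subscribes to a periodic stream and cancels it from a separate timer. The periods are arbitrary and the Vert.x 3 Groovy API is assumed.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()

def subscription = vertx.periodicStream(1000).toObservable().subscribe({ id ->
  println("Callback every second")
})

// Cancel the periodic task after roughly three ticks
vertx.setTimer(3500, { timerId -> subscription.unsubscribe() })
```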
HttpClientRequest#toObservable() provides a one-shot callback with the HttpClientResponse object. The observable also reports any request failure.
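A sketch of a request observed this way, assuming the Vert.x 3 Groovy API; the host, port and URI are placeholders.

```groovy
import io.vertx.core.http.HttpMethod
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()
def client = vertx.createHttpClient()

def request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().subscribe(
  { response -> println("Status: " + response.statusCode()) },
  { err -> println("Request failed: " + err.message) })

// The request is only sent once end() is called
request.end()
```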
The response can be processed as an Observable<Buffer> with the HttpClientResponse#toObservable() method.
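The nested subscription below is a sketch of this, under the same assumptions (Vert.x 3 Groovy API, placeholder host, port and URI).

```groovy
import io.vertx.core.http.HttpMethod
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()
def client = vertx.createHttpClient()

def request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().subscribe(
  { response ->
    // Each emitted item is a chunk of the response body
    response.toObservable().subscribe(
      { buffer -> println("Chunk: " + buffer.toString("UTF-8")) },
      { err -> println("Response failed: " + err.message) })
  },
  { err -> println("Request failed: " + err.message) })
request.end()
```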
The same flow can be achieved with the flatMap operation.
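With flatMap, the nesting collapses into one chain, as in the following sketch (same assumptions and placeholders as for a plain request).

```groovy
import io.vertx.core.http.HttpMethod
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()
def client = vertx.createHttpClient()

def request = client.request(HttpMethod.GET, 8080, "localhost", "/the_uri")
request.toObservable().
  flatMap({ response -> response.toObservable() }).   // response -> stream of body chunks
  subscribe(
    { buffer -> println("Chunk: " + buffer.toString("UTF-8")) },
    { err -> println("Failed: " + err.message) })
request.end()
```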
We can also unmarshall the Observable<Buffer> into an object using the io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) static method. This method creates an rx.Observable.Operator that unmarshalls buffers to an object.
The HttpServer#requestStream() method provides a callback for each incoming request.
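A minimal server sketch, assuming the Vert.x 3 Groovy API; port 8080 and the response text are placeholders.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()
def server = vertx.createHttpServer()

// Subscribing installs the request handler, so it must happen before listen()
def requests = server.requestStream().toObservable()
requests.subscribe({ request ->
  request.response().end("Hello")
})

server.listen(8080)
```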
The HttpServerRequest can then be adapted to an Observable<Buffer>.
The io.vertx.rx.groovy.RxHelper#unmarshaller(java.lang.Class) method can be used to parse and map a JSON request to an object.
The HttpClient#websocketStream method provides a single callback when the websocket connects, or a failure otherwise.
The WebSocket can then easily be turned into an Observable<Buffer>.
The HttpServer#websocketStream() method provides a callback for each incoming connection.
The ServerWebSocket can easily be turned into an Observable<Buffer>.
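Both steps on the server side can be sketched together as follows, assuming the Vert.x 3 Groovy API; port 8080 is a placeholder.

```groovy
import io.vertx.groovy.core.Vertx

def vertx = Vertx.vertx()
def server = vertx.createHttpServer()

// One item per incoming websocket connection
def sockets = server.websocketStream().toObservable()
sockets.subscribe({ ws ->
  // One item per buffer received on that connection
  ws.toObservable().subscribe(
    { buffer -> println("Received " + buffer.length() + " bytes") },
    { err -> println("Websocket failed: " + err.message) })
})

server.listen(8080)
```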