RxJava is a popular library for composing asynchronous and event based programs using observable sequences for the Java VM.
Vert.x integrates naturally with RxJava, allowing you to use observables wherever you can use streams or asynchronous results.
To use Vert.x API for RxJava2, add the following dependency to the dependencies section of your build descriptor:
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>3.6.2</version>
</dependency>
Gradle (in your build.gradle file):
compile 'io.vertx:vertx-rx-java2:3.6.2'
There are two ways to use the RxJava 2 API with Vert.x:
via the original Vert.x API with helper classes that provide static methods for converting objects between the Vert.x core API and the RxJava 2 API
via the Rxified Vert.x API enhancing the core Vert.x API.
RxJava Flowable is a perfect match for the Vert.x ReadStream class: both provide a flow of items.
The FlowableHelper.toFlowable static methods convert a Vert.x read stream to a Flowable:
FileSystem fileSystem = vertx.fileSystem();
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
The Rxified Vert.x API provides a toFlowable method on ReadStream:
FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = file.toFlowable();
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
Such flowables are hot flowables, i.e. they produce notifications regardless of subscriptions, because a ReadStream may or may not emit items spontaneously, depending on the implementation.
At subscription time, the adapter calls handler to set its own handler.
Some ReadStream implementations start to emit events after this call; others emit events whether or not a handler is set:
AsyncFile produces buffer events after the handler is set
HttpServerRequest produces events independently of the handler (i.e. buffers may be lost if no handler is set)
In both cases, subscribing to the Flowable in the same call is safe because the event loop or the worker verticles cannot be called concurrently, so the subscription always happens before the handler starts emitting data.
When you need to delay the subscription, you need to pause the ReadStream and then resume it, just as you would with a plain ReadStream.
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
// Stop receiving buffers
request.pause();
checkAuth(res -> {
// Now we can receive buffers again
request.resume();
if (res.succeeded()) {
Flowable<Buffer> flowable = request.toFlowable();
flowable.subscribe(buff -> {
// Get buffers
});
}
});
}
});
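The reason for pausing before a delayed subscription can be illustrated with a small model that uses only the JDK. ToyHotStream below is a hypothetical stand-in, not a Vert.x class: it assumes a stream that drops items when no handler is set and holds them back while paused, which is a simplification of how a real ReadStream behaves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy model of a stream that emits whether or not a handler is set (not a Vert.x class). */
final class ToyHotStream {
  private final Queue<String> pending = new ArrayDeque<>();
  private Consumer<String> handler;
  private boolean paused;

  ToyHotStream handler(Consumer<String> h) { handler = h; return this; }

  ToyHotStream pause() { paused = true; return this; }

  ToyHotStream resume() {
    paused = false;
    // Deliver the items that were held back while paused
    while (!paused && !pending.isEmpty()) {
      deliver(pending.poll());
    }
    return this;
  }

  /** Called by the (simulated) network when an item arrives. */
  void emit(String item) {
    if (paused) {
      pending.add(item);
    } else {
      deliver(item);
    }
  }

  private void deliver(String item) {
    if (handler != null) {
      handler.accept(item);
    }
    // Otherwise no handler is set and the item is lost
  }

  static List<String> demo() {
    List<String> received = new ArrayList<>();
    ToyHotStream stream = new ToyHotStream();
    stream.emit("a");              // no handler, not paused: lost
    stream.pause();
    stream.emit("b");              // held back while paused
    stream.handler(received::add); // the delayed "subscription"
    stream.resume();               // "b" is delivered now
    stream.emit("c");              // delivered immediately
    return received;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [b, c]
  }
}
```

In this model only the item emitted before the pause is lost, which mirrors why the request is paused before the asynchronous check starts.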
Likewise, it is possible to turn an existing Flowable into a Vert.x ReadStream.
The FlowableHelper.toReadStream static methods convert a Flowable to a Vert.x read stream:
Flowable<Buffer> observable = getFlowable();
ReadStream<Buffer> readStream = FlowableHelper.toReadStream(observable);
Pump pump = Pump.pump(readStream, response);
pump.start();
A WriteStream, like an org.reactivestreams.Subscriber, consumes items and, when it cannot keep up, collaborates with the producer to avoid an ever-growing backlog.
Vert.x provides the WriteStreamSubscriber adapter that you can use to send Flowable items to any WriteStream:
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.reactivex.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
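The collaboration between a slow consumer and its producer can be sketched with the JDK's java.util.concurrent.Flow interfaces (Java 9 or later). This is only an illustration of the idea, not the WriteStreamSubscriber implementation: ToySink, SinkSubscriber and publish are hypothetical names, and the sink's writeQueueFull and drainHandler methods are merely modelled on the WriteStream contract.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

final class BackpressureSketch {
  /** Hypothetical bounded sink, loosely modelled on a write queue with a drain handler. */
  static final class ToySink {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int maxSize;
    private Runnable drainHandler;
    int maxObserved; // largest backlog seen so far

    ToySink(int maxSize) { this.maxSize = maxSize; }
    void write(String item) {
      queue.add(item);
      maxObserved = Math.max(maxObserved, queue.size());
    }
    boolean writeQueueFull() { return queue.size() >= maxSize; }
    void drainHandler(Runnable handler) { drainHandler = handler; }
    /** Simulates the slow consumer emptying the queue, then notifies the producer side. */
    void flush() {
      queue.clear();
      Runnable handler = drainHandler;
      drainHandler = null;
      if (handler != null) handler.run();
    }
  }

  /** Subscriber that requests the next item only while the sink has room. */
  static final class SinkSubscriber implements Flow.Subscriber<String> {
    private final ToySink sink;
    private Flow.Subscription subscription;
    boolean completed;

    SinkSubscriber(ToySink sink) { this.sink = sink; }
    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
    @Override public void onNext(String item) {
      sink.write(item);
      if (sink.writeQueueFull()) {
        sink.drainHandler(() -> subscription.request(1)); // ask again once drained
      } else {
        subscription.request(1);
      }
    }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { completed = true; }
  }

  /** Minimal synchronous publisher: emits only as many items as were requested. */
  static void publish(List<String> items, Flow.Subscriber<String> subscriber) {
    Iterator<String> it = items.iterator();
    subscriber.onSubscribe(new Flow.Subscription() {
      private long demand;
      private boolean emitting, done;
      @Override public void request(long n) {
        demand += n;
        if (emitting) return; // re-entrant request: the loop below picks it up
        emitting = true;
        while (demand > 0 && !done) {
          if (it.hasNext()) {
            demand--;
            subscriber.onNext(it.next());
          } else {
            done = true;
            subscriber.onComplete();
          }
        }
        emitting = false;
      }
      @Override public void cancel() { done = true; }
    });
  }

  static String demo() {
    ToySink sink = new ToySink(2);
    SinkSubscriber subscriber = new SinkSubscriber(sink);
    publish(Arrays.asList("a", "b", "c", "d", "e"), subscriber);
    int flushes = 0;
    while (!subscriber.completed) { // the producer stays idle until the sink drains
      sink.flush();
      flushes++;
    }
    return "maxBacklog=" + sink.maxObserved + " flushes=" + flushes;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints maxBacklog=2 flushes=2
  }
}
```

The backlog never exceeds the sink's capacity because new items are requested only after the drain callback fires.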
Tip: There is also an io.vertx.reactivex.WriteStreamObserver adapter for the non-backpressured io.reactivex.Observable.
The difference is that this adapter will send items to the WriteStream even when it cannot keep up with the producer rate.
If you are programming with the Rxified Vert.x API, the WriteStream implementations provide a toSubscriber method.
The previous example then becomes even more straightforward:
response.setChunked(true);
flowable.subscribe(response.toSubscriber());
Note: When the Flowable terminates successfully, the adapter invokes the end method.
Caution: The adapter sets the WriteStream drain and exception handlers, so do not use them after subscribing.
The WriteStreamSubscriber adapter is able to invoke callbacks when:
the Flowable completes, or
the Flowable terminates with an error, or
the WriteStream fails (e.g. the HTTP connection is closed or the filesystem is full)
This allows for a more robust program design, as well as scheduling other tasks after the stream has been handled:
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && !response.closed()) {
response.setStatusCode(500).end("oops");
} else {
// log error
}
});
subscriber.onWriteStreamError(throwable -> {
// log error
});
subscriber.onComplete(() -> {
// log end of transaction to audit system...
});
flowable.subscribe(subscriber);
Note: If the WriteStream fails, the adapter cancels the org.reactivestreams.Subscription.
You can create an RxJava Observer from an existing Vert.x Handler<AsyncResult<T>> and subscribe it:
Handler<AsyncResult<String>> handler = getHandler();
// Subscribe to a Single
Single.just("hello").subscribe(SingleHelper.toObserver(handler));
Handler<AsyncResult<String>> handler = getHandler();
// Subscribe to a Maybe
Maybe.just("hello").subscribe(MaybeHelper.toObserver(handler));
Handler<AsyncResult<Void>> handler = getHandler();
// Subscribe to a Completable
Completable.complete().subscribe(CompletableHelper.toObserver(handler));
The Rxified Vert.x API duplicates each callback-based asynchronous method with an rx-prefixed variant that returns an RxJava Single, Maybe or Completable:
Single<HttpServer> single = vertx
.createHttpServer()
.rxListen(1234, "localhost");
// Subscribe to bind the server
single.
subscribe(
server -> {
// Server is listening
},
failure -> {
// Server could not start
}
);
Such singles are cold singles: the corresponding API method is called on subscribe.
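What "cold" means can be shown without any RxJava type. The sketch below uses a JDK Supplier as a stand-in for a cold source and a hypothetical listen method as a stand-in for the asynchronous API call; it only illustrates that creating the source does nothing and that each "subscription" invokes the operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ColdSourceSketch {
  static final AtomicInteger calls = new AtomicInteger();

  /** Hypothetical asynchronous operation; counts how often it is actually invoked. */
  static CompletableFuture<String> listen() {
    calls.incrementAndGet();
    return CompletableFuture.completedFuture("listening");
  }

  static String demo() {
    calls.set(0);
    // Creating the cold source does not invoke the operation
    Supplier<CompletableFuture<String>> cold = ColdSourceSketch::listen;
    int before = calls.get();
    cold.get(); // the first "subscription" invokes it
    cold.get(); // a second "subscription" invokes it again
    return "before=" + before + " after=" + calls.get();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints before=0 after=2
  }
}
```

A practical consequence is that subscribing twice to the same cold source triggers the underlying operation twice.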
Maybe can produce a result or no result:
DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);
// Obtain a maybe that performs the actual reverse lookup on subscribe
Maybe<String> maybe = client.rxReverseLookup(ipAddress);
// Subscribe to perform the lookup
maybe.
subscribe(
name -> {
// Lookup produced a result
},
failure -> {
// Lookup failed
},
() -> {
// Lookup produced no result
}
);
Completable is usually mapped to Handler<AsyncResult<Void>>:
Completable completable = server.rxClose();
// Subscribe to close the server
completable.
subscribe(
() -> {
// Server is closed
},
failure -> {
// Server closed but encountered an issue
}
);
Tip: If you cannot use the Rxified Vert.x API, or if you have your own callback-based asynchronous methods, Vert.x provides adapters:
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
vertx.executeBlocking(fut -> fut.complete(invokeBlocking()), handler);
});
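The idea behind such an adapter can be sketched with JDK types only. In the example below, toFuture, lookup and describe are hypothetical names, and a CompletableFuture of Optional stands in for a Maybe: a non-null value is a result, a null value is "no result", and a throwable is a failure. It is an illustration of the pattern, not the MaybeHelper implementation.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class CallbackAdapterSketch {

  /** Adapts a callback-style operation into a future with a Maybe-like outcome. */
  static <T> CompletableFuture<Optional<T>> toFuture(Consumer<BiConsumer<T, Throwable>> operation) {
    CompletableFuture<Optional<T>> future = new CompletableFuture<>();
    operation.accept((value, failure) -> {
      if (failure != null) {
        future.completeExceptionally(failure);       // failure
      } else {
        future.complete(Optional.ofNullable(value)); // result, or empty when null
      }
    });
    return future;
  }

  /** Hypothetical callback-based lookup, used only for the demonstration. */
  static void lookup(String ip, BiConsumer<String, Throwable> callback) {
    if (ip.isEmpty()) {
      callback.accept(null, new IllegalArgumentException("empty address"));
    } else if (ip.equals("127.0.0.1")) {
      callback.accept("localhost", null);
    } else {
      callback.accept(null, null);
    }
  }

  static String describe(String ip) {
    return CallbackAdapterSketch.<String>toFuture(cb -> lookup(ip, cb))
        .thenApply(result -> result.map(name -> "result " + name).orElse("no result"))
        .exceptionally(err -> "failed")
        .join();
  }

  public static void main(String[] args) {
    System.out.println(describe("127.0.0.1")); // prints result localhost
    System.out.println(describe("10.0.0.1"));  // prints no result
    System.out.println(describe(""));          // prints failed
  }
}
```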
The reactive extension sometimes needs to schedule actions; for instance, Flowable#interval creates and returns a timer that emits periodic events. By default, scheduled actions are managed by RxJava, which means that the timer threads are not Vert.x threads and therefore execute neither in a Vert.x event loop nor on a Vert.x worker thread.
When an RxJava method deals with a scheduler, it has an overload accepting an extra io.reactivex.Scheduler; the RxHelper.scheduler method returns a scheduler that can be used in such places.
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
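The effect of passing an explicit scheduler can be pictured with plain JDK executors. The sketch below is an analogy, not Vert.x code: a single named thread (the hypothetical "toy-event-loop") plays the role of an event loop, and the timer callback is observed to run on that thread because the timer was scheduled on it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SchedulerSketch {

  /** Schedules a timer on a dedicated single thread and reports where the callback ran. */
  static String timerThreadName() {
    // Hypothetical stand-in for an event loop: one named thread
    ScheduledExecutorService loop =
        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "toy-event-loop"));
    try {
      CompletableFuture<String> ranOn = new CompletableFuture<>();
      loop.schedule(() -> { ranOn.complete(Thread.currentThread().getName()); },
          10, TimeUnit.MILLISECONDS);
      return ranOn.join(); // waits for the timer to fire
    } finally {
      loop.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(timerThreadName()); // prints toy-event-loop
  }
}
```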
For blocking scheduled actions, a scheduler can be created with the RxHelper.blockingScheduler method:
Scheduler scheduler = RxHelper.blockingScheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJava can also be reconfigured to use the Vert.x scheduler:
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
Caution: RxJava uses the word computation for non-blocking tasks and io for blocking tasks, which is the opposite of the Vert.x terminology.
The Rxified Vert.x API also provides similar methods on the RxHelper class:
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
It is also possible to create a scheduler backed by a named worker pool. This can be useful if you want to reuse a specific thread pool for scheduling blocking actions:
Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
The FlowableHelper.unmarshaller creates an io.reactivex.FlowableTransformer that transforms a Flowable<Buffer> in JSON format into an object flowable:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
The same can be done with the Rxified helper:
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.compose(ObservableHelper.unmarshaller(MyPojo.class)).subscribe(
mypojo -> {
// Process the object
}
);
});
To deploy existing Verticle instances, you can use RxHelper.deployVerticle. It deploys a Verticle and returns a Single<String> of the deployment ID.
Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);
deployment.subscribe(id -> {
// Deployed
}, err -> {
// Could not deploy
});
The Rxified API uses the io.vertx.reactivex prefix; for instance, the io.vertx.core.Vertx class is translated to the io.vertx.reactivex.core.Vertx class.
To embed Rxified Vert.x, just use the Vertx.vertx methods:
Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();
To write a verticle, extend the Rxified AbstractVerticle class; it wraps the core Vertx instance for you:
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public void start() {
// Use Rxified Vertx here
}
}
Deploying an RxJava verticle is still performed by the Java deployer and does not need a specified deployer.
Verticles with an asynchronous start can instead override the rxStart method and return a Completable:
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public Completable rxStart() {
return vertx.createHttpServer()
.requestHandler(req -> req.response().end("Hello World"))
.rxListen()
.toCompletable();
}
}
Let’s now study a few examples of using Vert.x with RxJava.
The event bus MessageConsumer naturally provides an Observable<Message<T>>:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Disposable sub = observable.subscribe(msg -> {
// Got message
});
// Unregisters the stream after 10 seconds
vertx.setTimer(10000, id -> {
sub.dispose();
});
The MessageConsumer provides a stream of Message.
The bodyStream method gives access to a new stream of message bodies if needed:
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();
RxJava map/reduce composition style can then be used:
Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toObservable();
observable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
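The map step of this pipeline can be tried in isolation with plain Java. The sketch below applies the same collector to one window of samples; the class and method names are hypothetical. It also shows a detail worth keeping in mind: Collectors.averagingDouble returns 0.0 for an empty list, so if a one-second window happens to contain no samples, the pipeline would publish a heat of 0.0.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

final class WindowAverageSketch {

  /** Averages one window of samples, as the map step does for each buffered list. */
  static double average(List<Double> samples) {
    return samples.stream().collect(Collectors.averagingDouble(d -> d));
  }

  public static void main(String[] args) {
    System.out.println(average(Arrays.asList(20.0, 22.0, 24.0))); // prints 22.0
    System.out.println(average(Collections.emptyList()));          // prints 0.0
  }
}
```

One way to avoid publishing the placeholder value would be to filter out empty windows before the map step.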
A timer task can be created with timerStream:
vertx.timerStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback after 1 second");
}
);
A periodic task can be created with periodicStream:
vertx.periodicStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback every second");
}
);
The observable can be cancelled by disposing of the subscription:
vertx.periodicStream(1000).
toObservable().
subscribe(new Observer<Long>() {
private Disposable sub;
public void onSubscribe(@NonNull Disposable d) {
sub = d;
}
public void onNext(Long aLong) {
// Callback
sub.dispose();
}
public void onError(Throwable e) {}
public void onComplete() {}
});
We recommend using the Vert.x Web Client with RxJava.
The requestStream provides a callback for each incoming request:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
// Process request
});
The HttpServerRequest can then be adapted to an Observable<Buffer>:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<Buffer> observable = request.toObservable();
});
The ObservableHelper.unmarshaller can be used to parse and map a JSON request to an object:
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<MyPojo> observable = request.
toObservable().
compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(MyPojo.class));
});
The websocketStream provides a single callback when the WebSocket connects, otherwise a failure:
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.websocketStream(8080, "localhost", "/the_uri").toObservable().subscribe(
ws -> {
// Use the websocket
},
error -> {
// Could not connect
}
);
The WebSocket can then easily be turned into a Flowable<Buffer>:
socketObservable.subscribe(
socket -> {
Flowable<Buffer> dataObs = socket.toFlowable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
On an HTTP server, the websocketStream provides a callback for each incoming connection:
Observable<ServerWebSocket> socketObservable = server.websocketStream().toObservable();
socketObservable.subscribe(
socket -> System.out.println("Web socket connect"),
failure -> System.out.println("Should never be called"),
() -> {
System.out.println("Subscription ended or server closed");
}
);
The ServerWebSocket can easily be turned into an Observable<Buffer>:
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);