Vertx-sync is a set of utilities that allow you to perform asynchronous operations and receive events in a synchronous way, but without blocking kernel threads.
One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads) - this allows it to handle a lot of concurrency (e.g. handle many connections, or messages) using a very small number of kernel threads, which allows it to scale very well.
The non blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms including callback style, promises or Rx-style. Vert.x uses callback style in most places (although it also supports Rx).
In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do in sequence. Also error propagation is often more complex when using asynchronous APIs.
Vertx-sync allows you to work with asynchronous APIs, but using a direct synchronous style that you’re already familiar with.
It does this by using fibers
. Fibers are very lightweight threads that do not correspond to underlying kernel threads.
When they are blocked they do not block a kernel thread.
Vert-sync uses Quasar to implement the fibers.
Note
|
Vert-sync currently only works with Java. |
In order to use vertx-sync you must deploy your code as instances of io.vertx.ext.sync.SyncVerticle
.
You should override the start()
and (optionally) the stop()
methods of the verticle.
Those methodsmust* be annotated with the @Suspendable
annotation.
Once you’ve written your sync verticle(s) you deploy them in exactly the same way as any other verticle.
Vert.x uses Quasar which implements fibers by using bytecode instrumentation. This is done at run-time using a java agent.
In order for this to work you must start the JVM specifying the java agent jar which is located in the quasar-core jar.
TODO how to reference quasar core jar in fatjar?
-javaagent:/path/to/quasar/core/quasar-core.jar
If you are using the vertx
command line tools, the agent configuration can be enabled by setting the ENABLE_VERTX_SYNC_AGENT
environment variable to true
, before executing vertx
.
You can also use a offline instrumentation as with the quasar-maven-plugin or or quasar-gradle-plugin. Check the Quasar documentation for more details.
Many async operations in Vert.x-land take a Handler<AsyncResult<T>>
as the last argument. An example would
executing a find using the Vert.x Mongo client or sending an event bus message and getting a reply.
Vertx-sync allows you to get the result of a one-shot asynchronous operation in a synchronous way.
This is done by using the Sync.awaitResult
method.
The method is executed specifying the asynchronous operation that you want to execute in the form of a Consumer
,
the consumer is passed the handler at run-time.
Here’s an example:
EventBus eb = vertx.eventBus();
// Send a message and get the reply synchronously
Message<String> reply = awaitResult(h -> eb.send("someaddress", "ping", h));
System.out.println("Received reply " + reply.body());
In the above example the fiber is blocked until the reply is returned but no kernel thread is blocked.
Vertx-sync can be used to get one-shot events in a synchronous way, for example firings of timers, or the executing of
an end handler. This is achieved using the Sync.awaitEvent
method.
Here’s an example:
long tid = awaitEvent(h -> vertx.setTimer(1000, h));
System.out.println("Timer has now fired");
In many places in Vert.x streams of events are provided by passing them to handlers.
Examples include event bus message consumers and HTTP server requests on an HTTP server.
Vert-sync allows you to receive events from such streams in a synchronous way.
You do this with an instance of HandlerReceiverAdaptor
which implements both
Handler
and Receiver
. You create an instance using
Sync.streamAdaptor
.
You can set it as a normal handler and then use the methods on Receiver
to receive
events synchronously.
Here’s an example using an event bus message consumer:
EventBus eb = vertx.eventBus();
HandlerReceiverAdaptor<Message<String>> adaptor = streamAdaptor();
eb.<String>consumer("some-address").handler(adaptor);
// Receive 10 messages from the consumer:
for (int i = 0; i < 10; i++) {
Message<String> received1 = adaptor.receive();
System.out.println("got message: " + received1.body());
}
FiberHandler
If you want to do use fibers in a normal handler, e.g. in the request handler of an Http Server then you must first convert the normal handler to a fiber handler.
The fiber handler runs the normal handler on a fiber.
Here’s an example:
EventBus eb = vertx.eventBus();
vertx.createHttpServer().requestHandler(fiberHandler(req -> {
// Send a message to address and wait for a reply
Message<String> reply = awaitResult(h -> eb.send("some-address", "blah", h));
System.out.println("Got reply: " + reply.body());
// Now end the response
req.response().end("blah");
})).listen(8080, "localhost");
There are a set of working examples demonstrating vertx-sync in action in the examples repository
Quasar and co-routines do not "automagically" transform blocking code into non-blocking code.
Especially, blocking using Thread.sleep
or using synchronized
blocks and methods is a problem.
There are 2 types of exceptions that you may observe when using vertx-sync
.
You may encounter stack traces like the following in your logs:
(...) [quasar] ERROR: while transforming io/vertx/core/impl/DeploymentManager$DeploymentImpl: Unable to instrument vertx/core/impl/DeploymentManager$DeploymentImpl#lambda$rollback$1(Ljava/lang/Throwable;Lio/vertx/core/impl/ContextInternal;Lio/vertx/core/Handler;/vertx/core/impl/ContextImpl;Lio/vertx/core/AsyncResult;)V because of synchronization co.paralleluniverse.fibers.instrument.UnableToInstrumentException: Unable to instrument vertx/core/impl/DeploymentManager$DeploymentImpl#lambda$rollback$1(Ljava/lang/Throwable;Lio/vertx/core/impl/ContextInternal;Lio/vertx/core/Handler;/vertx/core/impl/ContextImpl;Lio/vertx/core/AsyncResult;)V because of synchronization at co.paralleluniverse.fibers.instrument.InstrumentMethod.dumpCodeBlock(InstrumentMethod.java:720) at co.paralleluniverse.fibers.instrument.InstrumentMethod.accept(InstrumentMethod.java:415) at co.paralleluniverse.fibers.instrument.InstrumentClass.visitEnd(InstrumentClass.java:265) (...)
These errors are actually warnings from Quasar as it tries to instrument both your code and libraries (including Vert.x modules!).
Quasar may encounter blocking constructs such as thread blocking and synchronized
blocks or methods.
There is sometimes little you can do, but this does not mean that your application will not be functional.
There are just some parts reported by Quasar where coroutines may block without being able to yield execution to another coroutine.
You may encounter exceptions that prevent your application to function, such as:
(...) io.vertx.core.VertxException: java.lang.IllegalThreadStateException: Method called not from within a fiber at co.paralleluniverse.fibers.FiberAsync.requestSync(FiberAsync.java:289) at co.paralleluniverse.fibers.FiberAsync.runSync(FiberAsync.java:255) at co.paralleluniverse.fibers.FiberAsync.run(FiberAsync.java:111) (...)
This happens when you call fiber code (e.g., a method annotated with @Suspendable
) from outside a fiber, such as from an event-loop thread.
In most of the cases the solution lies in wrapping the call to the first fiber code using one of the helper methods from Sync
: awaitResult
, awaitEvent
, fiberHandler
and streamAdaptor
.
Suppose that we have a fiber method like the following:
@Suspendable
public String readData() {
boolean exists = Sync.awaitResult(h -> vertx.fileSystem().exists("file.txt", h));
if (exists) {
Buffer buf = Sync.awaitResult(h -> vertx.fileSystem().readFile("file.txt", h));
return buf.toString();
}
return "";
}
Now suppose that we want to call this method in response to an event-bus method.
To ensure that the event-bus message processing is from a fiber and we can call the readData
method, then we need adapting with fiberHandler
:
vertx.eventBus().consumer("read", Sync.fiberHandler(m -> m.reply(readData())));
Conversely, if you do not use fiberHandler
then you will get an exception as above:
// This crashes!
vertx.eventBus().consumer("read", m -> m.reply(readData()));
Tip
|
If you need more flexibility you can always use Sync.getContextScheduler to access the verticle context scheduler and start Quasar fibers / strands.
|