Skip to main content
编辑本页

Vertx-sync is a set of utilities that allow you to perform asynchronous operations and receive events in a synchronous way, but without blocking kernel threads.

Introduction

One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads) - this allows it to handle a lot of concurrency (e.g. handle many connections, or messages) using a very small number of kernel threads, which allows it to scale very well.

The non blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms including callback style, promises or Rx-style. Vert.x uses callback style in most places (although it also supports Rx).

In some cases, programming using asynchronous APIs can be more challenging than using a direct synchronous style, in particular if you have several operations that you want to do in sequence. Also error propagation is often more complex when using asynchronous APIs.

Vertx-sync allows you to work with asynchronous APIs, but using a direct synchronous style that you’re already familiar with.

It does this by using fibers. Fibers are very lightweight threads that do not correspond to underlying kernel threads. When they are blocked they do not block a kernel thread.

Vert-sync uses Quasar to implement the fibers.

Note
Vert-sync currently only works with Java.

SyncVerticle

In order to use vertx-sync you must deploy your code as instances of io.vertx.ext.sync.SyncVerticle. You should override the start() and (optionally) the stop() methods of the verticle.

Those methodsmust* be annotated with the @Suspendable annotation.

Once you’ve written your sync verticle(s) you deploy them in exactly the same way as any other verticle.

Instrumentation

Vert.x uses Quasar which implements fibers by using bytecode instrumentation. This is done at run-time using a java agent.

In order for this to work you must start the JVM specifying the java agent jar which is located in the quasar-core jar.

TODO how to reference quasar core jar in fatjar?

-javaagent:/path/to/quasar/core/quasar-core.jar

If you are using the vertx command line tools, the agent configuration can be enabled by setting the ENABLE_VERTX_SYNC_AGENT environment variable to true, before executing vertx.

You can also use a offline instrumentation as with the quasar-maven-plugin or or quasar-gradle-plugin. Check the Quasar documentation for more details.

Getting one-shot async results

Many async operations in Vert.x-land take a Handler<AsyncResult<T>> as the last argument. An example would executing a find using the Vert.x Mongo client or sending an event bus message and getting a reply.

Vertx-sync allows you to get the result of a one-shot asynchronous operation in a synchronous way.

This is done by using the Sync.awaitResult method.

The method is executed specifying the asynchronous operation that you want to execute in the form of a Consumer, the consumer is passed the handler at run-time.

Here’s an example:

EventBus eb = vertx.eventBus();

// Send a message and get the reply synchronously

Message<String> reply = awaitResult(h -> eb.send("someaddress", "ping", h));

System.out.println("Received reply " + reply.body());

In the above example the fiber is blocked until the reply is returned but no kernel thread is blocked.

Getting one-shot events

Vertx-sync can be used to get one-shot events in a synchronous way, for example firings of timers, or the executing of an end handler. This is achieved using the Sync.awaitEvent method.

Here’s an example:

long tid = awaitEvent(h -> vertx.setTimer(1000, h));

System.out.println("Timer has now fired");

Streams of events

In many places in Vert.x streams of events are provided by passing them to handlers.

Examples include event bus message consumers and HTTP server requests on an HTTP server.

Vert-sync allows you to receive events from such streams in a synchronous way.

You do this with an instance of HandlerReceiverAdaptor which implements both Handler and Receiver. You create an instance using Sync.streamAdaptor.

You can set it as a normal handler and then use the methods on Receiver to receive events synchronously.

Here’s an example using an event bus message consumer:

EventBus eb = vertx.eventBus();

HandlerReceiverAdaptor<Message<String>> adaptor = streamAdaptor();

eb.<String>consumer("some-address").handler(adaptor);

// Receive 10 messages from the consumer:
for (int i = 0; i < 10; i++) {

  Message<String> received1 = adaptor.receive();

  System.out.println("got message: " + received1.body());

}

Using a FiberHandler

If you want to do use fibers in a normal handler, e.g. in the request handler of an Http Server then you must first convert the normal handler to a fiber handler.

The fiber handler runs the normal handler on a fiber.

Here’s an example:

EventBus eb = vertx.eventBus();

vertx.createHttpServer().requestHandler(fiberHandler(req -> {

  // Send a message to address and wait for a reply
  Message<String> reply = awaitResult(h -> eb.send("some-address", "blah", h));

  System.out.println("Got reply: " + reply.body());

  // Now end the response
  req.response().end("blah");

})).listen(8080, "localhost");

Further examples

There are a set of working examples demonstrating vertx-sync in action in the examples repository

What if you get exceptions?

Quasar and co-routines do not "automagically" transform blocking code into non-blocking code. Especially, blocking using Thread.sleep or using synchronized blocks and methods is a problem.

There are 2 types of exceptions that you may observe when using vertx-sync.

Instrumentation warnings

You may encounter stack traces like the following in your logs:

(...)
[quasar] ERROR: while transforming io/vertx/core/impl/DeploymentManager$DeploymentImpl: Unable to instrument vertx/core/impl/DeploymentManager$DeploymentImpl#lambda$rollback$1(Ljava/lang/Throwable;Lio/vertx/core/impl/ContextInternal;Lio/vertx/core/Handler;/vertx/core/impl/ContextImpl;Lio/vertx/core/AsyncResult;)V because of synchronization
co.paralleluniverse.fibers.instrument.UnableToInstrumentException: Unable to instrument vertx/core/impl/DeploymentManager$DeploymentImpl#lambda$rollback$1(Ljava/lang/Throwable;Lio/vertx/core/impl/ContextInternal;Lio/vertx/core/Handler;/vertx/core/impl/ContextImpl;Lio/vertx/core/AsyncResult;)V because of synchronization
       at co.paralleluniverse.fibers.instrument.InstrumentMethod.dumpCodeBlock(InstrumentMethod.java:720)
       at co.paralleluniverse.fibers.instrument.InstrumentMethod.accept(InstrumentMethod.java:415)
       at co.paralleluniverse.fibers.instrument.InstrumentClass.visitEnd(InstrumentClass.java:265)
(...)

These errors are actually warnings from Quasar as it tries to instrument both your code and libraries (including Vert.x modules!).

Quasar may encounter blocking constructs such as thread blocking and synchronized blocks or methods. There is sometimes little you can do, but this does not mean that your application will not be functional.

There are just some parts reported by Quasar where coroutines may block without being able to yield execution to another coroutine.

Calling fiber code from outside a fiber

You may encounter exceptions that prevent your application to function, such as:

(...)
io.vertx.core.VertxException: java.lang.IllegalThreadStateException: Method called not from within a fiber
       at co.paralleluniverse.fibers.FiberAsync.requestSync(FiberAsync.java:289)
       at co.paralleluniverse.fibers.FiberAsync.runSync(FiberAsync.java:255)
       at co.paralleluniverse.fibers.FiberAsync.run(FiberAsync.java:111)
(...)

This happens when you call fiber code (e.g., a method annotated with @Suspendable) from outside a fiber, such as from an event-loop thread.

In most of the cases the solution lies in wrapping the call to the first fiber code using one of the helper methods from Sync: awaitResult, awaitEvent, fiberHandler and streamAdaptor.

Suppose that we have a fiber method like the following:

@Suspendable
public String readData() {
 boolean exists = Sync.awaitResult(h -> vertx.fileSystem().exists("file.txt", h));
 if (exists) {
   Buffer buf = Sync.awaitResult(h -> vertx.fileSystem().readFile("file.txt", h));
		 return buf.toString();
 }
 return "";
}

Now suppose that we want to call this method in response to an event-bus method. To ensure that the event-bus message processing is from a fiber and we can call the readData method, then we need adapting with fiberHandler:

vertx.eventBus().consumer("read", Sync.fiberHandler(m -> m.reply(readData())));

Conversely, if you do not use fiberHandler then you will get an exception as above:

// This crashes!
vertx.eventBus().consumer("read", m -> m.reply(readData()));
Tip
If you need more flexibility you can always use Sync.getContextScheduler to access the verticle context scheduler and start Quasar fibers / strands.