The vertx-lang-kotlin-coroutines
integrates Kotlin coroutines for performing asynchronous operations and processing events.
This results in using a programming model that looks like sequential code, yet it does not block kernel threads.
One of the key advantages of Vert.x over many legacy application platforms is that it is almost entirely non-blocking (of kernel threads). This allows Vert.x-based applications to handle a lot of concurrency (e.g., many connections and messages) using a very small number of kernel threads, which in turns unlocks great scalability.
The non-blocking nature of Vert.x leads to asynchronous APIs. Asynchronous APIs can take various forms including callbacks, promises, fibers or reactive extensions. Vert.x uses the callback style for the core APIs but it also supports other models like RxJava 1 and 2.
In some cases, programming using asynchronous APIs can be more challenging than using a classic / sequential style of code, in particular with several operations need to be done in sequence. Also, error propagation is often more complex when using asynchronous APIs.
vertx-lang-kotlin-coroutines
uses coroutines. Coroutines are very lightweight threads that do not correspond
to underlying kernel threads, so that when a coroutine needs to "block" it gets suspended and frees
its current kernel thread so that another coroutine can process events.
vertx-lang-kotlin-coroutines
uses kotlinx.coroutines to implement the Coroutines.
Note
|
vertx-lang-kotlin-coroutines currently only works with Kotlin and will be out of the experimental status
in Kotlin 1.3
|
Having imported io.vertx.kotlin.coroutines.VertxCoroutine
, the GlobalScope.launch
method allows to run a block of code as a
coroutine in the "Global" application scope (bounded on the lifetime of the application):
Unresolved directive in index.adoc - include::Example.kt[tags=launchCoroutine]
The vertx.dispatcher()
returns a coroutine dispatcher that execute coroutines using the Vert.x event loop.
The awaitEvent
function suspends the execution of the coroutine until the timer fires and resumes the coroutines
with the value that was given to the handler.
More details are given in the next sections on handlers, events and stream of events.
You can deploy your code as instances of io.vertx.kotlin.coroutines.CoroutineVerticle
, a specialized type of verticle
for Kotlin coroutines. The CoroutineVerticle
class implements the kotlinx.coroutines.experimental.CoroutineScope
interface,
making all coroutines builder methods bounded by default to the verticle context. You should override the suspending start()
and (optionally) the suspending stop()
methods of the verticle:
Unresolved directive in index.adoc - include::Example.kt[tags=CoroutineVerticle]
All code examples below assume to be run inside a CoroutineVerticle
implementation, but you can replace all <builder> { .. }
calls with GlobalScope.<builder> { .. }
to use the application scope instead.
Many asynchronous operations in Vert.x take a Handler<AsyncResult<T>>
as the last argument.
An example would be executing an object retrieval using the Vert.x Mongo client, or sending an event bus message then
awaiting for a reply.
This is achieved by using the awaitResult
method which returns the value or throws an exception.
The coroutine is being suspended until the event is being processed, and no kernel thread is being blocked.
The method is executed by specifying the asynchronous operation that needs to be executed in the form of a block that is passed to the handler at run-time.
Here is an example:
Unresolved directive in index.adoc - include::Example.kt[tags=awaitResult]
When the block produces a failure, the caller can handle it as an exception using the usual exception
try
/catch
constructs:
Unresolved directive in index.adoc - include::Example.kt[tags=awaitResultFailure]
Processing a one-shot event (and not the next occurrences, if any) is achieved using the awaitEvent
function:
Unresolved directive in index.adoc - include::Example.kt[tags=awaitEvent]
Processing a blocking computation is achieved using the awaitBlocking
function:
Unresolved directive in index.adoc - include::Example.kt[tags=awaitBlocking]
In many places in Vert.x APIs, streams of events are processed through handlers. Examples include event bus message consumers and HTTP server requests.
The ReceiveChannelHandler
class allows receiving events through the (suspendable) receive
method:
Unresolved directive in index.adoc - include::Example.kt[tags=streamExample]
The await
extension method on instances of Vert.x asynchronous results suspend coroutines until they have completed, in
which case the method returns the corresponding AsyncResult<T>
object.
Unresolved directive in index.adoc - include::Example.kt[tags=awaitingFuture]
Since 3.6.0
, Vert.x generates suspending extension methods for all its asynchronous methods.
It relieves the user from using the idiomatic awaitResult
and makes the code more natural and readable.
Unresolved directive in index.adoc - include::Example.kt[tags=generatedSuspendingExtensionMethod]
Each asynchronous method has a suspending counterpart, e.g xyz(1, 2, handler)
⇒ xyzAwait(1, 2)
.
Such extensions are provided by the io.vertx:vertx-lang-kotlin
dependency and covers the Vert.x stack:
vertx-web
vertx-jdbc-client
vertx-cassandra-client
etc…
Channels are similar to Java BlockingQueue
except that they do not block and instead suspend the coroutine
instead of blocking:
sending a value to full channel suspends the coroutine
receving a value from an empty channels also suspends the coroutine
Vert.x ReadStream
and WriteStream
can be adapted to channels with the toChannel
extension method.
These adapters take care of managing the back-pressure and the stream termination
ReadStream<T>
is adapted to a ReceiveChannel<T>
WriteStream<T>
is adapted to a SendChannel<T>
Channel can be really useful when you need to handle a stream of correlated values:
Unresolved directive in index.adoc - include::Example.kt[tags=channel0]
It can also be useful for parsing protocols. We will build a non blocking HTTP request parser to show the power of channels.
We will rely on the RecordParser
to slice the stream of buffer to a stream of buffer delimited by \r\n
.
Here is the initial version of the parser, that handles only the HTTP request-line
Unresolved directive in index.adoc - include::Example.kt[tags=channel1]
Parsing the request-line is as simple as calling receive
on the channel.
The next step parses HTTP headers by receiving chunks until we get an empty one
Unresolved directive in index.adoc - include::Example.kt[tags=channel2]
Finally we terminate the parser by handling optional request bodies
Unresolved directive in index.adoc - include::Example.kt[tags=channel3]
Using a channel to send data is quite straightforward:
Unresolved directive in index.adoc - include::Example.kt[tags=sendChannel]
Both SendChannel#send
and WriteStream#write
are non blocking operations, however unlike SendChannel#send
can suspend the execution when the channel is full, the equivalent without a channel would look like
Unresolved directive in index.adoc - include::Example.kt[tags=sendChannel]
Vertx dispatcher fully supports coroutine delay
function via Vert.x timers:
Unresolved directive in index.adoc - include::Example.kt[tags=delay]
Timers support cancellation
Unresolved directive in index.adoc - include::Example.kt[tags=cancellation]
cancellation is cooperative
You can also schedule timeout with the withTimeout
function
Unresolved directive in index.adoc - include::Example.kt[tags=withTimeout]
Vert.x works will all coroutine builders, as long as an instance of CoroutineScope
is available: launch
, async
,
`produce', … . A couple of important things to remember:
The runBlocking
doesn’t need a CoroutineScope
and must not be used from a Vert.x event loop thread.
To avoid memory leaks, always use coroutineScope {..}
to define a child scope. In this way, if a coroutine fails
inside the scope, all the others, defined inside that scope, will be cancelled too.
Vert.x integration has been designed to be fully interoperable with Kotlin coroutines
kotlinx.coroutines.experimental.sync.Mutex
are executed on the event loop thread when using the vertx dispatcher
The module vertx-lang-kotlin-coroutines
does not have specific integration with RxJava however Kotlin coroutines
provide integration with RxJava, which works out nicely with vertx-lang-kotlin-coroutines
.
You can read about it in the Guide to reactive streams with coroutines