A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.6.2</version>
</dependency>
Add the following dependency to your gradle project
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance as follows using a full amqp uri:
var config = RabbitMQOptions()
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)
Or you can also specify individual parameters manually:
var config = RabbitMQOptions()
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.setUser("user1")
config.setPassword("password1")
config.setHost("localhost")
config.setPort(5672)
config.setVirtualHost("vhost1")
config.setConnectionTimeout(6000)
config.setRequestedHeartbeat(60)
config.setHandshakeTimeout(6000)
config.setRequestedChannelMax(5)
config.setNetworkRecoveryInterval(500)
config.setAutomaticRecoveryEnabled(true)
var client = RabbitMQClient.create(vertx, config)
You can set multiples addresses to connect to a cluster;
var config = RabbitMQOptions()
config.setUser("user1")
config.setPassword("password1")
config.setVirtualHost("vhost1")
config.setAddresses(List(com.rabbitmq.client.Address.parseAddresses("firstHost,secondHost:5672")))
var client = RabbitMQClient.create(vertx, config)
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method
var config = new io.vertx.core.json.JsonObject()
config.put("x-dead-letter-exchange", "my.deadletter.exchange")
config.put("alternate-exchange", "my.alternate.exchange")
// ...
client.exchangeDeclareFuture("my.exchange", "fanout", true, false, config).onComplete{
case Success(result) => {
println("Exchange successfully declared with config")
}
case Failure(cause) => {
println(s"$cause")
}
}
You can pass additional config parameters to RabbitMQs queueDeclare method
var config = new io.vertx.core.json.JsonObject()
config.put("x-message-ttl", 10000)
client.queueDeclareFuture("my-queue", true, false, true, config).onComplete{
case Success(result) => {
println("Queue declared!")
}
case Failure(cause) => {
println(s"$cause")
}
}
The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.
Publish a message to a queue
var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
client.basicPublishFuture("", "my.queue", message).onComplete{
case Success(result) => {
println("Message published !")
}
case Failure(cause) => {
println(s"$cause")
}
}
Publish a message to a queue and confirm the broker acknowledged it.
var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelectFuture().onComplete{
case Success(result) => {
client.basicPublishFuture("", "my.queue", message).onComplete{
case Success(result) => {
// Check the message got confirmed by the broker.
client.waitForConfirmsFuture().onComplete{
case Success(result) => {
println("Message published !")}
case Failure(cause) => {
println(s"$cause")
}
}
}
case Failure(cause) => {
println(s"$cause")
}
}
}
case Failure(cause) => {
println(s"$cause")
}
}
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumerFuture("my.queue").onComplete{
case Success(result) => {
println("RabbitMQ consumer created !")
var mqConsumer = result
mqConsumer.handler((message: io.vertx.scala.rabbitmq.RabbitMQMessage) => {
println(s"Got message: ${message.body().toString()}")
})
}
case Failure(cause) => {
println(s"$cause")
}
}
At any moment of time you can pause or resume the stream. When stream is paused you won’t receive any message.
consumer.pause()
consumer.resume()
There are actually a set of options to specify when creating a consumption stream.
The QueueOptions
lets you specify:
The size of internal queue with setMaxInternalQueueSize
Should the stream keep more recent messages when queue size is exceed with setKeepMostRecent
var options = QueueOptions()
.setMaxInternalQueueSize(1000)
.setKeepMostRecent(true)
client.basicConsumerFuture("my.queue", options).onComplete{
case Success(result) => {
println("RabbitMQ consumer created !")
}
case Failure(cause) => {
println(s"$cause")
}
}
When you want to stop consuming message from a queue, you can do:
rabbitMQConsumer.cancelFuture().onComplete{
case Success(result) => {
println("Consumption successfully stopped")
}
case Failure(cause) => {
println(s"$cause")
}
}
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler((v: java.lang.Void) => {
println("It is the end of the stream")
})
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler((e: java.lang.Throwable) => {
println("An exception occurred in the process of message handling")
e.printStackTrace()
})
And finally, you may want to retrive a related to the consumer tag:
var consumerTag = consumer.consumerTag()
println(s"Consumer tag is: ${consumerTag}")
Will get a message from a queue
client.basicGetFuture("my.queue", true).onComplete{
case Success(result) => {
var msg = result
println(s"Got message: ${msg.getValue("body")}")
}
case Failure(cause) => {
println(s"$cause")
}
}
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message<java.lang.Object>) => {
var json = msg.body()
println(s"Got message: ${json.getValue("body")}")
// ack
client.basicAckFuture(json.getValue("deliveryTag"), false).onComplete{
case Success(result) => println("Success")
case Failure(cause) => println("Failure")
}
})
// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address", false).onComplete{
case Success(result) => {
println("RabbitMQ consumer created !")
}
case Failure(cause) => {
println(s"$cause")
}
}
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.