A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.6.2</version>
</dependency>
Add the following dependency to your gradle project
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance as follows using a full amqp uri:
def config = [:]
// full amqp uri
config.uri = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
def client = RabbitMQClient.create(vertx, config)
Or you can also specify individual parameters manually:
def config = [:]
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.user = "user1"
config.password = "password1"
config.host = "localhost"
config.port = 5672
config.virtualHost = "vhost1"
config.connectionTimeout = 6000
config.requestedHeartbeat = 60
config.handshakeTimeout = 6000
config.requestedChannelMax = 5
config.networkRecoveryInterval = 500
config.automaticRecoveryEnabled = true
def client = RabbitMQClient.create(vertx, config)
You can set multiples addresses to connect to a cluster;
def config = [:]
config.user = "user1"
config.password = "password1"
config.virtualHost = "vhost1"
config.addresses = [com.rabbitmq.client.Address.parseAddresses("firstHost,secondHost:5672")]
def client = RabbitMQClient.create(vertx, config)
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method
def config = [:]
config.x-dead-letter-exchange = "my.deadletter.exchange"
config.alternate-exchange = "my.alternate.exchange"
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, { onResult ->
if (onResult.succeeded()) {
println("Exchange successfully declared with config")
} else {
onResult.cause().printStackTrace()
}
})
You can pass additional config parameters to RabbitMQs queueDeclare method
def config = [:]
config.x-message-ttl = 10000L
client.queueDeclare("my-queue", true, false, true, config, { queueResult ->
if (queueResult.succeeded()) {
println("Queue declared!")
} else {
System.err.println("Queue failed to be declared!")
queueResult.cause().printStackTrace()
}
})
The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.
Publish a message to a queue
def message = [
body:"Hello RabbitMQ, from Vert.x !"
]
client.basicPublish("", "my.queue", message, { pubResult ->
if (pubResult.succeeded()) {
println("Message published !")
} else {
pubResult.cause().printStackTrace()
}
})
Publish a message to a queue and confirm the broker acknowledged it.
def message = [
body:"Hello RabbitMQ, from Vert.x !"
]
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect({ confirmResult ->
if (confirmResult.succeeded()) {
client.basicPublish("", "my.queue", message, { pubResult ->
if (pubResult.succeeded()) {
// Check the message got confirmed by the broker.
client.waitForConfirms({ waitResult ->
if (waitResult.succeeded()) {
println("Message published !")} else {
waitResult.cause().printStackTrace()}
})
} else {
pubResult.cause().printStackTrace()
}
})
} else {
confirmResult.cause().printStackTrace()
}
})
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumer("my.queue", { rabbitMQConsumerAsyncResult ->
if (rabbitMQConsumerAsyncResult.succeeded()) {
println("RabbitMQ consumer created !")
def mqConsumer = rabbitMQConsumerAsyncResult.result()
mqConsumer.handler({ message ->
println("Got message: ${message.body().toString()}")
})
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace()
}
})
At any moment of time you can pause or resume the stream. When stream is paused you won’t receive any message.
consumer.pause()
consumer.resume()
There are actually a set of options to specify when creating a consumption stream.
The QueueOptions
lets you specify:
The size of internal queue with setMaxInternalQueueSize
Should the stream keep more recent messages when queue size is exceed with setKeepMostRecent
def options = [
maxInternalQueueSize:1000,
keepMostRecent:true
]
client.basicConsumer("my.queue", options, { rabbitMQConsumerAsyncResult ->
if (rabbitMQConsumerAsyncResult.succeeded()) {
println("RabbitMQ consumer created !")
} else {
rabbitMQConsumerAsyncResult.cause().printStackTrace()
}
})
When you want to stop consuming message from a queue, you can do:
rabbitMQConsumer.cancel({ cancelResult ->
if (cancelResult.succeeded()) {
println("Consumption successfully stopped")
} else {
println("Tired in attempt to stop consumption")
cancelResult.cause().printStackTrace()
}
})
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler({ v ->
println("It is the end of the stream")
})
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler({ e ->
println("An exception occurred in the process of message handling")
e.printStackTrace()
})
And finally, you may want to retrive a related to the consumer tag:
def consumerTag = consumer.consumerTag()
println("Consumer tag is: ${consumerTag}")
Will get a message from a queue
client.basicGet("my.queue", true, { getResult ->
if (getResult.succeeded()) {
def msg = getResult.result()
println("Got message: ${msg.body}")
} else {
getResult.cause().printStackTrace()
}
})
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", { msg ->
def json = msg.body()
println("Got message: ${json.body}")
// ack
client.basicAck(json.deliveryTag, false, { asyncResult ->
})
})
// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", false, { consumeResult ->
if (consumeResult.succeeded()) {
println("RabbitMQ consumer created !")
} else {
consumeResult.cause().printStackTrace()
}
})
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.