A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your Maven project:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.6.2</version>
</dependency>
Add the following dependency to your Gradle project:
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance from a full AMQP URI as follows:
require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# full amqp uri
config['uri'] = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
Alternatively, you can specify individual parameters manually:
require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# Each parameter is optional
# The default value will be used if a parameter is not set
config['user'] = "user1"
config['password'] = "password1"
config['host'] = "localhost"
config['port'] = 5672
config['virtualHost'] = "vhost1"
config['connectionTimeout'] = 6000
config['requestedHeartbeat'] = 60
config['handshakeTimeout'] = 6000
config['requestedChannelMax'] = 5
config['networkRecoveryInterval'] = 500
config['automaticRecoveryEnabled'] = true
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
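The full URI and the individual parameters describe the same connection. As a rough illustration of how the two forms relate, the sketch below splits an AMQP URI into the config keys shown above using Ruby's standard `uri` library. The helper `amqp_uri_to_config` and the sample URI are hypothetical and not part of the client, which does its own URI handling.

```ruby
require 'uri'

# Hypothetical helper (not part of vertx-rabbitmq-client): split an AMQP URI
# into the individual config keys used in the example above.
def amqp_uri_to_config(amqp_uri)
  uri = URI.parse(amqp_uri)
  config = {}
  config['user'] = uri.user if uri.user
  config['password'] = uri.password if uri.password
  config['host'] = uri.host if uri.host
  config['port'] = uri.port if uri.port
  # The path component, minus its leading slash, names the virtual host.
  # Percent-encoded characters are left untouched in this sketch.
  vhost = uri.path.to_s.sub(%r{\A/}, '')
  config['virtualHost'] = vhost unless vhost.empty?
  config
end

config = amqp_uri_to_config("amqp://user1:password1@localhost:5672/vhost1")
puts config.inspect
```

Keys that are absent from the URI are simply left out of the hash, which matches the idea that every parameter is optional.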
You can set multiple addresses to connect to a cluster:
require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
config['user'] = "user1"
config['password'] = "password1"
config['virtualHost'] = "vhost1"
config['addresses'] = [Java::ComRabbitmqClient::Address.parse_addresses("firstHost,secondHost:5672")]
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
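The address string is a comma-separated list of `host` or `host:port` entries. The plain-Ruby sketch below shows one way to read such a string. `parse_address_list` is a hypothetical helper, and falling back to port 5672 (the port used in the earlier example) when an entry has no port is an assumption. The actual parsing is done by the RabbitMQ Java client's `Address` class, and IPv6 literals are not handled here.

```ruby
DEFAULT_AMQP_PORT = 5672 # assumed fallback for entries without an explicit port

# Hypothetical helper: turn "hostA,hostB:1234" into a list of host/port hashes.
def parse_address_list(addresses)
  addresses.split(',').map(&:strip).reject(&:empty?).map do |entry|
    host, port = entry.split(':', 2)
    { 'host' => host, 'port' => port ? Integer(port) : DEFAULT_AMQP_PORT }
  end
end

parse_address_list("firstHost,secondHost:5672").each do |address|
  puts "#{address['host']}:#{address['port']}"
end
```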
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
config = {
}
config['x-dead-letter-exchange'] = "my.deadletter.exchange"
config['alternate-exchange'] = "my.alternate.exchange"
# ...
client.exchange_declare("my.exchange", "fanout", true, false, config) { |onResult_err,onResult|
if (onResult_err == nil)
puts "Exchange successfully declared with config"
else
onResult_err.print_stack_trace()
end
}
You can pass additional config parameters to RabbitMQ’s queueDeclare method:
config = {
}
config['x-message-ttl'] = 10000
client.queue_declare("my-queue", true, false, true, config) { |queueResult_err,queueResult|
if (queueResult_err == nil)
puts "Queue declared!"
else
STDERR.puts "Queue failed to be declared!"
queueResult_err.print_stack_trace()
end
}
The following are some examples of the operations supported by the RabbitMQClient API. Consult the Javadoc for detailed information on all API methods.
Publish a message to a queue
message = {
'body' => "Hello RabbitMQ, from Vert.x !"
}
client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
if (pubResult_err == nil)
puts "Message published !"
else
pubResult_err.print_stack_trace()
end
}
Publish a message to a queue and confirm the broker acknowledged it.
message = {
'body' => "Hello RabbitMQ, from Vert.x !"
}
# Put the channel in confirm mode. This can be done once at init.
client.confirm_select() { |confirmResult_err,confirmResult|
if (confirmResult_err == nil)
client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
if (pubResult_err == nil)
# Check the message got confirmed by the broker.
client.wait_for_confirms() { |waitResult_err,waitResult|
if (waitResult_err == nil)
puts "Message published !"
else
waitResult_err.print_stack_trace()
end
}
else
pubResult_err.print_stack_trace()
end
}
else
confirmResult_err.print_stack_trace()
end
}
Consume messages from a queue.
# Create a stream of messages from a queue
client.basic_consumer("my.queue") { |rabbitMQConsumerAsyncResult_err,rabbitMQConsumerAsyncResult|
if (rabbitMQConsumerAsyncResult_err == nil)
puts "RabbitMQ consumer created !"
mqConsumer = rabbitMQConsumerAsyncResult
mqConsumer.handler() { |message|
puts "Got message: #{message.body().to_string()}"
}
else
rabbitMQConsumerAsyncResult_err.print_stack_trace()
end
}
You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.
consumer.pause()
consumer.resume()
Several options can be specified when creating a consumption stream.
QueueOptions lets you specify:
The size of the internal queue, with setMaxInternalQueueSize
Whether the stream should keep the most recent messages when the queue size is exceeded, with setKeepMostRecent
options = {
'maxInternalQueueSize' => 1000,
'keepMostRecent' => true
}
client.basic_consumer("my.queue", options) { |rabbitMQConsumerAsyncResult_err,rabbitMQConsumerAsyncResult|
if (rabbitMQConsumerAsyncResult_err == nil)
puts "RabbitMQ consumer created !"
else
rabbitMQConsumerAsyncResult_err.print_stack_trace()
end
}
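To make the two options more concrete, here is a toy model of a bounded internal queue in plain Ruby. It is not the client's implementation: the class `BoundedBuffer` is hypothetical, and the behaviour with `keepMostRecent` disabled (discarding the incoming message once the queue is full) is an assumption.

```ruby
# Toy model of a bounded internal queue; not the client's implementation.
# max_size is assumed to be a positive integer.
class BoundedBuffer
  attr_reader :items

  def initialize(max_size, keep_most_recent)
    @max_size = max_size
    @keep_most_recent = keep_most_recent
    @items = []
  end

  # Returns true if the message was stored, false if it was discarded.
  def offer(message)
    if @items.size >= @max_size
      # Assumption: without keep_most_recent the incoming message is dropped.
      return false unless @keep_most_recent
      @items.shift # drop the oldest message to make room for the new one
    end
    @items.push(message)
    true
  end
end

newest = BoundedBuffer.new(3, true)
oldest = BoundedBuffer.new(3, false)
(1..5).each do |n|
  newest.offer(n)
  oldest.offer(n)
end
puts "keepMostRecent=true:  #{newest.items.inspect}"
puts "keepMostRecent=false: #{oldest.items.inspect}"
```

With a size of 3 and five offered messages, the first buffer ends up holding 3, 4, 5 and the second 1, 2, 3.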
When you want to stop consuming messages from a queue, you can do:
rabbitMQConsumer.cancel() { |cancelResult_err,cancelResult|
if (cancelResult_err == nil)
puts "Consumption successfully stopped"
else
puts "Failed to stop consumption"
cancelResult_err.print_stack_trace()
end
}
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.end_handler() { |v|
puts "It is the end of the stream"
}
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exception_handler() { |e|
puts "An exception occurred in the process of message handling"
e.print_stack_trace()
}
Finally, you may want to retrieve the consumer tag associated with the consumer:
consumerTag = consumer.consumer_tag()
puts "Consumer tag is: #{consumerTag}"
Get a single message from a queue:
client.basic_get("my.queue", true) { |getResult_err,getResult|
if (getResult_err == nil)
msg = getResult
puts "Got message: #{msg['body']}"
else
getResult_err.print_stack_trace()
end
}
Consume messages without auto-ack, delivering them to an event bus address and acknowledging each one manually:
# Create the event bus handler which messages will be sent to
vertx.event_bus().consumer("my.address") { |msg|
json = msg.body()
puts "Got message: #{json['body']}"
# ack
client.basic_ack(json['deliveryTag'], false) { |asyncResult_err,asyncResult|
}
}
# Setup the link between rabbitmq consumer and event bus address
client.basic_consume("my.queue", "my.address", false) { |consumeResult_err,consumeResult|
if (consumeResult_err == nil)
puts "RabbitMQ consumer created !"
else
consumeResult_err.print_stack_trace()
end
}
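The second argument of `basic_ack` is the AMQP `multiple` flag: `false` acknowledges only the given delivery tag, while `true` acknowledges every outstanding delivery up to and including it. The sketch below models that bookkeeping in plain Ruby. The class `UnackedDeliveries` is hypothetical and only illustrates the semantics; it does not talk to a broker.

```ruby
require 'set'

# Toy model of the delivery tags a channel still expects to be acknowledged.
class UnackedDeliveries
  def initialize
    @tags = Set.new
  end

  # Record that a message with this delivery tag was handed to the consumer.
  def delivered(delivery_tag)
    @tags.add(delivery_tag)
  end

  # multiple = false acknowledges only the given tag;
  # multiple = true acknowledges every outstanding tag up to and including it.
  def ack(delivery_tag, multiple)
    if multiple
      @tags.delete_if { |tag| tag <= delivery_tag }
    else
      @tags.delete(delivery_tag)
    end
    self
  end

  # Outstanding tags in ascending order.
  def pending
    @tags.to_a.sort
  end
end

channel = UnackedDeliveries.new
(1..5).each { |tag| channel.delivered(tag) }
channel.ack(2, false)
puts channel.pending.inspect
channel.ack(4, true)
puts channel.pending.inspect
```

Acknowledging one message at a time, as in the example above, is the simplest approach. Acknowledging in batches with `multiple` set to `true` reduces the number of acknowledgement frames, but a failure then causes more messages to be redelivered.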
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.