A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your Maven project:
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>3.6.2</version>
</dependency>
Add the following dependency to your Gradle project:
dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance from a full AMQP URI:
RabbitMQOptions config = new RabbitMQOptions();
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc");
RabbitMQClient client = RabbitMQClient.create(vertx, config);
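To make the parts of such a URI concrete, the following sketch splits an AMQP URI into the settings it carries (user, password, host, port, virtual host). It relies only on `java.net.URI`. The class `AmqpUriSketch` and the sample URI are hypothetical and not part of the client, and the client's own parsing may differ in edge cases such as an empty virtual host.

```java
import java.net.URI;

// Illustrative sketch only: AmqpUriSketch is a hypothetical helper,
// not the parser used by RabbitMQOptions.
final class AmqpUriSketch {
    final String user;        // null when the URI carries no user info
    final String password;    // null when the URI carries no password
    final String host;
    final int port;
    final String virtualHost; // null when the URI has no path segment

    private AmqpUriSketch(String user, String password, String host, int port, String virtualHost) {
        this.user = user;
        this.password = password;
        this.host = host;
        this.port = port;
        this.virtualHost = virtualHost;
    }

    static AmqpUriSketch parse(String amqpUri) {
        URI uri = URI.create(amqpUri);

        String user = null;
        String password = null;
        String userInfo = uri.getUserInfo(); // "user:password", "user", or null
        if (userInfo != null) {
            int colon = userInfo.indexOf(':');
            user = colon < 0 ? userInfo : userInfo.substring(0, colon);
            password = colon < 0 ? null : userInfo.substring(colon + 1);
        }

        // java.net.URI returns -1 when no port is present; fall back to the
        // conventional ports (5672 for amqp, 5671 for amqps).
        int port = uri.getPort();
        if (port == -1) {
            port = "amqps".equalsIgnoreCase(uri.getScheme()) ? 5671 : 5672;
        }

        // The path without its leading slash names the virtual host.
        String path = uri.getPath();
        String virtualHost = (path == null || path.length() <= 1) ? null : path.substring(1);

        return new AmqpUriSketch(user, password, uri.getHost(), port, virtualHost);
    }

    public static void main(String[] args) {
        AmqpUriSketch parts = parse("amqp://user1:password1@localhost/vhost1");
        System.out.println(parts.user + "@" + parts.host + ":" + parts.port
            + " vhost=" + parts.virtualHost);
    }
}
```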
Alternatively, you can specify individual parameters:
RabbitMQOptions config = new RabbitMQOptions();
// Each parameter is optional
// The default value will be used if a parameter is not set
config.setUser("user1");
config.setPassword("password1");
config.setHost("localhost");
config.setPort(5672);
config.setVirtualHost("vhost1");
config.setConnectionTimeout(6000); // in milliseconds
config.setRequestedHeartbeat(60); // in seconds
config.setHandshakeTimeout(6000); // in milliseconds
config.setRequestedChannelMax(5);
config.setNetworkRecoveryInterval(500); // in milliseconds
config.setAutomaticRecoveryEnabled(true);
RabbitMQClient client = RabbitMQClient.create(vertx, config);
You can set multiple addresses to connect to a cluster:
RabbitMQOptions config = new RabbitMQOptions();
config.setUser("user1");
config.setPassword("password1");
config.setVirtualHost("vhost1");
config.setAddresses(Arrays.asList(Address.parseAddresses("firstHost,secondHost:5672")));
RabbitMQClient client = RabbitMQClient.create(vertx, config);
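The address string is a comma-separated list of `host` or `host:port` entries. As a rough illustration of that format (not the actual `Address.parseAddresses` implementation), the sketch below normalizes such a list into `host:port` strings. The class `AddressListSketch` is hypothetical, filling in 5672 for entries without a port is an assumption made here for readability, and IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: shows the shape of a "host[:port],host[:port]" list.
final class AddressListSketch {
    // Assumption for this sketch: entries without a port use the usual AMQP port.
    static final int ASSUMED_DEFAULT_PORT = 5672;

    static List<String> normalize(String addresses) {
        List<String> result = new ArrayList<>();
        for (String entry : addresses.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0
                ? ASSUMED_DEFAULT_PORT
                : Integer.parseInt(trimmed.substring(colon + 1));
            result.add(host + ":" + port);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [firstHost:5672, secondHost:5672]
        System.out.println(normalize("firstHost,secondHost:5672"));
    }
}
```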
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
JsonObject config = new JsonObject();
config.put("x-dead-letter-exchange", "my.deadletter.exchange");
config.put("alternate-exchange", "my.alternate.exchange");
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, onResult -> {
  if (onResult.succeeded()) {
    System.out.println("Exchange successfully declared with config");
  } else {
    onResult.cause().printStackTrace();
  }
});
You can pass additional config parameters to RabbitMQ’s queueDeclare method:
JsonObject config = new JsonObject();
config.put("x-message-ttl", 10_000L);
client.queueDeclare("my-queue", true, false, true, config, queueResult -> {
  if (queueResult.succeeded()) {
    System.out.println("Queue declared!");
  } else {
    System.err.println("Queue failed to be declared!");
    queueResult.cause().printStackTrace();
  }
});
The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.
Publish a message to a queue
JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
client.basicPublish("", "my.queue", message, pubResult -> {
  if (pubResult.succeeded()) {
    System.out.println("Message published !");
  } else {
    pubResult.cause().printStackTrace();
  }
});
Publish a message to a queue and confirm the broker acknowledged it.
JsonObject message = new JsonObject().put("body", "Hello RabbitMQ, from Vert.x !");
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(confirmResult -> {
  if (confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, pubResult -> {
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms(waitResult -> {
          if (waitResult.succeeded()) {
            System.out.println("Message published !");
          } else {
            waitResult.cause().printStackTrace();
          }
        });
      } else {
        pubResult.cause().printStackTrace();
      }
    });
  } else {
    confirmResult.cause().printStackTrace();
  }
});
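Conceptually, once a channel is in confirm mode each published message gets a sequence number, and the broker acknowledges those numbers either one at a time or in bulk. The sketch below models only that bookkeeping in plain Java, to show what waiting for confirms amounts to. `ConfirmTrackerSketch` and its methods are hypothetical names; the real client keeps this state internally.

```java
import java.util.TreeSet;

// Illustrative sketch only: models publisher-confirm bookkeeping without a broker.
final class ConfirmTrackerSketch {
    private final TreeSet<Long> unconfirmed = new TreeSet<>();
    private long nextSeqNo = 1; // sequence numbers start at 1 in confirm mode

    // Record a publish and return the sequence number assigned to it.
    long publish() {
        long seqNo = nextSeqNo++;
        unconfirmed.add(seqNo);
        return seqNo;
    }

    // Apply a broker ack. With multiple = true, every message up to and
    // including deliveryTag is confirmed; otherwise only that one message.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            unconfirmed.headSet(deliveryTag, true).clear();
        } else {
            unconfirmed.remove(deliveryTag);
        }
    }

    int unconfirmedCount() {
        return unconfirmed.size();
    }

    // Waiting for confirms completes once this becomes true.
    boolean allConfirmed() {
        return unconfirmed.isEmpty();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.publish();
        tracker.publish();
        tracker.publish();
        tracker.handleAck(2, true);   // confirms messages 1 and 2
        System.out.println("still waiting on " + tracker.unconfirmedCount() + " message(s)");
        tracker.handleAck(3, false);  // confirms message 3
        System.out.println("all confirmed: " + tracker.allConfirmed());
    }
}
```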
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumer("my.queue", rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
    RabbitMQConsumer mqConsumer = rabbitMQConsumerAsyncResult.result();
    mqConsumer.handler(message -> {
      System.out.println("Got message: " + message.body().toString());
    });
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});
You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.
consumer.pause();
consumer.resume();
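One way to picture pause and resume is a stream that holds incoming messages back while paused and hands them to the handler once resumed. The sketch below shows that idea in plain, single-threaded Java. `PausableStreamSketch` is a hypothetical class, the buffer is unbounded for brevity, and the assumption that held-back messages are delivered on resume is an illustration rather than a statement about the client's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative sketch only: a minimal pausable message stream.
final class PausableStreamSketch<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<T> handler;
    private boolean paused;

    PausableStreamSketch(Consumer<T> handler) {
        this.handler = handler;
    }

    void pause() {
        paused = true;
    }

    // Deliver everything held back while paused, stopping early if the
    // handler pauses the stream again.
    void resume() {
        paused = false;
        T next;
        while (!paused && (next = pending.poll()) != null) {
            handler.accept(next);
        }
    }

    // Called for each incoming message.
    void onMessage(T message) {
        if (paused) {
            pending.add(message);
        } else {
            handler.accept(message);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        PausableStreamSketch<String> stream = new PausableStreamSketch<>(received::add);
        stream.onMessage("first");
        stream.pause();
        stream.onMessage("second");   // held back while paused
        System.out.println("while paused: " + received);
        stream.resume();
        System.out.println("after resume: " + received);
    }
}
```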
Several options can be specified when creating a consumption stream.
QueueOptions lets you specify:
The size of the internal queue, with setMaxInternalQueueSize
Whether the stream should keep the most recent messages when the queue size is exceeded, with setKeepMostRecent
QueueOptions options = new QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true);
client.basicConsumer("my.queue", options, rabbitMQConsumerAsyncResult -> {
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace();
  }
});
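To illustrate what these two options imply, here is a small bounded buffer in plain Java. It assumes that when the buffer is full, keeping the most recent messages means evicting the oldest one to make room, and that otherwise the incoming message is discarded. This is a reading of the option names, not a copy of the client's implementation, and `BoundedBufferSketch` is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative sketch only: a size-limited buffer with two overflow policies.
final class BoundedBufferSketch<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxSize;
    private final boolean keepMostRecent;

    BoundedBufferSketch(int maxSize, boolean keepMostRecent) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
        this.keepMostRecent = keepMostRecent;
    }

    // Returns true if the message was stored, false if it was discarded.
    boolean offer(T message) {
        if (buffer.size() < maxSize) {
            buffer.addLast(message);
            return true;
        }
        if (keepMostRecent) {
            buffer.pollFirst();      // evict the oldest message
            buffer.addLast(message); // keep the newest one
            return true;
        }
        return false;                // full: drop the incoming message
    }

    // Copy of the current contents, oldest first.
    List<T> snapshot() {
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        BoundedBufferSketch<Integer> recent = new BoundedBufferSketch<>(3, true);
        BoundedBufferSketch<Integer> oldest = new BoundedBufferSketch<>(3, false);
        for (int i = 1; i <= 5; i++) {
            recent.offer(i);
            oldest.offer(i);
        }
        System.out.println(recent.snapshot()); // prints [3, 4, 5]
        System.out.println(oldest.snapshot()); // prints [1, 2, 3]
    }
}
```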
To stop consuming messages from a queue, cancel the consumer:
rabbitMQConsumer.cancel(cancelResult -> {
  if (cancelResult.succeeded()) {
    System.out.println("Consumption successfully stopped");
  } else {
    System.out.println("Failed to stop consumption");
    cancelResult.cause().printStackTrace();
  }
});
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler(v -> {
  System.out.println("It is the end of the stream");
});
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler(e -> {
  System.out.println("An exception occurred in the process of message handling");
  e.printStackTrace();
});
Finally, you may want to retrieve the consumer tag associated with the consumer:
String consumerTag = consumer.consumerTag();
System.out.println("Consumer tag is: " + consumerTag);
Get a single message from a queue:
client.basicGet("my.queue", true, getResult -> {
  if (getResult.succeeded()) {
    JsonObject msg = getResult.result();
    System.out.println("Got message: " + msg.getString("body"));
  } else {
    getResult.cause().printStackTrace();
  }
});
Consume messages through an event bus address and acknowledge them manually:
vertx.eventBus().consumer("my.address", msg -> {
  JsonObject json = (JsonObject) msg.body();
  System.out.println("Got message: " + json.getString("body"));
  // ack
  client.basicAck(json.getLong("deliveryTag"), false, asyncResult -> {
  });
});
// Set up the link between the RabbitMQ consumer and the event bus address (auto-ack disabled)
client.basicConsume("my.queue", "my.address", false, consumeResult -> {
  if (consumeResult.succeeded()) {
    System.out.println("RabbitMQ consumer created !");
  } else {
    consumeResult.cause().printStackTrace();
  }
});
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.