A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your maven project
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rabbitmq-client</artifactId>
<version>3.6.2</version>
</dependency>
Add the following dependency to your gradle project
dependencies {
compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance as follows using a full amqp uri:
var RabbitMQClient = require("vertx-rabbitmq-js/rabbit_mq_client");
var config = {
};
// full amqp uri
config.uri = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc";
var client = RabbitMQClient.create(vertx, config);
Or you can also specify individual parameters manually:
var RabbitMQClient = require("vertx-rabbitmq-js/rabbit_mq_client");
var config = {
};
// Each parameter is optional
// The default parameter with be used if the parameter is not set
config.user = "user1";
config.password = "password1";
config.host = "localhost";
config.port = 5672;
config.virtualHost = "vhost1";
config.connectionTimeout = 6000;
config.requestedHeartbeat = 60;
config.handshakeTimeout = 6000;
config.requestedChannelMax = 5;
config.networkRecoveryInterval = 500;
config.automaticRecoveryEnabled = true;
var client = RabbitMQClient.create(vertx, config);
You can set multiples addresses to connect to a cluster;
var RabbitMQClient = require("vertx-rabbitmq-js/rabbit_mq_client");
var config = {
};
config.user = "user1";
config.password = "password1";
config.virtualHost = "vhost1";
config.addresses = [Java.type("com.rabbitmq.client.Address").parseAddresses("firstHost,secondHost:5672")];
var client = RabbitMQClient.create(vertx, config);
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method
var config = {
};
config.x-dead-letter-exchange = "my.deadletter.exchange";
config.alternate-exchange = "my.alternate.exchange";
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, function (onResult, onResult_err) {
if (onResult_err == null) {
console.log("Exchange successfully declared with config");
} else {
onResult_err.printStackTrace();
}
});
You can pass additional config parameters to RabbitMQs queueDeclare method
var config = {
};
config.x-message-ttl = 10000;
client.queueDeclare("my-queue", true, false, true, config, function (queueResult, queueResult_err) {
if (queueResult_err == null) {
console.log("Queue declared!");
} else {
console.error("Queue failed to be declared!");
queueResult_err.printStackTrace();
}
});
The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.
Publish a message to a queue
var message = {
"body" : "Hello RabbitMQ, from Vert.x !"
};
client.basicPublish("", "my.queue", message, function (pubResult, pubResult_err) {
if (pubResult_err == null) {
console.log("Message published !");
} else {
pubResult_err.printStackTrace();
}
});
Publish a message to a queue and confirm the broker acknowledged it.
var message = {
"body" : "Hello RabbitMQ, from Vert.x !"
};
// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect(function (confirmResult, confirmResult_err) {
if (confirmResult_err == null) {
client.basicPublish("", "my.queue", message, function (pubResult, pubResult_err) {
if (pubResult_err == null) {
// Check the message got confirmed by the broker.
client.waitForConfirms(function (waitResult, waitResult_err) {
if (waitResult_err == null) {
console.log("Message published !")} else {
waitResult_err.printStackTrace()}
});
} else {
pubResult_err.printStackTrace();
}
});
} else {
confirmResult_err.printStackTrace();
}
});
Consume messages from a queue.
// Create a stream of messages from a queue
client.basicConsumer("my.queue", function (rabbitMQConsumerAsyncResult, rabbitMQConsumerAsyncResult_err) {
if (rabbitMQConsumerAsyncResult_err == null) {
console.log("RabbitMQ consumer created !");
var mqConsumer = rabbitMQConsumerAsyncResult;
mqConsumer.handler(function (message) {
console.log("Got message: " + message.body().toString());
});
} else {
rabbitMQConsumerAsyncResult_err.printStackTrace();
}
});
At any moment of time you can pause or resume the stream. When stream is paused you won’t receive any message.
consumer.pause();
consumer.resume();
There are actually a set of options to specify when creating a consumption stream.
The QueueOptions
lets you specify:
The size of internal queue with setMaxInternalQueueSize
Should the stream keep more recent messages when queue size is exceed with setKeepMostRecent
var options = {
"maxInternalQueueSize" : 1000,
"keepMostRecent" : true
};
client.basicConsumer("my.queue", options, function (rabbitMQConsumerAsyncResult, rabbitMQConsumerAsyncResult_err) {
if (rabbitMQConsumerAsyncResult_err == null) {
console.log("RabbitMQ consumer created !");
} else {
rabbitMQConsumerAsyncResult_err.printStackTrace();
}
});
When you want to stop consuming message from a queue, you can do:
rabbitMQConsumer.cancel(function (cancelResult, cancelResult_err) {
if (cancelResult_err == null) {
console.log("Consumption successfully stopped");
} else {
console.log("Tired in attempt to stop consumption");
cancelResult_err.printStackTrace();
}
});
You can get notified by the end handler when the queue won’t process any more messages:
rabbitMQConsumer.endHandler(function (v) {
console.log("It is the end of the stream");
});
You can set the exception handler to be notified of any error that may occur when a message is processed:
consumer.exceptionHandler(function (e) {
console.log("An exception occurred in the process of message handling");
e.printStackTrace();
});
And finally, you may want to retrive a related to the consumer tag:
var consumerTag = consumer.consumerTag();
console.log("Consumer tag is: " + consumerTag);
Will get a message from a queue
client.basicGet("my.queue", true, function (getResult, getResult_err) {
if (getResult_err == null) {
var msg = getResult;
console.log("Got message: " + msg.body);
} else {
getResult_err.printStackTrace();
}
});
// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", function (msg) {
var json = msg.body();
console.log("Got message: " + json.body);
// ack
client.basicAck(json.deliveryTag, false, function (asyncResult, asyncResult_err) {
});
});
// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", false, function (consumeResult, consumeResult_err) {
if (consumeResult_err == null) {
console.log("RabbitMQ consumer created !");
} else {
consumeResult_err.printStackTrace();
}
});
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.