A Vert.x client that allows applications to interact with a RabbitMQ broker (AMQP 0.9.1).
This service is experimental and the APIs are likely to change before settling down.
Add the following dependency to your Maven project:
<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-rabbitmq-client</artifactId>
  <version>3.6.2</version>
</dependency>
Add the following dependency to your Gradle project:
dependencies {
  compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}
You can create a client instance as follows, using a full AMQP URI:
<?php
use io\vertx\jphp\rabbitmq\RabbitMQClient;
$config = array(
);
// full amqp uri
$config["uri"] = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc";
$client = RabbitMQClient::create($vertx, $config);
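For readers wondering how a full AMQP URI relates to the individual parameters, here is a minimal sketch in plain PHP that splits a URI into its parts. The function name `amqpUriToConfig` is hypothetical and not part of the client; the fallback port 5672 and the reading of the path segment as the virtual host follow the usual AMQP URI convention, and are assumptions about how the client interprets the URI rather than a description of its internals.

```php
<?php
// Hypothetical helper for illustration only; the client parses the URI itself.
function amqpUriToConfig(string $uri): array {
    $parts = parse_url($uri);
    if ($parts === false || !isset($parts["host"])) {
        throw new InvalidArgumentException("Not a valid AMQP URI: " . $uri);
    }
    $config = array();
    $config["host"] = $parts["host"];
    // Assumption: 5672 is the conventional AMQP port when none is given
    $config["port"] = $parts["port"] ?? 5672;
    if (isset($parts["user"])) {
        $config["user"] = rawurldecode($parts["user"]);
    }
    if (isset($parts["pass"])) {
        $config["password"] = rawurldecode($parts["pass"]);
    }
    // The path without its leading slash is taken as the virtual host
    if (isset($parts["path"]) && strlen($parts["path"]) > 1) {
        $config["virtualHost"] = rawurldecode(substr($parts["path"], 1));
    }
    return $config;
}

print_r(amqpUriToConfig("amqp://user1:password1@localhost:5672/vhost1"));
```

The resulting array uses the same keys as the individual-parameter configuration, which makes it easy to compare the two styles.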
Alternatively, you can specify individual parameters manually:
<?php
use io\vertx\jphp\rabbitmq\RabbitMQClient;
$config = array(
);
// Each parameter is optional
// The default value will be used if a parameter is not set
$config["user"] = "user1";
$config["password"] = "password1";
$config["host"] = "localhost";
$config["port"] = 5672;
$config["virtualHost"] = "vhost1";
$config["connectionTimeout"] = 6000;
$config["requestedHeartbeat"] = 60;
$config["handshakeTimeout"] = 6000;
$config["requestedChannelMax"] = 5;
$config["networkRecoveryInterval"] = 500;
$config["automaticRecoveryEnabled"] = true;
$client = RabbitMQClient::create($vertx, $config);
You can set multiple addresses to connect to a cluster:
<?php
use io\vertx\jphp\rabbitmq\RabbitMQClient;
$config = array(
);
$config["user"] = "user1";
$config["password"] = "password1";
$config["virtualHost"] = "vhost1";
$config["addresses"] = array( Java::type("com.rabbitmq.client.Address")->parseAddresses("firstHost,secondHost:5672"));
$client = RabbitMQClient::create($vertx, $config);
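The address string above is a comma-separated list of `host` or `host:port` entries. As a rough illustration of that format only, the following plain PHP sketch splits such a string into host/port pairs. `parseAddressList` is a hypothetical function, not part of the client or of the RabbitMQ Java library, and it ignores cases such as IPv6 literals; a missing port is represented here as `null`, which is a choice made for this sketch.

```php
<?php
// Hypothetical helper for illustration; the real parsing is done by
// com.rabbitmq.client.Address::parseAddresses on the Java side.
function parseAddressList(string $list): array {
    $result = array();
    foreach (explode(",", $list) as $entry) {
        $entry = trim($entry);
        if ($entry === "") {
            continue; // skip empty entries such as a trailing comma
        }
        $pos = strrpos($entry, ":");
        if ($pos === false) {
            // No explicit port: leave it unset in this sketch
            $result[] = array("host" => $entry, "port" => null);
        } else {
            $result[] = array(
                "host" => substr($entry, 0, $pos),
                "port" => (int) substr($entry, $pos + 1)
            );
        }
    }
    return $result;
}

print_r(parseAddressList("firstHost,secondHost:5672"));
```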
You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:
<?php
$config = array(
);
$config["x-dead-letter-exchange"] = "my.deadletter.exchange";
$config["alternate-exchange"] = "my.alternate.exchange";
// ...
// Arguments: exchange name, exchange type, durable, autoDelete, extra config, result handler
$client->exchangeDeclare("my.exchange", "fanout", true, false, $config, function ($onResult, $onResult_err) {
  if ($onResult != null) {
    echo "Exchange successfully declared with config\n";
  } else {
    $onResult_err->printStackTrace();
  }
});
You can pass additional config parameters to RabbitMQ’s queueDeclare method:
<?php
$config = array(
);
$config["x-message-ttl"] = 10000;
// Arguments: queue name, durable, exclusive, autoDelete, extra config, result handler
$client->queueDeclare("my-queue", true, false, true, $config, function ($queueResult, $queueResult_err) {
  if ($queueResult != null) {
    echo "Queue declared!\n";
  } else {
    echo "Queue failed to be declared!\n";
    $queueResult_err->printStackTrace();
  }
});
The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.
Publish a message to a queue.
<?php
$message = array(
  "body" => "Hello RabbitMQ, from Vert.x !"
);
$client->basicPublish("", "my.queue", $message, function ($pubResult, $pubResult_err) {
  if ($pubResult != null) {
    echo "Message published !\n";
  } else {
    $pubResult_err->printStackTrace();
  }
});
Publish a message to a queue and confirm that the broker acknowledged it.
<?php
$message = array(
  "body" => "Hello RabbitMQ, from Vert.x !"
);
// Put the channel in confirm mode. This can be done once at init.
// PHP closures do not capture outer variables implicitly, hence the use (...) clauses.
$client->confirmSelect(function ($confirmResult, $confirmResult_err) use ($client, $message) {
  if ($confirmResult != null) {
    $client->basicPublish("", "my.queue", $message, function ($pubResult, $pubResult_err) use ($client) {
      if ($pubResult != null) {
        // Check the message got confirmed by the broker.
        $client->waitForConfirms(function ($waitResult, $waitResult_err) {
          if ($waitResult != null) {
            echo "Message published !\n";
          } else {
            $waitResult_err->printStackTrace();
          }
        });
      } else {
        $pubResult_err->printStackTrace();
      }
    });
  } else {
    $confirmResult_err->printStackTrace();
  }
});
Consume messages from a queue.
<?php
// Create a stream of messages from a queue
$client->basicConsumer("my.queue", function ($rabbitMQConsumerAsyncResult, $rabbitMQConsumerAsyncResult_err) {
  if ($rabbitMQConsumerAsyncResult != null) {
    echo "RabbitMQ consumer created !\n";
    $mqConsumer = $rabbitMQConsumerAsyncResult;
    $mqConsumer->handler(function ($message) {
      echo "Got message: ".$message->body()->toString()."\n";
    });
  } else {
    $rabbitMQConsumerAsyncResult_err->printStackTrace();
  }
});
You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.
<?php
$consumer->pause();
$consumer->resume();
Several options can be specified when creating a consumption stream.
The QueueOptions object lets you specify:
The size of the internal queue, with setMaxInternalQueueSize
Whether the stream should keep the more recent messages when the queue size is exceeded, with setKeepMostRecent
<?php
$options = array(
  "maxInternalQueueSize" => 1000,
  "keepMostRecent" => true
);
$client->basicConsumer("my.queue", $options, function ($rabbitMQConsumerAsyncResult, $rabbitMQConsumerAsyncResult_err) {
  if ($rabbitMQConsumerAsyncResult != null) {
    echo "RabbitMQ consumer created !\n";
  } else {
    $rabbitMQConsumerAsyncResult_err->printStackTrace();
  }
});
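To make the two options more concrete, here is a minimal sketch in plain PHP of a bounded buffer that behaves the way the setKeepMostRecent description suggests: when the buffer is full, it either drops the oldest item to make room or drops the incoming one. `BoundedBuffer` is a hypothetical class written for illustration; it is not part of the client, and the client's internal queue may differ in detail.

```php
<?php
// Hypothetical class for illustration only; not part of the RabbitMQ client.
final class BoundedBuffer {
    private $items = array();
    private $maxSize;
    private $keepMostRecent;

    public function __construct(int $maxSize, bool $keepMostRecent) {
        $this->maxSize = $maxSize;
        $this->keepMostRecent = $keepMostRecent;
    }

    public function offer($item): void {
        if (count($this->items) < $this->maxSize) {
            $this->items[] = $item;
            return;
        }
        if ($this->keepMostRecent) {
            // Full: discard the oldest item to make room for the new one
            array_shift($this->items);
            $this->items[] = $item;
        }
        // Otherwise the buffer is full and the incoming item is discarded
    }

    public function items(): array {
        return $this->items;
    }
}

$recent = new BoundedBuffer(3, true);
$oldest = new BoundedBuffer(3, false);
foreach (array(1, 2, 3, 4, 5) as $n) {
    $recent->offer($n);
    $oldest->offer($n);
}
echo implode(",", $recent->items()), "\n"; // prints 3,4,5
echo implode(",", $oldest->items()), "\n"; // prints 1,2,3
```

With keepMostRecent enabled, a slow consumer sees the latest messages at the cost of losing older ones; with it disabled, the buffer holds on to what it already has.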
When you want to stop consuming messages from a queue, you can do:
<?php
$rabbitMQConsumer->cancel(function ($cancelResult, $cancelResult_err) {
  if ($cancelResult != null) {
    echo "Consumption successfully stopped\n";
  } else {
    echo "Failed to stop consumption\n";
    $cancelResult_err->printStackTrace();
  }
});
You can get notified by the end handler when the queue won’t process any more messages:
<?php
$rabbitMQConsumer->endHandler(function ($v) {
  echo "It is the end of the stream\n";
});
You can set the exception handler to be notified of any error that may occur when a message is processed:
<?php
$consumer->exceptionHandler(function ($e) {
  echo "An exception occurred in the process of message handling\n";
  $e->printStackTrace();
});
Finally, you may want to retrieve the consumer tag associated with the consumer:
<?php
$consumerTag = $consumer->consumerTag();
echo "Consumer tag is: ".$consumerTag."\n";
Get a single message from a queue:
<?php
// The second argument (true) enables auto-acknowledgement of the fetched message
$client->basicGet("my.queue", true, function ($getResult, $getResult_err) {
  if ($getResult != null) {
    $msg = $getResult;
    echo "Got message: ".$msg["body"]."\n";
  } else {
    $getResult_err->printStackTrace();
  }
});
Consume messages through the event bus and acknowledge them manually:
<?php
// Create the event bus handler which messages will be sent to
// ($client must be captured explicitly to be visible inside the closure)
$vertx->eventBus()->consumer("my.address", function ($msg) use ($client) {
  $json = $msg->body();
  echo "Got message: ".$json["body"]."\n";
  // ack
  $client->basicAck($json["deliveryTag"], false, function ($asyncResult, $asyncResult_err) {
  });
});
// Setup the link between rabbitmq consumer and event bus address
// (the third argument, false, disables auto-acknowledgement)
$client->basicConsume("my.queue", "my.address", false, function ($consumeResult, $consumeResult_err) {
  if ($consumeResult != null) {
    echo "RabbitMQ consumer created !\n";
  } else {
    $consumeResult_err->printStackTrace();
  }
});
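The `false` passed to basicAck above is the AMQP "multiple" flag: with `false` only the given delivery tag is acknowledged, while `true` acknowledges every outstanding delivery up to and including that tag. The following plain PHP sketch models this on a simple list of unacknowledged tags. `ackDeliveries` is a hypothetical function used only to illustrate the semantics; it does not talk to a broker and is not part of the client.

```php
<?php
// Hypothetical in-memory model of a channel's unacknowledged delivery tags.
// Returns the tags that remain unacknowledged after the ack is applied.
function ackDeliveries(array $unacked, int $deliveryTag, bool $multiple): array {
    $remaining = array_filter($unacked, function ($tag) use ($deliveryTag, $multiple) {
        // Keep a tag only if this ack does not cover it
        return $multiple ? $tag > $deliveryTag : $tag !== $deliveryTag;
    });
    return array_values($remaining);
}

$unacked = array(1, 2, 3, 4);
echo implode(",", ackDeliveries($unacked, 3, false)), "\n"; // prints 1,2,4
echo implode(",", ackDeliveries($unacked, 3, true)), "\n";  // prints 4
```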
You will need to have RabbitMQ installed and running with default ports on localhost for this to work.