Skip to main content
编辑本页

A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your maven project

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>3.6.2</version>
</dependency>

Gradle

Add the following dependency to your gradle project

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}

Create a client

You can create a client instance as follows using a full amqp uri:

<?php
    use io\vertx\jphp\rabbitmq\RabbitMQClient;
    $config = array(
    );
    // full amqp uri
    $config["uri"] = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc";
    $client = RabbitMQClient::create($vertx, $config);

Or you can also specify individual parameters manually:

<?php
    use io\vertx\jphp\rabbitmq\RabbitMQClient;
    $config = array(
    );
    // Each parameter is optional
    // The default parameter with be used if the parameter is not set
    $config["user"] = "user1";
    $config["password"] = "password1";
    $config["host"] = "localhost";
    $config["port"] = 5672;
    $config["virtualHost"] = "vhost1";
    $config["connectionTimeout"] = 6000;
    $config["requestedHeartbeat"] = 60;
    $config["handshakeTimeout"] = 6000;
    $config["requestedChannelMax"] = 5;
    $config["networkRecoveryInterval"] = 500;
    $config["automaticRecoveryEnabled"] = true;

    $client = RabbitMQClient::create($vertx, $config);

You can set multiples addresses to connect to a cluster;

<?php
    use io\vertx\jphp\rabbitmq\RabbitMQClient;
    $config = array(
    );
    $config["user"] = "user1";
    $config["password"] = "password1";
    $config["virtualHost"] = "vhost1";

    $config["addresses"] = array( Java::type("com.rabbitmq.client.Address")->parseAddresses("firstHost,secondHost:5672"));

    $client = RabbitMQClient::create($vertx, $config);

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method

<?php

    $config = array(
    );

    $config["x-dead-letter-exchange"] = "my.deadletter.exchange";
    $config["alternate-exchange"] = "my.alternate.exchange";
    // ...
    $client->exchangeDeclare("my.exchange", "fanout", true, false, $config, function ($onResult, $onResult_err) {
        if ($onResult != null) {
            echo "Exchange successfully declared with config\n";
        } else {
            $onResult_err->printStackTrace();
        };
    });

Declare queue with additional config

You can pass additional config parameters to RabbitMQs queueDeclare method

<?php
    $config = array(
    );
    $config["x-message-ttl"] = 10000;

    $client->queueDeclare("my-queue", true, false, true, $config, function ($queueResult, $queueResult_err) {
        if ($queueResult != null) {
            echo "Queue declared!\n";
        } else {
            echo "Queue failed to be declared!\n";
            $queueResult_err->printStackTrace();
        };
    });

Operations

The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

<?php
    $message = array(
        "body" => "Hello RabbitMQ, from Vert.x !"
    );
    $client->basicPublish("", "my.queue", $message, function ($pubResult, $pubResult_err) {
        if ($pubResult != null) {
            echo "Message published !\n";
        } else {
            $pubResult_err->printStackTrace();
        };
    });

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

<?php
    $message = array(
        "body" => "Hello RabbitMQ, from Vert.x !"
    );

    // Put the channel in confirm mode. This can be done once at init.
    $client->confirmSelect(function ($confirmResult, $confirmResult_err) {
        if ($confirmResult != null) {
            $client->basicPublish("", "my.queue", $message, function ($pubResult, $pubResult_err) {
                if ($pubResult != null) {
                    // Check the message got confirmed by the broker.
                    $client->waitForConfirms(function ($waitResult, $waitResult_err) {
                        if ($waitResult != null) {
                            echo "Message published !\n"} else {
                            $waitResult_err->printStackTrace()};
                    });
                } else {
                    $pubResult_err->printStackTrace();
                };
            });
        } else {
            $confirmResult_err->printStackTrace();
        };
    });

Consume

Consume messages from a queue.

// Create a stream of messages from a queue
<?php
    $client->basicConsumer("my.queue", function ($rabbitMQConsumerAsyncResult, $rabbitMQConsumerAsyncResult_err) {
        if ($rabbitMQConsumerAsyncResult != null) {
            echo "RabbitMQ consumer created !\n";
            $mqConsumer = $rabbitMQConsumerAsyncResult;
            $mqConsumer->handler(function ($message) {
                echo "Got message: ".$message->body()->toString()."\n";
            });
        } else {
            $rabbitMQConsumerAsyncResult_err->printStackTrace();
        };
    });

At any moment of time you can pause or resume the stream. When stream is paused you won’t receive any message.

<?php
    $consumer->pause();
    $consumer->resume();

There are actually a set of options to specify when creating a consumption stream.

The QueueOptions lets you specify:

  • The size of internal queue with setMaxInternalQueueSize

  • Should the stream keep more recent messages when queue size is exceed with setKeepMostRecent

<?php
    $options = array(
        "maxInternalQueueSize" => 1000,
        "keepMostRecent" => true
    );

    $client->basicConsumer("my.queue", $options, function ($rabbitMQConsumerAsyncResult, $rabbitMQConsumerAsyncResult_err) {
        if ($rabbitMQConsumerAsyncResult != null) {
            echo "RabbitMQ consumer created !\n";
        } else {
            $rabbitMQConsumerAsyncResult_err->printStackTrace();
        };
    });

When you want to stop consuming message from a queue, you can do:

<?php
    $rabbitMQConsumer->cancel(function ($cancelResult, $cancelResult_err) {
        if ($cancelResult != null) {
            echo "Consumption successfully stopped\n";
        } else {
            echo "Tired in attempt to stop consumption\n";
            $cancelResult_err->printStackTrace();
        };
    });

You can get notified by the end handler when the queue won’t process any more messages:

<?php
    $rabbitMQConsumer->endHandler(function ($v) {
        echo "It is the end of the stream\n";
    });

You can set the exception handler to be notified of any error that may occur when a message is processed:

<?php
    $consumer->exceptionHandler(function ($e) {
        echo "An exception occurred in the process of message handling\n";
        $e->printStackTrace();
    });

And finally, you may want to retrive a related to the consumer tag:

<?php
    $consumerTag = $consumer->consumerTag();
    echo "Consumer tag is: ".$consumerTag."\n";

Get

Will get a message from a queue

<?php
    $client->basicGet("my.queue", true, function ($getResult, $getResult_err) {
        if ($getResult != null) {
            $msg = $getResult;
            echo "Got message: .$msg["body"]"."\n";
        } else {
            $getResult_err->printStackTrace();
        };
    });

Consume messages without auto-ack

<?php
    // Create the event bus handler which messages will be sent to
    $vertx->eventBus()->consumer("my.address", function ($msg) {
        $json = $msg->body();
        echo "Got message: .$json["body"]"."\n";
        // ack
        $client->basicAck($json["deliveryTag"], false, function ($asyncResult, $asyncResult_err) {
        });
    });

    // Setup the link between rabbitmq consumer and event bus address
    $client->basicConsume("my.queue", "my.address", false, function ($consumeResult, $consumeResult_err) {
        if ($consumeResult != null) {
            echo "RabbitMQ consumer created !\n";
        } else {
            $consumeResult_err->printStackTrace();
        };
    });

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.