A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your Maven project:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>3.6.2</version>
</dependency>

Gradle

Add the following dependency to your Gradle project:

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}

Create a client

You can create a client instance from a full AMQP URI as follows:

def config = [:]
// full amqp uri
config.uri = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
def client = RabbitMQClient.create(vertx, config)

Alternatively, you can specify individual parameters:

def config = [:]
// Each parameter is optional
// The default value will be used if a parameter is not set
config.user = "user1"
config.password = "password1"
config.host = "localhost"
config.port = 5672
config.virtualHost = "vhost1"
config.connectionTimeout = 6000
config.requestedHeartbeat = 60
config.handshakeTimeout = 6000
config.requestedChannelMax = 5
config.networkRecoveryInterval = 500
config.automaticRecoveryEnabled = true

def client = RabbitMQClient.create(vertx, config)
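
The URI form and the individual parameters describe the same connection settings. As a rough illustration of how the two relate, the sketch below splits an AMQP URI into the corresponding fields using java.net.URI. The parseAmqpUri helper is hypothetical and not part of the client API; the client performs its own URI handling, which may differ in details such as defaults.

```groovy
// Hypothetical helper (not part of the client API): splits an AMQP URI
// into the settings used by the map-based configuration.
def parseAmqpUri(String uri) {
  def u = new URI(uri)
  // userInfo has the form "user:password" and may be absent
  def userInfo = (u.userInfo ?: "").split(":", 2)
  [
    user       : userInfo[0] ?: null,
    password   : userInfo.length > 1 ? userInfo[1] : null,
    host       : u.host,
    // java.net.URI reports -1 when the URI carries no explicit port
    port       : u.port,
    // the path component, minus its leading slash, names the virtual host
    virtualHost: u.path ? u.path.substring(1) : null
  ]
}

def parts = parseAmqpUri("amqp://user1:password1@localhost:5672/vhost1")
println(parts)
```

With the URI above, the resulting map holds the same user, password, host, port and virtualHost values as the parameter-based example.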

You can set multiple addresses to connect to a cluster:

def config = [:]
config.user = "user1"
config.password = "password1"
config.virtualHost = "vhost1"

config.addresses = [com.rabbitmq.client.Address.parseAddresses("firstHost,secondHost:5672")]

def client = RabbitMQClient.create(vertx, config)

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:

def config = [:]

// Keys containing hyphens must be quoted; unquoted, Groovy parses them as subtraction
config["x-dead-letter-exchange"] = "my.deadletter.exchange"
config["alternate-exchange"] = "my.alternate.exchange"
// ...
client.exchangeDeclare("my.exchange", "fanout", true, false, config, { onResult ->
  if (onResult.succeeded()) {
    println("Exchange successfully declared with config")
  } else {
    onResult.cause().printStackTrace()
  }
})

Declare queue with additional config

You can pass additional config parameters to RabbitMQ’s queueDeclare method:

def config = [:]
// The hyphenated key must be quoted to be valid Groovy
config["x-message-ttl"] = 10000L

client.queueDeclare("my-queue", true, false, true, config, { queueResult ->
  if (queueResult.succeeded()) {
    println("Queue declared!")
  } else {
    System.err.println("Queue failed to be declared!")
    queueResult.cause().printStackTrace()
  }
})

Operations

The following are some examples of the operations supported by the RabbitMQClient API. Consult the Javadoc for detailed information on all API methods.

Publish

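Publish a message to a queue. The empty exchange name selects the default exchange, which routes the message to the queue whose name matches the routing key.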

def message = [
  body:"Hello RabbitMQ, from Vert.x !"
]
client.basicPublish("", "my.queue", message, { pubResult ->
  if (pubResult.succeeded()) {
    println("Message published !")
  } else {
    pubResult.cause().printStackTrace()
  }
})

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

def message = [
  body:"Hello RabbitMQ, from Vert.x !"
]

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelect({ confirmResult ->
  if (confirmResult.succeeded()) {
    client.basicPublish("", "my.queue", message, { pubResult ->
      if (pubResult.succeeded()) {
        // Check the message got confirmed by the broker.
        client.waitForConfirms({ waitResult ->
          if (waitResult.succeeded()) {
            println("Message published !")
          } else {
            waitResult.cause().printStackTrace()
          }
        })
      } else {
        pubResult.cause().printStackTrace()
      }
    })
  } else {
    confirmResult.cause().printStackTrace()
  }
})

Consume

Consume messages from a queue.

// Create a stream of messages from a queue
client.basicConsumer("my.queue", { rabbitMQConsumerAsyncResult ->
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    println("RabbitMQ consumer created !")
    def mqConsumer = rabbitMQConsumerAsyncResult.result()
    mqConsumer.handler({ message ->
      println("Got message: ${message.body().toString()}")
    })
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace()
  }
})

You can pause or resume the stream at any time. While the stream is paused, the handler does not receive any messages.

consumer.pause()
consumer.resume()

A set of options can be specified when creating a consumption stream.

QueueOptions lets you specify:

  • The size of the internal queue, with setMaxInternalQueueSize

  • Whether the stream should keep the most recent messages when the queue size is exceeded, with setKeepMostRecent

def options = [
  maxInternalQueueSize:1000,
  keepMostRecent:true
]

client.basicConsumer("my.queue", options, { rabbitMQConsumerAsyncResult ->
  if (rabbitMQConsumerAsyncResult.succeeded()) {
    println("RabbitMQ consumer created !")
  } else {
    rabbitMQConsumerAsyncResult.cause().printStackTrace()
  }
})
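
To make the effect of keepMostRecent more concrete, here is a minimal sketch of a bounded buffer with two overflow policies. It assumes, based only on the option descriptions above, that a full queue either drops its oldest entry to make room (keepMostRecent set to true) or discards the incoming message (false). The BoundedBuffer class is hypothetical and is not the client's implementation.

```groovy
// Illustrative only: a bounded buffer with two overflow policies.
class BoundedBuffer {
  final int maxSize
  final boolean keepMostRecent
  final Deque items = new ArrayDeque()

  BoundedBuffer(int maxSize, boolean keepMostRecent) {
    this.maxSize = maxSize
    this.keepMostRecent = keepMostRecent
  }

  void offer(item) {
    if (items.size() < maxSize) {
      items.addLast(item)
    } else if (keepMostRecent && maxSize > 0) {
      items.removeFirst()   // drop the oldest buffered item
      items.addLast(item)   // keep the newest one
    }
    // otherwise the incoming item is discarded
  }
}

def recent = new BoundedBuffer(2, true)
def oldest = new BoundedBuffer(2, false)
[1, 2, 3].each { recent.offer(it); oldest.offer(it) }
println(recent.items as List)   // [2, 3]
println(oldest.items as List)   // [1, 2]
```

Under this assumption, keepMostRecent suits consumers that only care about fresh data, while the opposite setting preserves whatever arrived first.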

When you want to stop consuming messages from a queue, you can do:

rabbitMQConsumer.cancel({ cancelResult ->
  if (cancelResult.succeeded()) {
    println("Consumption successfully stopped")
  } else {
    println("Failed to stop consumption")
    cancelResult.cause().printStackTrace()
  }
})

You can register an end handler to be notified when the stream will not deliver any more messages:

rabbitMQConsumer.endHandler({ v ->
  println("It is the end of the stream")
})

You can set the exception handler to be notified of any error that may occur when a message is processed:

consumer.exceptionHandler({ e ->
  println("An exception occurred in the process of message handling")
  e.printStackTrace()
})

Finally, you may want to retrieve the consumer tag associated with the consumer:

def consumerTag = consumer.consumerTag()
println("Consumer tag is: ${consumerTag}")

Get

Get a single message from a queue:

client.basicGet("my.queue", true, { getResult ->
  if (getResult.succeeded()) {
    def msg = getResult.result()
    println("Got message: ${msg.body}")
  } else {
    getResult.cause().printStackTrace()
  }
})

Consume messages without auto-ack

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", { msg ->
  def json = msg.body()
  println("Got message: ${json.body}")
  // ack
  client.basicAck(json.deliveryTag, false, { asyncResult ->
  })
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsume("my.queue", "my.address", false, { consumeResult ->
  if (consumeResult.succeeded()) {
    println("RabbitMQ consumer created !")
  } else {
    consumeResult.cause().printStackTrace()
  }
})

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.