A Vert.x client that allows applications to interact with a RabbitMQ broker (AMQP 0.9.1).

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your Maven project:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>3.6.2</version>
</dependency>

Gradle

Add the following dependency to your Gradle project:

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}

Create a client

You can create a client instance from a full AMQP URI:

var config = RabbitMQOptions()
// full amqp uri
config.setUri("amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc")
var client = RabbitMQClient.create(vertx, config)

Alternatively, you can specify individual parameters manually:

var config = RabbitMQOptions()
// Each parameter is optional
// The default value is used for any parameter that is not set
config.setUser("user1")
config.setPassword("password1")
config.setHost("localhost")
config.setPort(5672)
config.setVirtualHost("vhost1")
config.setConnectionTimeout(6000)
config.setRequestedHeartbeat(60)
config.setHandshakeTimeout(6000)
config.setRequestedChannelMax(5)
config.setNetworkRecoveryInterval(500)
config.setAutomaticRecoveryEnabled(true)

var client = RabbitMQClient.create(vertx, config)
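
Both forms carry the same connection settings. As a rough illustration of how the parts of an AMQP URI line up with the individual parameters, here is a minimal sketch built only on java.net.URI. It is not the client's own parser: the AmqpParts case class and the AmqpUriSketch object are hypothetical names introduced for this example, and falling back to port 5672 when the URI has no port is an assumption based on the conventional plain AMQP port.

```scala
import java.net.URI

// Hypothetical holder for the pieces of an AMQP URI; not part of the client API.
final case class AmqpParts(
    user: Option[String],
    password: Option[String],
    host: String,
    port: Int,
    virtualHost: Option[String]
)

object AmqpUriSketch {
  // Assumption: fall back to the conventional plain AMQP port when none is given.
  val DefaultPort = 5672

  def parse(uri: String): AmqpParts = {
    val u = new URI(uri)
    // User info has the form "user:password"; the password part may be absent.
    val userInfo = Option(u.getUserInfo).map(_.split(":", 2))
    val user = userInfo.map(_.head)
    val password = userInfo.collect { case Array(_, p) => p }
    val port = if (u.getPort == -1) DefaultPort else u.getPort
    // The path, minus its leading slash, names the virtual host.
    val vhost = Option(u.getPath).map(_.stripPrefix("/")).filter(_.nonEmpty)
    AmqpParts(user, password, u.getHost, port, vhost)
  }
}
```

For instance, AmqpUriSketch.parse("amqp://user1:password1@localhost:5672/vhost1") yields the same user, password, host, port and virtual host that the setters above configure one by one.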

You can set multiple addresses to connect to a cluster:

var config = RabbitMQOptions()
config.setUser("user1")
config.setPassword("password1")
config.setVirtualHost("vhost1")

config.setAddresses(List(com.rabbitmq.client.Address.parseAddresses("firstHost,secondHost:5672")))

var client = RabbitMQClient.create(vertx, config)
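
The address string is a comma-separated list of host or host:port entries. The sketch below shows one way such a string could be split into host and port pairs using only the standard library. It is an illustration of the format, not the implementation of Address.parseAddresses; the AddressListSketch object is a hypothetical name, the fallback to port 5672 is an assumption, and IPv6 literals are deliberately not handled.

```scala
object AddressListSketch {
  // Assumption: entries without an explicit port use the conventional AMQP port.
  val DefaultPort = 5672

  // Splits "hostA,hostB:5673" into (host, port) pairs.
  def parse(addresses: String): List[(String, Int)] =
    addresses
      .split(",")
      .toList
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { entry =>
        entry.lastIndexOf(':') match {
          case -1 => (entry, DefaultPort)
          case i  => (entry.substring(0, i), entry.substring(i + 1).toInt)
        }
      }
}
```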

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:

var config = new io.vertx.core.json.JsonObject()

config.put("x-dead-letter-exchange", "my.deadletter.exchange")
config.put("alternate-exchange", "my.alternate.exchange")
// ...
client.exchangeDeclareFuture("my.exchange", "fanout", true, false, config).onComplete{
  case Success(result) => {
    println("Exchange successfully declared with config")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Declare queue with additional config

You can pass additional config parameters to RabbitMQ’s queueDeclare method:

var config = new io.vertx.core.json.JsonObject()
config.put("x-message-ttl", 10000)

client.queueDeclareFuture("my-queue", true, false, true, config).onComplete{
  case Success(result) => {
    println("Queue declared!")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Operations

The following are some examples of the operations supported by the RabbitMQService API. Consult the javadoc/documentation for detailed information on all API methods.

Publish

Publish a message to a queue

var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")
client.basicPublishFuture("", "my.queue", message).onComplete{
  case Success(result) => {
    println("Message published !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

var message = new io.vertx.core.json.JsonObject().put("body", "Hello RabbitMQ, from Vert.x !")

// Put the channel in confirm mode. This can be done once at init.
client.confirmSelectFuture().onComplete{
  case Success(result) => {
    client.basicPublishFuture("", "my.queue", message).onComplete{
      case Success(result) => {
        // Check the message got confirmed by the broker.
        client.waitForConfirmsFuture().onComplete{
          case Success(result) => {
            println("Message published !")
          }
          case Failure(cause) => {
            println(s"$cause")
          }
        }
      }
      case Failure(cause) => {
        println(s"$cause")
      }
    }
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}
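
Since the calls above are handled as Scala futures, the three nested steps can also be chained in a for-comprehension. The following is a minimal sketch of that pattern using only scala.concurrent. ConfirmFlowSketch and publishConfirmed are hypothetical names, and the three function parameters are stand-ins for client.confirmSelectFuture(), client.basicPublishFuture(...) and client.waitForConfirmsFuture(); it assumes those methods return scala.concurrent.Future values.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ConfirmFlowSketch {
  // Runs the three steps in order. If any step fails, the returned future
  // fails with that error and the remaining steps are not started.
  def publishConfirmed(
      confirmSelect: () => Future[Any],
      publish: () => Future[Any],
      waitForConfirms: () => Future[Any]
  )(implicit ec: ExecutionContext): Future[Unit] =
    for {
      _ <- confirmSelect()
      _ <- publish()
      _ <- waitForConfirms()
    } yield ()
}
```

With this shape there is a single place to handle the outcome: one onComplete on the returned future replaces the three nested Failure branches.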

Consume

Consume messages from a queue.

// Create a stream of messages from a queue
client.basicConsumerFuture("my.queue").onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
    var mqConsumer = result
    mqConsumer.handler((message: io.vertx.scala.rabbitmq.RabbitMQMessage) => {
      println(s"Got message: ${message.body().toString()}")
    })
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.

consumer.pause()
consumer.resume()

There is a set of options you can specify when creating a consumption stream.

QueueOptions lets you specify:

  • The size of the internal queue, with setMaxInternalQueueSize

  • Whether the stream should keep the most recent messages when the queue size is exceeded, with setKeepMostRecent

var options = QueueOptions()
  .setMaxInternalQueueSize(1000)
  .setKeepMostRecent(true)


client.basicConsumerFuture("my.queue", options).onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}
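
To make the interplay of the two options concrete, here is a small stand-alone sketch of a bounded buffer. It is not the client's internal queue: BoundedBuffer is a hypothetical class, and the overflow behavior (drop the oldest element when keepMostRecent is true, otherwise reject the new one) is an assumption inferred from the option names.

```scala
import scala.collection.mutable

// Hypothetical model of a size-limited internal queue; not the client's own code.
final class BoundedBuffer[A](maxSize: Int, keepMostRecent: Boolean) {
  require(maxSize > 0, "maxSize must be positive")

  private val queue = mutable.Queue.empty[A]

  // Returns true if the item was stored, false if it was discarded.
  def offer(item: A): Boolean =
    if (queue.size < maxSize) {
      queue.enqueue(item)
      true
    } else if (keepMostRecent) {
      queue.dequeue() // full: drop the oldest item to make room
      queue.enqueue(item)
      true
    } else {
      false // full: discard the incoming item
    }

  // Current contents, oldest first.
  def snapshot: List[A] = queue.toList
}
```

Under these assumptions, a buffer of size 2 that is offered 1, 2 and 3 ends up holding 2 and 3 when keepMostRecent is true, and 1 and 2 when it is false.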

When you want to stop consuming messages from a queue, you can do:

rabbitMQConsumer.cancelFuture().onComplete{
  case Success(result) => {
    println("Consumption successfully stopped")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

You can get notified by the end handler when the queue won’t process any more messages:

rabbitMQConsumer.endHandler((v: java.lang.Void) => {
  println("It is the end of the stream")
})

You can set the exception handler to be notified of any error that may occur when a message is processed:

consumer.exceptionHandler((e: java.lang.Throwable) => {
  println("An exception occurred in the process of message handling")
  e.printStackTrace()
})

And finally, you may want to retrieve the consumer tag:

var consumerTag = consumer.consumerTag()
println(s"Consumer tag is: ${consumerTag}")

Get

Get a single message from a queue:

client.basicGetFuture("my.queue", true).onComplete{
  case Success(result) => {
    var msg = result
    println(s"Got message: ${msg.getValue("body")}")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Consume messages without auto-ack

// Create the event bus handler which messages will be sent to
vertx.eventBus().consumer("my.address", (msg: io.vertx.scala.core.eventbus.Message[io.vertx.core.json.JsonObject]) => {
  var json = msg.body()
  println(s"Got message: ${json.getValue("body")}")
  // ack the delivery by its tag; false means only this single message is acknowledged
  client.basicAckFuture(json.getLong("deliveryTag"), false).onComplete{
    case Success(result) => println("Success")
    case Failure(cause) => println("Failure")
  }
})

// Setup the link between rabbitmq consumer and event bus address
client.basicConsumeFuture("my.queue", "my.address", false).onComplete{
  case Success(result) => {
    println("RabbitMQ consumer created !")
  }
  case Failure(cause) => {
    println(s"$cause")
  }
}

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.