
A Vert.x client allowing applications to interact with a RabbitMQ broker (AMQP 0.9.1)

This service is experimental and the APIs are likely to change before settling down.

Getting Started

Maven

Add the following dependency to your Maven project:

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-rabbitmq-client</artifactId>
 <version>3.6.2</version>
</dependency>

Gradle

Add the following dependency to your Gradle project:

dependencies {
 compile 'io.vertx:vertx-rabbitmq-client:3.6.2'
}

Create a client

You can create a client instance from a full AMQP URI:

require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# full amqp uri
config['uri'] = "amqp://xvjvsrrc:VbuL1atClKt7zVNQha0bnnScbNvGiqgb@moose.rmq.cloudamqp.com/xvjvsrrc"
client = VertxRabbitmq::RabbitMQClient.create(vertx, config)

Alternatively, you can specify the individual parameters manually:

require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
# Each parameter is optional
# The default value will be used if a parameter is not set
config['user'] = "user1"
config['password'] = "password1"
config['host'] = "localhost"
config['port'] = 5672
config['virtualHost'] = "vhost1"
config['connectionTimeout'] = 6000
config['requestedHeartbeat'] = 60
config['handshakeTimeout'] = 6000
config['requestedChannelMax'] = 5
config['networkRecoveryInterval'] = 500
config['automaticRecoveryEnabled'] = true

client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
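
The URI form and the individual parameters carry the same information. As a rough sketch of how the two relate, the snippet below uses Ruby's standard `uri` library to split an AMQP URI into the configuration keys shown above. The helper name `amqp_uri_to_config` is hypothetical and not part of the Vert.x client, the fallback port of 5672 assumes a non-TLS connection, and percent-encoded virtual host names are not decoded here.

```ruby
require 'uri'

# Hypothetical helper (not part of the Vert.x client): split an AMQP URI
# into the individual configuration keys used in the example above.
def amqp_uri_to_config(uri_string)
  uri = URI.parse(uri_string)
  config = {}
  config['user'] = uri.user if uri.user
  config['password'] = uri.password if uri.password
  config['host'] = uri.host if uri.host
  # Assumption: fall back to 5672, the standard non-TLS AMQP port
  config['port'] = uri.port || 5672
  # The path component, minus its leading slash, names the virtual host
  vhost = uri.path.to_s.sub(%r{\A/}, '')
  config['virtualHost'] = vhost unless vhost.empty?
  config
end

config = amqp_uri_to_config("amqp://user1:password1@localhost:5672/vhost1")
puts config.inspect
```

In practice, passing `config['uri']` directly is simpler; a helper like this is only useful if you need to inspect or override a single component before creating the client.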

You can set multiple addresses to connect to a cluster:

require 'vertx-rabbitmq/rabbit_mq_client'
config = {
}
config['user'] = "user1"
config['password'] = "password1"
config['virtualHost'] = "vhost1"

config['addresses'] = [Java::ComRabbitmqClient::Address.parse_addresses("firstHost,secondHost:5672")]

client = VertxRabbitmq::RabbitMQClient.create(vertx, config)
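
The address string is a comma-separated list of `host` or `host:port` entries. The following pure-Ruby sketch illustrates that format only; `split_addresses` is a hypothetical helper, not the RabbitMQ Java client's parser, and it assumes plain host names (no IPv6 literals). A `nil` port stands for "use the client's default port".

```ruby
# Hypothetical helper: split a comma-separated list of "host[:port]" entries.
def split_addresses(address_list)
  address_list.split(',').map(&:strip).reject(&:empty?).map do |entry|
    host, port = entry.split(':', 2)
    # nil means no port was given, so the default port would apply
    { 'host' => host, 'port' => port ? Integer(port) : nil }
  end
end

split_addresses("firstHost,secondHost:5672").each do |address|
  puts "#{address['host']} -> #{address['port'] || 'default port'}"
end
```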

Declare exchange with additional config

You can pass additional config parameters to RabbitMQ’s exchangeDeclare method:

config = {
}

config['x-dead-letter-exchange'] = "my.deadletter.exchange"
config['alternate-exchange'] = "my.alternate.exchange"
# ...
# Arguments: exchange name, type, durable, autoDelete, additional config
client.exchange_declare("my.exchange", "fanout", true, false, config) { |onResult_err,onResult|
  if (onResult_err == nil)
    puts "Exchange successfully declared with config"
  else
    onResult_err.print_stack_trace()
  end
}

Declare queue with additional config

You can pass additional config parameters to RabbitMQ’s queueDeclare method:

config = {
}
config['x-message-ttl'] = 10000

# Arguments: queue name, durable, exclusive, autoDelete, additional config
client.queue_declare("my-queue", true, false, true, config) { |queueResult_err,queueResult|
  if (queueResult_err == nil)
    puts "Queue declared!"
  else
    STDERR.puts "Queue failed to be declared!"
    queueResult_err.print_stack_trace()
  end
}

Operations

The following are some examples of the operations supported by the RabbitMQClient API. Consult the Javadoc for detailed information on all API methods.

Publish

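Publish a message to a queue. The empty exchange name selects the default exchange, which routes the message to the queue named by the routing key.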

message = {
  'body' => "Hello RabbitMQ, from Vert.x !"
}
client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
  if (pubResult_err == nil)
    puts "Message published !"
  else
    pubResult_err.print_stack_trace()
  end
}

Publish with confirm

Publish a message to a queue and confirm the broker acknowledged it.

message = {
  'body' => "Hello RabbitMQ, from Vert.x !"
}

# Put the channel in confirm mode. This can be done once at init.
client.confirm_select() { |confirmResult_err,confirmResult|
  if (confirmResult_err == nil)
    client.basic_publish("", "my.queue", message) { |pubResult_err,pubResult|
      if (pubResult_err == nil)
        # Check the message got confirmed by the broker.
        client.wait_for_confirms() { |waitResult_err,waitResult|
          if (waitResult_err == nil)
            puts "Message published !"
          else
            waitResult_err.print_stack_trace()
          end
        }
      else
        pubResult_err.print_stack_trace()
      end
    }
  else
    confirmResult_err.print_stack_trace()
  end
}

Consume

Consume messages from a queue.

# Create a stream of messages from a queue
client.basic_consumer("my.queue") { |rabbitMQConsumerAsyncResult_err,rabbitMQConsumerAsyncResult|
  if (rabbitMQConsumerAsyncResult_err == nil)
    puts "RabbitMQ consumer created !"
    mqConsumer = rabbitMQConsumerAsyncResult
    mqConsumer.handler() { |message|
      puts "Got message: #{message.body().to_string()}"
    }
  else
    rabbitMQConsumerAsyncResult_err.print_stack_trace()
  end
}

You can pause or resume the stream at any time. While the stream is paused, you won’t receive any messages.

consumer.pause()
consumer.resume()

Several options can be specified when creating a consumption stream.

The QueueOptions lets you specify:

  • The size of the internal queue, with setMaxInternalQueueSize

  • Whether the stream keeps the most recent messages when the queue size is exceeded, with setKeepMostRecent

options = {
  'maxInternalQueueSize' => 1000,
  'keepMostRecent' => true
}

client.basic_consumer("my.queue", options) { |rabbitMQConsumerAsyncResult_err,rabbitMQConsumerAsyncResult|
  if (rabbitMQConsumerAsyncResult_err == nil)
    puts "RabbitMQ consumer created !"
  else
    rabbitMQConsumerAsyncResult_err.print_stack_trace()
  end
}
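
To make the effect of the two options concrete, here is a small stand-alone model of a bounded buffer. It is a sketch, not the client's implementation: the class `BoundedMessageBuffer` is hypothetical, and it assumes that with `keepMostRecent` enabled the oldest buffered message is discarded to make room, while with it disabled the newly arrived message is discarded instead.

```ruby
# Hypothetical model of a bounded internal queue; not the client's actual class.
class BoundedMessageBuffer
  attr_reader :messages, :dropped

  def initialize(max_size, keep_most_recent)
    @max_size = max_size
    @keep_most_recent = keep_most_recent
    @messages = []
    @dropped = []
  end

  # Add a message, discarding one message if the buffer is already full.
  def offer(message)
    if @messages.length < @max_size
      @messages.push(message)
    elsif @keep_most_recent
      # Assumption: make room by discarding the oldest buffered message
      @dropped.push(@messages.shift)
      @messages.push(message)
    else
      # Assumption: the buffer is full, so the new message is discarded
      @dropped.push(message)
    end
  end
end

recent = BoundedMessageBuffer.new(3, true)
oldest = BoundedMessageBuffer.new(3, false)
(1..5).each do |n|
  recent.offer("msg-#{n}")
  oldest.offer("msg-#{n}")
end
puts recent.messages.join(", ")  # msg-3, msg-4, msg-5
puts oldest.messages.join(", ")  # msg-1, msg-2, msg-3
```

Under these assumptions, `keepMostRecent` suits consumers that only care about fresh data, while leaving it disabled preserves the earliest backlog at the cost of losing newer messages.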

When you want to stop consuming messages from a queue, you can do:

rabbitMQConsumer.cancel() { |cancelResult_err,cancelResult|
  if (cancelResult_err == nil)
    puts "Consumption successfully stopped"
  else
    puts "Failed to stop consumption"
    cancelResult_err.print_stack_trace()
  end
}

You can get notified by the end handler when the queue won’t process any more messages:

rabbitMQConsumer.end_handler() { |v|
  puts "It is the end of the stream"
}

You can set the exception handler to be notified of any error that may occur when a message is processed:

consumer.exception_handler() { |e|
  puts "An exception occurred in the process of message handling"
  e.print_stack_trace()
}

Finally, you may want to retrieve the consumer tag associated with the consumer:

consumerTag = consumer.consumer_tag()
puts "Consumer tag is: #{consumerTag}"

Get

Get a single message from a queue:

client.basic_get("my.queue", true) { |getResult_err,getResult|
  if (getResult_err == nil)
    msg = getResult
    puts "Got message: #{msg['body']}"
  else
    getResult_err.print_stack_trace()
  end
}

Consume messages without auto-ack

# Create the event bus handler which messages will be sent to
vertx.event_bus().consumer("my.address") { |msg|
  json = msg.body()
  puts "Got message: #{json['body']}"
  # ack
  client.basic_ack(json['deliveryTag'], false) { |asyncResult_err,asyncResult|
  }
}

# Setup the link between rabbitmq consumer and event bus address
client.basic_consume("my.queue", "my.address", false) { |consumeResult_err,consumeResult|
  if (consumeResult_err == nil)
    puts "RabbitMQ consumer created !"
  else
    consumeResult_err.print_stack_trace()
  end
}
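
The second argument to `basic_ack` in the example above is the AMQP `multiple` flag. With `false`, only the given delivery tag is acknowledged; with `true`, every outstanding delivery up to and including that tag is acknowledged. The following stand-alone sketch models that bookkeeping. The class `UnackedDeliveries` is hypothetical and only illustrates the semantics; it does not talk to a broker.

```ruby
require 'set'

# Hypothetical model of the unacknowledged deliveries on one channel.
class UnackedDeliveries
  def initialize
    @tags = Set.new
  end

  # Record a delivery that has not been acknowledged yet.
  def deliver(delivery_tag)
    @tags.add(delivery_tag)
  end

  # multiple = false: acknowledge only this delivery tag.
  # multiple = true: acknowledge every outstanding tag up to and including it.
  def ack(delivery_tag, multiple)
    if multiple
      @tags.delete_if { |tag| tag <= delivery_tag }
    else
      @tags.delete(delivery_tag)
    end
  end

  # Delivery tags still waiting for an acknowledgement, in ascending order.
  def pending
    @tags.to_a.sort
  end
end

channel = UnackedDeliveries.new
(1..5).each { |tag| channel.deliver(tag) }
channel.ack(2, false)
puts channel.pending.inspect  # [1, 3, 4, 5]
channel.ack(4, true)
puts channel.pending.inspect  # [5]
```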

Running the tests

You will need to have RabbitMQ installed and running with default ports on localhost for this to work.