
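This component provides a Kafka client integration that lets you consume messages from, and send messages to, an Apache Kafka cluster the Vert.x way.

For consumers, the API lets you subscribe to topics and their partitions asynchronously, or read messages as a Vert.x stream (including support for pausing and resuming).

For producers, the API provides methods to send messages to a topic and its partitions, much like writing to a Vert.x stream.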

Warning
This component is a technical preview, so its API may still change in later versions.

Using the Vert.x Kafka Client

To use the Vert.x Kafka Client, add the following dependency to your build descriptor:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-kafka-client</artifactId>
 <version>3.6.2</version>
</dependency>
  • Gradle (in your build.gradle file):

compile 'io.vertx:vertx-kafka-client:3.6.2'

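Creating Kafka clients

Creating and using consumers and producers works much like the native Kafka client library. Vert.x essentially wraps it in an asynchronous API.

Consumers and producers need some configuration. For the available properties, see the official Apache Kafka documentation on consumer and producer configuration.

You can put these properties in a Map and pass it to the static create method of the KafkaConsumer or KafkaProducer interface. This creates a KafkaConsumer or a KafkaProducer: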

// creating the consumer using map config
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// use consumer for interacting with Apache Kafka
def consumer = KafkaConsumer.create(vertx, config)

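In the example above, the Map passed to KafkaConsumer.create specifies two things:

  • the list of Kafka brokers to connect to (just one here);
  • the deserializers used to extract the key and the value from each received message.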
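The bootstrap.servers value is a comma-separated list of host:port pairs. To point the consumer at several brokers, you only change that one entry.

The helper below is a hypothetical convenience: consumerConfig is not part of Vert.x, and the broker addresses are placeholders. It builds the same kind of Map as the example above, which could then be passed to KafkaConsumer.create(vertx, config).

```groovy
// Hypothetical helper (not part of Vert.x): builds a consumer config Map like the one above.
// bootstrap.servers expects a comma-separated list of host:port pairs.
Map<String, String> consumerConfig(List<String> brokers, String groupId) {
  def props = [:]
  props["bootstrap.servers"] = brokers.join(",")
  props["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
  props["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
  props["group.id"] = groupId
  props["auto.offset.reset"] = "earliest"
  props["enable.auto.commit"] = "false"
  return props
}

// Placeholder broker addresses
def config = consumerConfig(["broker1:9092", "broker2:9092"], "my_group")
println(config["bootstrap.servers"])   // broker1:9092,broker2:9092
```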

A producer is created in a similar way:

// creating the producer using map config (serializers given as class names)
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["acks"] = "1"

// use producer for interacting with Apache Kafka
def producer = KafkaProducer.create(vertx, config)

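Receiving messages from a topic as part of a consumer group

To consume messages from one or more topics, call the subscribe method of KafkaConsumer. Subscribing also makes the consumer join a consumer group, which is set by the group.id property when the consumer is created.

You also need to register a handler with the handler method to process incoming messages: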

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// subscribe to several topics
def topics = new java.util.HashSet()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics)

// or just subscribe to a single topic
consumer.subscribe("a-single-topic")

The handler can be registered before or after the call to subscribe(); messages won’t be consumed until both methods have been called. This allows you to call subscribe(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.

To find out whether the subscription succeeded, pass a completion handler to subscribe:

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// subscribe to several topics
def topics = new java.util.HashSet()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics, { ar ->
  if (ar.succeeded()) {
    println("subscribed")
  } else {
    println("Could not subscribe ${ar.cause().getMessage()}")
  }
})

// or just subscribe to a single topic
consumer.subscribe("a-single-topic", { ar ->
  if (ar.succeeded()) {
    println("subscribed")
  } else {
    println("Could not subscribe ${ar.cause().getMessage()}")
  }
})

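Kafka consumers form consumer groups. Within a group, each partition is consumed by exactly one consumer at a time, so the cluster spreads a topic's partitions across the group's members. A group can also accept new consumers, and partitions can then be reassigned to them.

If a consumer in the group fails or leaves, the Kafka cluster automatically reassigns its partitions to the remaining consumers in the group, or to a newly joined consumer.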
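To make the rebalancing behaviour concrete, here is a simplified, in-memory sketch of how a group's partitions could be divided among its members. It is loosely modelled on Kafka's range assignment strategy. It is not the Kafka group protocol: the group coordinator and the assignor configuration are out of scope, and the consumer IDs are hypothetical.

```groovy
// Simplified sketch, loosely modelled on range assignment: partitions 0..n-1 of a topic are
// split into contiguous ranges, and the first (n % members) members get one extra partition.
// Every partition ends up with exactly one member, as in a Kafka consumer group.
Map<String, List<Integer>> assignRange(List<String> members, int partitionCount) {
  def sorted = members.sort(false)              // deterministic order, input list left untouched
  int perMember = partitionCount.intdiv(sorted.size())
  int extra = partitionCount % sorted.size()
  def assignment = [:]
  int start = 0
  sorted.eachWithIndex { String member, int i ->
    int count = perMember + (i < extra ? 1 : 0)
    assignment[member] = (start..<(start + count)).toList()
    start += count
  }
  return assignment
}

// Three hypothetical consumers share a 4-partition topic
println(assignRange(["c1", "c2", "c3"], 4))   // [c1:[0, 1], c2:[2], c3:[3]]

// "c2" leaves the group: a rebalance spreads all partitions over the remaining members
println(assignRange(["c1", "c3"], 4))         // [c1:[0, 1], c3:[2, 3]]
```

Rerunning the assignment after a membership change is the essence of a rebalance. This is also why the revoked and assigned handlers below fire when consumers join or leave.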

You can register handlers on KafkaConsumer with the partitionsRevokedHandler and partitionsAssignedHandler methods. They notify you when partitions are revoked from, or assigned to, the consumer.

// register the handler for incoming messages
consumer.handler({ record ->
  println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler({ topicPartitions ->

  println("Partitions assigned")
  topicPartitions.each { topicPartition ->
    println("${topicPartition.topic} ${topicPartition.partition}")
  }
})

consumer.partitionsRevokedHandler({ topicPartitions ->

  println("Partitions revoked")
  topicPartitions.each { topicPartition ->
    println("${topicPartition.topic} ${topicPartition.partition}")
  }
})

// subscribes to the topic
consumer.subscribe("test", { ar ->

  if (ar.succeeded()) {
    println("Consumer subscribed")
  }
})

A consumer that is a member of a consumer group can leave it by calling unsubscribe. After that, it no longer receives messages:

// consumer is already member of a consumer group

// unsubscribing request
consumer.unsubscribe()

You can also pass a handler to unsubscribe to be notified of the result:

// consumer is already member of a consumer group

// unsubscribing request
consumer.unsubscribe({ ar ->

  if (ar.succeeded()) {
    println("Consumer unsubscribed")
  }
})

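Receiving messages from a topic requesting specific partitions

Besides joining a consumer group to receive messages from a topic, a consumer can request specific partitions of a topic. When the consumer is not part of a consumer group, the application cannot rely on Kafka's rebalancing to spread partitions across consumers.

You can request specific partitions with the assign method: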

// register the handler for incoming messages
consumer.handler({ record ->
  println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})

// the set of partitions to request: partition 0 of topic "test"
def topicPartitions = new java.util.HashSet()
topicPartitions.add([
  topic:"test",
  partition:0
])

// requesting to be assigned the specific partition
consumer.assign(topicPartitions, { done ->

  if (done.succeeded()) {
    println("Partition assigned")

    // requesting the assigned partitions
    consumer.assignment({ done1 ->

      if (done1.succeeded()) {

        done1.result().each { topicPartition ->
          println("${topicPartition.topic} ${topicPartition.partition}")
        }
      }
    })
  }
})

As with subscribe(), the handler can be registered before or after the call to assign(); messages won’t be consumed until both methods have been called. This allows you to call assign(), then seek() and finally handler() in order to only consume messages starting from a particular offset, for example.

The assignment method used above lists the topic partitions currently assigned to the consumer.

Receiving messages with explicit polling

Instead of relying on the internal polling mechanism to receive messages from Kafka, the client can subscribe to a topic without registering a record handler. It then fetches messages with the poll method.

This way, the application decides when to poll for messages, for example only after it has finished processing the previous ones.

// subscribes to the topic
consumer.subscribe("test", { ar ->

  if (ar.succeeded()) {
    println("Consumer subscribed")

    vertx.setPeriodic(1000, { timerId ->

      consumer.poll(100, { ar1 ->

        if (ar1.succeeded()) {

          def records = ar1.result()
          (0..<records.size()).each { i ->
            def record = records.recordAt(i)
            println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
          }
        }
      })

    })
  }
})

After subscribing successfully, the application starts a periodic timer that polls Kafka for messages once per second. Each poll call waits at most 100 ms for records to become available.
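The key difference from the handler-based model is who drives the loop. In the toy model below, records are only handed out when the application asks for them, up to a maximum batch size, and the read position advances accordingly. InMemoryPartition and its poll method are hypothetical stand-ins, not the Vert.x API, and no broker is involved.

```groovy
// Toy pull-based source: records are only handed out when the application calls poll().
class InMemoryPartition {
  private final List<String> log
  private int position = 0

  InMemoryPartition(List<String> log) { this.log = log }

  // Returns up to maxRecords records starting at the current position and advances it.
  List<String> poll(int maxRecords) {
    int end = Math.min(position + maxRecords, log.size())
    List<String> batch = log.subList(position, end).collect()   // copy of the slice
    position = end
    return batch
  }
}

def partition = new InMemoryPartition((0..<5).collect { "message_${it}".toString() })
def processed = []
// The application decides when to fetch the next batch: here, right after finishing the previous one.
while (true) {
  def batch = partition.poll(2)
  if (batch.isEmpty()) break
  batch.each { processed << it }
}
println(processed)   // [message_0, message_1, message_2, message_3, message_4]
```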

Changing the subscription or assignment

You can change the subscribed topics, or assigned partitions after you have started to consume messages, simply by calling subscribe() or assign() again.

Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages from the old subscription or assignment after the subscribe() or assign() method’s completion handler has been called. This is not the case for messages observed by the batch handler: Once the completion handler has been called it will only observe messages read from the subscription or assignment.

Getting topic partition information

You can get information about the partitions of a specific topic with the partitionsFor method:

// asking partitions information about specific topic
consumer.partitionsFor("test", { ar ->

  if (ar.succeeded()) {

    ar.result().each { partitionInfo ->
      println(partitionInfo)
    }
  }
})

The listTopics method lists the topics available in the cluster, together with their partitions:

// asking information about available topics and related partitions
consumer.listTopics({ ar ->

  if (ar.succeeded()) {

    def map = ar.result()
    map.each { topic, partitions ->
      println("topic = ${topic}")
      println("partitions = ${map[topic]}")
    }
  }
})

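Committing offsets manually

In Apache Kafka, the consumer is responsible for tracking the offset of the last message it has read.

The consumer can commit this offset automatically as it reads batches of messages from a topic partition. Automatic commit is enabled by setting the enable.auto.commit property to true when the KafkaConsumer is created.

You can also commit offsets manually with the commit method. Manual commit is typically used to achieve at-least-once delivery. The offset is committed only after the messages have been processed, so a message is never marked as consumed before it has actually been handled.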

// consumer is processing read messages

// committing offset of the last read message
consumer.commit({ ar ->

  if (ar.succeeded()) {
    println("Last read message offset committed")
  }
})
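The at-least-once argument can be checked with a small simulation. The sketch below is a hypothetical in-memory model, not the Vert.x or Kafka API. It follows Kafka's convention of storing the committed offset as the offset of the next record to read. A simulated crash after processing a record but before committing it makes the restarted consumer read that record again, so it is processed twice rather than lost.

```groovy
// Hypothetical in-memory model of at-least-once processing with manual commits.
class ManualCommitConsumer {
  final List<String> log
  long committed = 0          // offset of the next record to read after a restart
  List<String> processed = []

  ManualCommitConsumer(List<String> log) { this.log = log }

  // Processes records from the committed offset; commits only after each record is handled.
  // If crashAt is set, the consumer "crashes" after processing that offset but before committing.
  void run(Long crashAt = null) {
    for (long offset = committed; offset < log.size(); offset++) {
      processed << log[(int) offset]
      if (crashAt != null && offset == crashAt) return   // crash: the commit never happens
      committed = offset + 1                              // commit after processing
    }
  }
}

def consumer = new ManualCommitConsumer(["a", "b", "c"])
consumer.run(1L)   // processes a and b, crashes before committing b
consumer.run()     // restart: resumes from the committed offset, so b is processed again
println(consumer.processed)   // [a, b, b, c]
```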

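Seeking in a topic partition

Apache Kafka stores messages on disk in order. A consumer can therefore seek to a position within a partition and start consuming from any offset of any topic partition.

You can change the offset the consumer reads from with the seek method: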

def topicPartition = [
  topic:"test",
  partition:0
]

// seek to a specific offset
consumer.seek(topicPartition, 10, { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

When the consumer needs to read the stream from the start, seekToBeginning moves the position to the beginning of the partition:

def topicPartition = [
  topic:"test",
  partition:0
]

// seek to the beginning of the partition
consumer.seekToBeginning(java.util.Collections.singleton(topicPartition), { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

Finally, seekToEnd moves the position to the end of the partition:

def topicPartition = [
  topic:"test",
  partition:0
]

// seek to the end of the partition
consumer.seekToEnd(java.util.Collections.singleton(topicPartition), { done ->

  if (done.succeeded()) {
    println("Seeking done")
  }
})

Note that due to internal buffering of messages it is possible that the record handler will continue to observe messages read from the original offset for a time after the seek*() method’s completion handler has been called. This is not the case for messages observed by the batch handler: Once the seek*() completion handler has been called it will only observe messages read from the new offset.

Offset lookup

You can use the beginningOffsets API, introduced in Kafka 0.10.1.1, to get the first offset of the given partitions. Unlike seekToBeginning above, beginningOffsets does not change the consumer's position; it only reads the offset.

def topicPartitions = new java.util.HashSet()
def topicPartition = [
  topic:"test",
  partition:0
]
topicPartitions.add(topicPartition)

consumer.beginningOffsets(topicPartitions, { done ->
  if (done.succeeded()) {
    def results = done.result()
    results.each { topic, beginningOffset ->
      println("Beginning offset for topic=${topic.topic}, partition=${topic.partition}, beginningOffset=${beginningOffset}")
    }
  }
})

// Convenience method for single-partition lookup
consumer.beginningOffsets(topicPartition, { done ->
  if (done.succeeded()) {
    def beginningOffset = done.result()
    println("Beginning offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, beginningOffset=${beginningOffset}")
  }
})

The corresponding endOffsets API returns the end offset of the given partitions. Unlike seekToEnd, endOffsets does not change the consumer's position; it only reads the offset.

def topicPartitions = new java.util.HashSet()
def topicPartition = [
  topic:"test",
  partition:0
]
topicPartitions.add(topicPartition)

consumer.endOffsets(topicPartitions, { done ->
  if (done.succeeded()) {
    def results = done.result()
    results.each { topic, endOffset ->
      println("End offset for topic=${topic.topic}, partition=${topic.partition}, endOffset=${endOffset}")
    }
  }
})

// Convenience method for single-partition lookup
consumer.endOffsets(topicPartition, { done ->
  if (done.succeeded()) {
    def endOffset = done.result()
    println("End offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, endOffset=${endOffset}")
  }
})
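The difference between the seek* methods and the read-only lookups can be summarised with a toy model of a single partition. The PartitionView class below is hypothetical, not the Vert.x API. Note two details of the model:

  • the beginning offset may be greater than 0, because retention can delete old records;
  • the end offset is one past the last available record.

Seeking moves the consumer's position, while the lookups leave it untouched.

```groovy
// Toy model of a single partition and a consumer position in it (hypothetical, not the Vert.x API).
class PartitionView {
  final long beginningOffset   // first offset still stored (older records may have been removed by retention)
  final long endOffset         // one past the last available record
  long position                // where the consumer will read next

  PartitionView(long beginningOffset, long endOffset, long position) {
    this.beginningOffset = beginningOffset
    this.endOffset = endOffset
    this.position = position
  }

  // Read-only lookups: they report offsets without moving the position.
  long beginningOffsets() { beginningOffset }
  long endOffsets() { endOffset }

  // Seeks: they move the position.
  void seek(long offset) { position = offset }
  void seekToBeginning() { position = beginningOffset }
  void seekToEnd() { position = endOffset }
}

def p = new PartitionView(100, 250, 180)
println(p.beginningOffsets())   // 100
println(p.position)             // 180: the lookup did not move the position
p.seekToBeginning()
println(p.position)             // 100
p.seekToEnd()
println(p.position)             // 250
```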

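Kafka 0.10.1.1 also provides offsetsForTimes, which looks up offsets by timestamp. For each partition, it returns the earliest offset whose timestamp is greater than or equal to the given timestamp. Offsets are not derived from timestamps. Kafka can answer this query efficiently because the broker maintains a time-based index for each log segment that maps timestamps to offsets.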
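The lookup rule (earliest offset whose timestamp is at least the target) can be sketched as a binary search over offset/timestamp pairs. This is an illustration only, with these assumptions:

  • the entries list is hypothetical sample data;
  • timestamps are non-decreasing;
  • the broker's time index itself is not modelled;
  • null stands for "no matching record", which is how the Kafka consumer reports such a partition.

```groovy
// Sketch of the offsetsForTimes rule: earliest offset whose timestamp >= target.
// Assumes entries are ordered by offset with non-decreasing timestamps; returns null if none match.
Long offsetForTime(List<List<Long>> entries, long target) {
  int lo = 0
  int hi = entries.size()          // search in [lo, hi)
  while (lo < hi) {
    int mid = (lo + hi).intdiv(2)
    if (entries[mid][1] < target) {
      lo = mid + 1                 // entries up to mid are older than the target
    } else {
      hi = mid                     // mid matches; an earlier entry might match too
    }
  }
  return lo < entries.size() ? entries[lo][0] : null
}

// Hypothetical [offset, timestamp in ms] pairs for one partition
def entries = [[0L, 1000L], [1L, 1500L], [2L, 1500L], [3L, 2200L]]
println(offsetForTime(entries, 1200L))   // 1
println(offsetForTime(entries, 2200L))   // 3
println(offsetForTime(entries, 9999L))   // null
```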


Message flow control

A consumer can control the flow of incoming messages. If it needs more time to process a batch of messages it has read, it can pause the flow, then resume it later to continue processing.

You can control the flow with the pause and resume methods. With partition-specific pause and resume, the record handler may continue to observe messages from a paused partition for a time after the pause() completion handler has been called. This is not the case for the batch handler: once the pause() completion handler has been called, it only observes messages from partitions that are not paused.

In the following example, the consumer pauses partition 0 after reading the message at offset 5, and resumes it five seconds later:

def topicPartition = [
  topic:"test",
  partition:0
]

// registering the handler for incoming messages
consumer.handler({ record ->
  println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")

  // i.e. pause/resume on partition 0, after reading message up to offset 5
  if ((record.partition() == 0) && (record.offset() == 5)) {

    // pause the read operations
    consumer.pause(topicPartition, { ar ->

      if (ar.succeeded()) {

        println("Paused")

        // resume read operation after a specific time
        vertx.setTimer(5000, { timeId ->

          // resume read operations
          consumer.resume(topicPartition)
        })
      }
    })
  }
})
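A toy model of the idea: while the source is paused, nothing is fetched from it. After resume, delivery continues from where it stopped. PausableSource is hypothetical and single-threaded. It does not model the in-flight records that, as noted above, the record handler may still see after pause() completes.

```groovy
// Toy model of pause/resume: a paused source is simply not fetched from.
class PausableSource {
  private final Iterator<String> upstream
  private boolean paused = false
  final List<String> delivered = []

  PausableSource(List<String> records) { upstream = records.iterator() }

  void pause() { paused = true }
  void resume() { paused = false }

  // One fetch cycle: delivers up to max records, or nothing while paused.
  void fetch(int max) {
    if (paused) return
    int n = 0
    while (n < max && upstream.hasNext()) {
      delivered << upstream.next()
      n++
    }
  }
}

def source = new PausableSource(["m0", "m1", "m2", "m3"])
source.fetch(2)                 // delivers m0, m1
source.pause()
source.fetch(2)                 // paused: nothing is fetched
println(source.delivered)       // [m0, m1]
source.resume()
source.fetch(2)                 // continues with m2, m3
println(source.delivered)       // [m0, m1, m2, m3]
```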

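Closing a consumer

To close a consumer, call its close method. This closes the connections to Kafka and releases the associated resources.

Because close is asynchronous, you cannot tell when the operation has completed or failed unless you register a handler.

The handler is called once the close operation has fully completed.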

consumer.close({ res ->
  if (res.succeeded()) {
    println("Consumer is now closed")
  } else {
    println("close failed")
  }
})

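Sending messages to a topic

You can send messages (records) to a topic with the write method.

The simplest approach is to specify only the destination topic and the value, omitting the key and the partition. In this case, messages are sent to all the partitions of the topic in round-robin fashion.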

(0..<5).each { i ->

  // only topic and message value are specified, round robin on destination partitions
  def record = KafkaProducerRecord.create("test", "message_${i}")

  producer.write(record)
}
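The round-robin choice described above can be sketched as a counter taken modulo the partition count. RoundRobinChooser is a hypothetical illustration, not the producer's partitioner class. Details such as the counter's starting value and the handling of currently unavailable partitions are simplified away.

```groovy
// Sketch of round-robin partition selection for records without a key.
class RoundRobinChooser {
  private int counter = 0

  int next(int partitionCount) {
    int partition = counter % partitionCount
    counter++
    return partition
  }
}

def chooser = new RoundRobinChooser()
def chosen = (0..<5).collect { chooser.next(3) }
println(chosen)   // [0, 1, 2, 0, 1]
```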

You can register a handler to receive the result of the send. The result is the record metadata: the topic, the destination partition, and the offset assigned to the message.

(0..<5).each { i ->

  // only topic and message value are specified, round robin on destination partitions
  def record = KafkaProducerRecord.create("test", "message_${i}")

  producer.write(record, { done ->

    if (done.succeeded()) {

      def recordMetadata = done.result()
      println("Message ${record.value()} written on topic=${recordMetadata.topic}, partition=${recordMetadata.partition}, offset=${recordMetadata.offset}")
    }

  })
}

To send a message to a specific partition, you can either specify the partition identifier or set a key on the message:

(0..<10).each { i ->

  // a destination partition is specified
  def record = KafkaProducerRecord.create("test", null, "message_${i}", 0)

  producer.write(record)
}

When a key is set, the producer hashes it to choose the partition. All messages with the same key are therefore sent to the same partition, which preserves their relative order:

(0..<10).each { i ->

  // i.e. defining different keys for odd and even messages
  def key = i % 2

  // a key is specified, all messages with same key will be sent to the same partition
  def record = KafkaProducerRecord.create("test", java.lang.String.valueOf(key), "message_${i}")

  producer.write(record)
}
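The same key always lands on the same partition because the partition is a pure function of the key and the partition count. Kafka's default partitioner uses a murmur2 hash of the serialized key. The sketch below substitutes Java's String.hashCode purely for illustration, so its partition numbers will not match the ones Kafka chooses. Math.floorMod keeps the result non-negative.

```groovy
// Illustration only: String.hashCode stands in for Kafka's murmur2 hash of the serialized key.
int partitionForKey(String key, int partitionCount) {
  return Math.floorMod(key.hashCode(), partitionCount)   // non-negative even for negative hashes
}

// Keys "0" and "1" alternate, as in the example above
def partitions = (0..<10).collect { i -> partitionForKey(String.valueOf(i % 2), 3) }
println(partitions)   // [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
```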
Note
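A shared producer is created on the first call to createShared, and its configuration is fixed at that moment. Because the producer can be shared across several verticle instances, all users of it must use the same configuration.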

Sharing a producer

Sometimes you want to share the same producer across several verticles or Vert.x contexts.

Calling KafkaProducer.createShared returns a KafkaProducer instance that can be safely shared between verticles:

// Create a shared producer identified by 'the-producer'
def producer1 = KafkaProducer.createShared(vertx, "the-producer", config)

// Sometimes later you can close it
producer1.close()

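The returned KafkaProducer instances reuse the same underlying resources, such as threads and connections.

When you are done with the KafkaProducer, simply call its close method; the associated resources are released automatically.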
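Conceptually, sharing by name amounts to a registry keyed by name with a reference count:

  • the first call creates the instance with its configuration;
  • later calls with the same name reuse it;
  • the resources are released when the last user closes it.

SharedRegistry below is a hypothetical sketch of that idea, not the Vert.x implementation. A plain Object stands in for a real producer.

```groovy
// Hypothetical name-based registry with reference counting (a sketch, not the Vert.x implementation).
class SharedRegistry {
  private final Map<String, Map> entries = [:]   // name -> [resource: ..., refs: n]

  // First call for a name creates the resource; later calls reuse it (their factory is ignored).
  Object acquire(String name, Closure<Object> factory) {
    def entry = entries[name]
    if (entry == null) {
      entry = [resource: factory.call(), refs: 0]
      entries[name] = entry
    }
    entry.refs = entry.refs + 1
    return entry.resource
  }

  // Returns true when the last user released the resource and it was discarded.
  boolean release(String name) {
    def entry = entries[name]
    if (entry == null) return false
    entry.refs = entry.refs - 1
    if (entry.refs == 0) {
      entries.remove(name)
      return true
    }
    return false
  }
}

def registry = new SharedRegistry()
def created = 0
def factory = { created++; new Object() }    // stands in for creating a real producer
def p1 = registry.acquire("the-producer", factory)
def p2 = registry.acquire("the-producer", factory)
println(p1.is(p2))                           // true: the second call reuses the first instance
println(registry.release("the-producer"))    // false: one user still holds it
println(registry.release("the-producer"))    // true: the last user closed it, so it is released
```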

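Closing a producer

As with a consumer, you close a producer by calling its close method. This closes the connections to Kafka and releases all associated resources.

Because close is asynchronous, you cannot tell when the operation has completed or failed unless you register a handler.

The handler is called once the close operation has fully completed.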

producer.close({ res ->
  if (res.succeeded()) {
    println("Producer is now closed")
  } else {
    println("close failed")
  }
})

Getting topic partition information

You can get information about the partitions of a specific topic with the partitionsFor method:

// asking partitions information about specific topic
producer.partitionsFor("test", { ar ->

  if (ar.succeeded()) {

    ar.result().each { partitionInfo ->
      println(partitionInfo)
    }
  }
})

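Handling errors

Errors between a Kafka client (producer or consumer) and the Kafka cluster, such as timeouts, can be handled with the exceptionHandler method of KafkaProducer or KafkaConsumer. For example: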

// setting handler for errors
consumer.exceptionHandler({ e ->
  println("Error = ${e.getMessage()}")
})

Automatic clean-up in verticles

If you create consumers and producers inside a verticle, they are closed automatically when the verticle is undeployed.

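Using the Vert.x serializers and deserializers

The Vert.x Kafka Client ships with ready-made serializers and deserializers for the Buffer, JsonObject, and JsonArray types.

A KafkaConsumer can be configured with the Buffer, JsonObject, or JsonArray deserializers: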

// Creating a consumer able to deserialize to buffers
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// Creating a consumer able to deserialize to json object
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

// Creating a consumer able to deserialize to json array
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"

The same applies to KafkaProducer with the corresponding serializers:

// Creating a producer able to serialize to buffers
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["acks"] = "1"

// Creating a producer able to serialize to json object
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["acks"] = "1"

// Creating a producer able to serialize to json array
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["acks"] = "1"
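These classes turn a JSON value into bytes on the producer side, and back into a value on the consumer side. The sketch below shows that round trip with Groovy's bundled groovy.json classes. It is only an analogy, and it assumes UTF-8 encoded JSON text. The JsonMapSerde class is hypothetical: it mirrors the shape of Kafka's Serializer/Deserializer methods, but it is not one of the Vert.x classes and does not claim to reproduce their exact wire format.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets

// Hypothetical serializer/deserializer pair mirroring the shape of Kafka's Serializer/Deserializer:
// serialize(topic, data) -> byte[] and deserialize(topic, bytes) -> value. Null stays null.
class JsonMapSerde {
  byte[] serialize(String topic, Map data) {
    data == null ? null : JsonOutput.toJson(data).getBytes(StandardCharsets.UTF_8)
  }

  Map deserialize(String topic, byte[] bytes) {
    bytes == null ? null : (Map) new JsonSlurper().parseText(new String(bytes, StandardCharsets.UTF_8))
  }
}

def serde = new JsonMapSerde()
byte[] bytes = serde.serialize("test", [id: 1, name: "message_1"])
println(new String(bytes, StandardCharsets.UTF_8))   // {"id":1,"name":"message_1"}
println(serde.deserialize("test", bytes))
```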

RxJava2 API

The Vert.x Kafka Client also provides an Rx-style API.


Vert.x Kafka AdminUtils

This component provides a vert.x wrapper around the most important functions of Kafka’s AdminUtils. AdminUtils are used to create, modify, and delete topics. Other functionality covered by AdminUtils, but not this wrapper, includes Partition Management, Broker Configuration management, etc.

Using the AdminUtils

Create a topic

You can call createTopic to create a topic. Parameters are: topic name, number of partitions, number of replicas, and the usual callback to handle the result. It might return an error, e.g. if the number of requested replicas is greater than the number of brokers.

def adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Create topic 'myNewTopic' with 2 partitions and 1 replica
adminUtils.createTopic("myNewTopic", 2, 1, { result ->
  if (result.succeeded()) {
    println("Creation of topic myNewTopic successful!")
  } else {
    println("Creation of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
  }
})

Delete a topic

You can call deleteTopic to delete a topic. Parameters are: topic name, and the usual callback to handle the result. It might return an error, e.g. if the topic does not exist.

def adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Delete topic 'myNewTopic'
adminUtils.deleteTopic("myNewTopic", { result ->
  if (result.succeeded()) {
    println("Deletion of topic myNewTopic successful!")
  } else {
    println("Deletion of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
  }
})

Change a topic’s configuration

If you need to update the configuration of a topic, e.g., you want to update the retention policy, you can call changeTopicConfig to update a topic. Parameters are: topic name, a Map (String → String) with parameters to be changed, and the usual callback to handle the result. It might return an error, e.g. if the topic does not exist.

def adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
// Set delete.retention.ms (how long delete tombstones are kept on compacted topics) to 1000 ms
// and retention.bytes (maximum size of each partition before old segments are discarded) to 1 KiB
def properties = [:]
properties["delete.retention.ms"] = "1000"
properties["retention.bytes"] = "1024"
adminUtils.changeTopicConfig("myNewTopic", properties, { result ->
  if (result.succeeded()) {
    println("Configuration change of topic myNewTopic successful!")
  } else {
    println("Configuration change of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
  }
})

Check if a topic exists

To check whether a topic exists, call topicExists. Its parameters are the topic name and the usual callback to handle the result. On success, the result is true if the topic exists and false otherwise. A failed result means the check itself could not be completed.

def adminUtils = AdminUtils.create(Vertx.vertx(), "localhost:2181", true)
adminUtils.topicExists("myNewTopic", { result ->
  if (result.succeeded()) {
    println("Topic myNewTopic exists: ${result.result()}")
  } else {
    println("Failed to check if topic myNewTopic exists: ${result.cause().getLocalizedMessage()}")
  }
})