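This component integrates the Kafka client with Vert.x. You can use it to consume messages from an Apache Kafka cluster and to send messages to it, following the Vert.x programming model.
For consumers, the API lets you subscribe asynchronously to topics and their partitions, or read messages as a Vert.x stream (pause and resume are supported).
For producers, the API provides methods to send messages to a topic and its partitions, much like writing data to a Vert.x stream.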
Warning: This component is in technical preview, so its API may still change in later versions.
To use the Vert.x Kafka Client, add the following dependency:
Maven (in your pom.xml):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
<version>3.6.2</version>
</dependency>
Gradle (in your build.gradle):
compile 'io.vertx:vertx-kafka-client:3.6.2'
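Creating and using consumers and producers works much like the native Kafka client library; Vert.x only adds an asynchronous layer on top.
Put the configuration in a Map and pass it to the static create method of the KafkaConsumer or KafkaProducer interface to create a KafkaConsumer or KafkaProducer: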
// creating the consumer using map config
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["value.deserializer"] = "org.apache.kafka.common.serialization.StringDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"
// use consumer for interacting with Apache Kafka
def consumer = KafkaConsumer.create(vertx, config)
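In the example above, a Map is passed when creating the KafkaConsumer instance. It specifies the list of Kafka brokers to connect to (just one here) and the deserializers used to turn each received message into a key and a value.
A producer is created in the same way: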
// creating the producer using map config
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
config["acks"] = "1"
// use producer for interacting with Apache Kafka
def producer = KafkaProducer.create(vertx, config)
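Use the subscribe method of KafkaConsumer to subscribe to one or more topics and join a consumer group (the group is specified in the configuration when the consumer is created).
You also need to register a Handler with the handler method to process incoming messages: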
// register the handler for incoming messages
consumer.handler({ record ->
println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// subscribe to several topics
def topics = new java.util.HashSet()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics)
// or just subscribe to a single topic
consumer.subscribe("a-single-topic")
The handler can be registered before or after the call to subscribe(); messages won't be consumed until both methods have been called. This allows you, for example, to call subscribe(), then seek() and finally handler() in order to consume messages only from a particular offset onward.
If you want to know whether the subscription succeeded, pass a Handler when calling subscribe:
// register the handler for incoming messages
consumer.handler({ record ->
println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// subscribe to several topics
def topics = new java.util.HashSet()
topics.add("topic1")
topics.add("topic2")
topics.add("topic3")
consumer.subscribe(topics, { ar ->
if (ar.succeeded()) {
println("subscribed")
} else {
println("Could not subscribe ${ar.cause().getMessage()}")
}
})
// or just subscribe to a single topic
consumer.subscribe("a-single-topic", { ar ->
if (ar.succeeded()) {
println("subscribed")
} else {
println("Could not subscribe ${ar.cause().getMessage()}")
}
})
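Kafka consumers form consumer groups. Within a group, each partition is consumed by only one consumer at a time, while the group can contain several consumers, so the partitions of a topic are spread across the group's members.
If a consumer in the group fails, the Kafka cluster automatically reassigns its partitions to the other members of the group, or to a newly joined consumer.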
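The assignment rule described above can be made concrete with a small model. This Groovy sketch assumes a simplified range-style strategy: the hypothetical assignRange helper sorts the members and hands each one a contiguous block of a single topic's partitions. Kafka's real assignment is done by the client's configured assignor (for example RangeAssignor) and covers more cases; the sketch only shows that every partition has exactly one owner and that a rebalance redistributes all partitions when a member leaves.

```groovy
// Simplified range-style assignment of one topic's partitions to the members of a group.
// Hypothetical helper for illustration; Kafka's real assignors run inside the client.
Map<String, List<Integer>> assignRange(List<String> members, int partitionCount) {
    List<String> sorted = members.sort(false)        // deterministic order, input untouched
    int perMember = partitionCount.intdiv(sorted.size())
    int extra = partitionCount % sorted.size()       // the first `extra` members get one more
    Map<String, List<Integer>> result = [:]
    int next = 0
    sorted.eachWithIndex { String member, int i ->
        int count = perMember + (i < extra ? 1 : 0)
        result[member] = (next..<(next + count)).toList()
        next += count
    }
    return result
}

// three consumers share six partitions: every partition has exactly one owner
def before = assignRange(["consumer-a", "consumer-b", "consumer-c"], 6)

// consumer-b leaves the group: the rebalance spreads all six partitions over the rest
def after = assignRange(["consumer-a", "consumer-c"], 6)
println "before: $before"
println "after:  $after"
```

Sorting the members first keeps the result deterministic, which is what lets every member of a group compute the same assignment independently.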
You can register handlers with the partitionsRevokedHandler and partitionsAssignedHandler methods of KafkaConsumer to be notified when partitions are revoked from or assigned to the consumer:
// register the handler for incoming messages
consumer.handler({ record ->
println("Processing key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// registering handlers for assigned and revoked partitions
consumer.partitionsAssignedHandler({ topicPartitions ->
println("Partitions assigned")
topicPartitions.each { topicPartition ->
println("${topicPartition.topic} ${topicPartition.partition}")
}
})
consumer.partitionsRevokedHandler({ topicPartitions ->
println("Partitions revoked")
topicPartitions.each { topicPartition ->
println("${topicPartition.topic} ${topicPartition.partition}")
}
})
// subscribes to the topic
consumer.subscribe("test", { ar ->
if (ar.succeeded()) {
println("Consumer subscribed")
}
})
A consumer that belongs to a consumer group can leave it with the unsubscribe method, after which it no longer receives the related messages:
// consumer is already member of a consumer group
// unsubscribing request
consumer.unsubscribe()
You can also pass a Handler to unsubscribe to be notified of the result:
// consumer is already member of a consumer group
// unsubscribing request
consumer.unsubscribe({ ar ->
if (ar.succeeded()) {
println("Consumer unsubscribed")
}
})
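Instead of joining a consumer group to receive messages, a consumer can request specific topic partitions. A consumer that is not part of a consumer group cannot rely on Kafka's rebalancing mechanism, so the application has to manage which partitions it consumes.
Use the assign method to request specific partitions: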
// register the handler for incoming messages
consumer.handler({ record ->
println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
})
// the set of topic partitions to request
def topicPartitions = new java.util.HashSet()
topicPartitions.add([
topic:"test",
partition:0
])
// requesting to be assigned the specific partition
consumer.assign(topicPartitions, { done ->
if (done.succeeded()) {
println("Partition assigned")
// requesting the assigned partitions
consumer.assignment({ done1 ->
if (done1.succeeded()) {
done1.result().each { topicPartition ->
println("${topicPartition.topic} ${topicPartition.partition}")
}
}
})
}
})
As with subscribe(), the handler can be registered before or after the call to assign(); messages won't be consumed until both methods have been called. This allows you, for example, to call assign(), then seek() and finally handler() in order to consume messages only from a particular offset onward.
The assignment method used above lists the topic partitions currently assigned to the consumer.
Instead of relying on the internal polling mechanism to receive messages from Kafka, the client can subscribe to a topic without registering a handler and then call the poll method itself.
This way the application decides when to poll for messages, for example only after it has finished processing the previous ones.
// subscribes to the topic
consumer.subscribe("test", { ar ->
if (ar.succeeded()) {
println("Consumer subscribed")
vertx.setPeriodic(1000, { timerId ->
consumer.poll(100, { ar1 ->
if (ar1.succeeded()) {
def records = ar1.result()
(0..<records.size()).each { i ->
def record = records.recordAt(i)
println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
}
}
})
})
}
})
After subscribing successfully, the application starts a periodic timer that calls poll, so messages are fetched from Kafka at regular intervals.
You can change the subscribed topics or the assigned partitions after you have started consuming messages, simply by calling subscribe() or assign() again.
Note that due to internal buffering of messages, the record handler may continue to observe messages from the old subscription or assignment after the completion handler of subscribe() or assign() has been called. This is not the case for messages observed by the batch handler: once the completion handler has been called, it will only observe messages read from the new subscription or assignment.
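One way to cope with this buffering in the record handler is to ignore records from partitions that are no longer part of the current subscription or assignment. In this Groovy sketch, records are modelled as plain maps with topic and partition keys, and the AssignmentGuard class is hypothetical. With the real client you would call update() from the completion handler of subscribe()/assign() or from partitionsAssignedHandler, and adapt accept() to read record.topic() and record.partition().

```groovy
// Hypothetical guard for the record handler: skip records from partitions that are no
// longer subscribed/assigned. Records are plain maps here, not KafkaConsumerRecord.
class AssignmentGuard {
    private Set<String> current = [] as Set

    // call with the new partitions once subscribe()/assign() has completed
    void update(Collection<Map> partitions) {
        current = partitions.collect { key(it.topic, it.partition) } as Set
    }

    boolean accept(Map record) {
        current.contains(key(record.topic, record.partition))
    }

    static String key(Object topic, Object partition) {
        "${topic}-${partition}".toString()           // plain String, so Set lookups match
    }
}

def guard = new AssignmentGuard()
guard.update([[topic: "old-topic", partition: 0]])
// re-assign to a new topic; records from "old-topic" may still be buffered
guard.update([[topic: "new-topic", partition: 0]])

def buffered = [[topic: "old-topic", partition: 0, offset: 41],
                [topic: "new-topic", partition: 0, offset: 0]]
def processed = buffered.findAll { guard.accept(it) }
println processed
```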
Use the partitionsFor method to get partition information for a given topic:
// asking partitions information about specific topic
consumer.partitionsFor("test", { ar ->
if (ar.succeeded()) {
ar.result().each { partitionInfo ->
println(partitionInfo)
}
}
})
In addition, the listTopics method lists all topics the consumer is authorized to see, together with their partitions:
// asking information about available topics and related partitions
consumer.listTopics({ ar ->
if (ar.succeeded()) {
def map = ar.result()
map.each { topic, partitions ->
println("topic = ${topic}")
println("partitions = ${partitions}")
}
}
})
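In Apache Kafka, the consumer is responsible for tracking the offset of the last message it has read.
This is done by committing offsets. With automatic commits, the consumer periodically commits the offsets of the messages it has read from each topic partition (at the interval set by auto.commit.interval.ms). To enable automatic commits, set the enable.auto.commit configuration parameter to true when creating the KafkaConsumer.
Manual commits are done with the commit method. They are typically used to achieve at-least-once delivery: an offset is committed only after the corresponding messages have been processed.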
// consumer is processing read messages
// committing offset of the last read message
consumer.commit({ ar ->
if (ar.succeeded()) {
println("Last read message offset committed")
}
})
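The at-least-once pattern can be illustrated without a broker. This Groovy sketch models one partition as a list and uses a local committed variable in place of the offset stored by commit(); the consume closure and its crashAfter parameter are hypothetical. It follows Kafka's convention that the committed offset is the offset of the next record to read. Because offsets are committed only after processing, a crash between processing and committing makes the restarted consumer process some records again (here m2), but never skip one.

```groovy
// At-least-once in miniature, no broker involved. `partitionLog` stands in for one
// partition and `committed` for the offset that commit() would store. By Kafka
// convention the committed offset is the NEXT offset to read (last processed + 1).
def partitionLog = ["m0", "m1", "m2", "m3", "m4"]
long committed = 0
def processedLog = []

// Hypothetical consume loop: process records from `from`, commit after every
// `batchSize` records; `crashAfter` simulates the consumer dying after N records.
def consume = { long from, int batchSize, Integer crashAfter ->
    int handled = 0
    for (long offset = from; offset < partitionLog.size(); offset++) {
        if (crashAfter != null && handled == crashAfter) {
            return                                   // crash: the last batch is never committed
        }
        processedLog << partitionLog[(int) offset]   // "process" the record first...
        handled++
        if (handled % batchSize == 0) {
            committed = offset + 1                   // ...then commit what has been processed
        }
    }
    committed = partitionLog.size()                  // end of log: commit the remainder
}

consume(committed, 2, 3)     // processes m0, m1, m2; commits 2 after m1; crashes before committing m2
consume(committed, 2, null)  // restarts from offset 2, so m2 is processed a second time
println processedLog
```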
Messages in Apache Kafka are persisted to disk in order, so a consumer can seek within a partition and start consuming from any offset of any topic partition.
Use the seek method to change the offset to read from:
def topicPartition = [
topic:"test",
partition:0
]
// seek to a specific offset
consumer.seek(topicPartition, 10, { done ->
if (done.succeeded()) {
println("Seeking done")
}
})
When the consumer needs to read the stream from the start, the seekToBeginning method moves the offset to the beginning of the partition:
def topicPartition = [
topic:"test",
partition:0
]
// seek to the beginning of the partition
consumer.seekToBeginning(java.util.Collections.singleton(topicPartition), { done ->
if (done.succeeded()) {
println("Seeking done")
}
})
Finally, the seekToEnd method moves the offset to the end of the partition:
def topicPartition = [
topic:"test",
partition:0
]
// seek to the end of the partition
consumer.seekToEnd(java.util.Collections.singleton(topicPartition), { done ->
if (done.succeeded()) {
println("Seeking done")
}
})
Note that due to internal buffering of messages, the record handler may continue to observe messages read from the original offset for a while after the seek*() method's completion handler has been called. This is not the case for messages observed by the batch handler: once the seek*() completion handler has been called, it will only observe messages read from the new offset.
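The effect of seek, seekToBeginning and seekToEnd on the read position can be summarized with a small model. This Groovy sketch is not the Vert.x API: PartitionPosition is a hypothetical class, logStart is the first offset still stored in the partition, logEnd is the offset the next appended record will receive, and out-of-range seeks (which Kafka resolves through auto.offset.reset) are not modelled.

```groovy
// Minimal model of a consumer's read position in one partition (hypothetical class).
class PartitionPosition {
    long logStart   // first offset still stored in the partition
    long logEnd     // offset that the next appended record will receive
    long position   // offset of the next record this consumer will read

    void seek(long offset) { position = offset }     // cf. consumer.seek(tp, offset, ...)
    void seekToBeginning() { position = logStart }   // cf. consumer.seekToBeginning(...)
    void seekToEnd() { position = logEnd }           // cf. consumer.seekToEnd(...)

    // number of already-stored records still ahead of the position
    long available() { Math.max(0L, logEnd - position) }
}

def p = new PartitionPosition(logStart: 0, logEnd: 20, position: 0)
p.seek(10)
def afterSeek = p.available()          // records 10..19 remain
p.seekToBeginning()
def afterBeginning = p.available()     // the whole partition can be re-read
p.seekToEnd()
def afterEnd = p.available()           // only records appended from now on will be read
println "$afterSeek $afterBeginning $afterEnd"
```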
Kafka 0.10.1.1 introduced the beginningOffsets API, which returns the first offset of the given partitions. Unlike the seekToBeginning method above, beginningOffsets does not change the consumer's offset: it only reads it.
def topicPartitions = new java.util.HashSet()
def topicPartition = [
topic:"test",
partition:0
]
topicPartitions.add(topicPartition)
consumer.beginningOffsets(topicPartitions, { done ->
if (done.succeeded()) {
def results = done.result()
results.each { topic, beginningOffset ->
println("Beginning offset for topic=${topic.topic}, partition=${topic.partition}, beginningOffset=${beginningOffset}")
}
}
})
// Convenience method for single-partition lookup
consumer.beginningOffsets(topicPartition, { done ->
if (done.succeeded()) {
def beginningOffset = done.result()
println("Beginning offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, beginningOffset=${beginningOffset}")
}
})
The counterpart is the endOffsets method, which returns the end offset of the given partitions. Unlike seekToEnd, endOffsets does not change the consumer's offset: it only reads it.
def topicPartitions = new java.util.HashSet()
def topicPartition = [
topic:"test",
partition:0
]
topicPartitions.add(topicPartition)
consumer.endOffsets(topicPartitions, { done ->
if (done.succeeded()) {
def results = done.result()
results.each { topic, endOffset ->
println("End offset for topic=${topic.topic}, partition=${topic.partition}, endOffset=${endOffset}")
}
}
})
// Convenience method for single-partition lookup
consumer.endOffsets(topicPartition, { done ->
if (done.succeeded()) {
def endOffset = done.result()
println("End offset for topic=${topicPartition.topic}, partition=${topicPartition.partition}, endOffset=${endOffset}")
}
})
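A common use of endOffsets is computing consumer lag, the number of records a consumer still has to read in each partition. This Groovy sketch assumes that the end offsets and the consumer's committed (or current) offsets have already been fetched into maps keyed by a "topic-partition" string; lagPerPartition and the sample values are hypothetical. A partition with no committed offset is treated as starting from 0, which is a simplification.

```groovy
// Consumer lag per partition = end offset - offset the consumer will read next.
// Inputs are keyed by "topic-partition" strings; sample values are made up.
Map<String, Long> lagPerPartition(Map<String, Long> endOffsets, Map<String, Long> nextOffsets) {
    endOffsets.collectEntries { String tp, Long end ->
        long next = nextOffsets.getOrDefault(tp, 0L)  // simplification: no offset yet -> 0
        [(tp): Math.max(0L, end - next)]
    }
}

def endOffsets = ["test-0": 120L, "test-1": 80L, "test-2": 15L]
def committedOffsets = ["test-0": 100L, "test-1": 80L]
def lag = lagPerPartition(endOffsets, committedOffsets)
println lag
```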
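Kafka 0.10.1.1 also provides offsetsForTimes, an API that looks up offsets by timestamp: for each partition, it returns the earliest offset whose timestamp is greater than or equal to the given timestamp. Offsets are not timestamps; Kafka can locate these offsets efficiently because, since 0.10.1, each log segment keeps a time index that maps timestamps to offsets.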
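The lookup performed by offsetsForTimes is essentially a search for the first entry whose timestamp is not earlier than the target. This Groovy sketch assumes a simplified index of [offset, timestamp] pairs sorted by non-decreasing timestamp and uses a binary search; the offsetForTime helper is hypothetical. It returns null when no such record exists, mirroring the null that the Java KafkaConsumer.offsetsForTimes returns for such a partition.

```groovy
// Earliest offset whose timestamp is >= targetTs, found by binary search over a
// simplified time index of [offset, timestamp] pairs. ASSUMPTION: timestamps are
// non-decreasing; Kafka's real per-segment time index is more involved.
Long offsetForTime(List<List<Long>> index, long targetTs) {
    int lo = 0
    int hi = index.size()                  // search window is [lo, hi)
    while (lo < hi) {
        int mid = (lo + hi).intdiv(2)
        if (index[mid][1] < targetTs) {
            lo = mid + 1                   // entry too old: answer lies to the right
        } else {
            hi = mid                       // entry qualifies: keep looking left
        }
    }
    return lo < index.size() ? index[lo][0] : null   // null: no record at or after targetTs
}

def timeIndex = [[0L, 1000L], [1L, 1005L], [2L, 1005L], [3L, 1020L], [4L, 1100L]]
println offsetForTime(timeIndex, 1001L)
```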
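A consumer can control the flow of incoming messages. If processing a batch of messages takes some time, you can temporarily pause the flow with pause (messages that have already been fetched may remain buffered in memory); once you have caught up, continue consuming with resume.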
In the case of partition-specific pause and resume, the record handler may continue to observe messages from a paused partition for a while after the pause() method's completion handler has been called. This is not the case for messages observed by the batch handler: once the pause() completion handler has been called, it will only observe messages from partitions that are not paused.
def topicPartition = [
topic:"test",
partition:0
]
// registering the handler for incoming messages
consumer.handler({ record ->
println("key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
// e.g. pause/resume on partition 0, after reading message up to offset 5
if ((record.partition() == 0) && (record.offset() == 5)) {
// pause the read operations
consumer.pause(topicPartition, { ar ->
if (ar.succeeded()) {
println("Paused")
// resume read operation after a specific time
vertx.setTimer(5000, { timeId ->
// resume read operations
consumer.resume(topicPartition)
})
}
})
}
})
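The example above pauses at a fixed offset. A common variation is to pause when too many records are being processed and to resume once the backlog has drained. This Groovy sketch is hypothetical: WatermarkGate, its high/low thresholds and the onPause/onResume closures (which would wrap consumer.pause(...) and consumer.resume(...)) are not part of Vert.x. Because the record handler may still see some records after pause() completes, the gate triggers pause only once and tolerates the in-flight count overshooting high.

```groovy
// Watermark-based flow control around pause()/resume(). Hypothetical class: the
// `onPause`/`onResume` closures stand in for consumer.pause(...)/consumer.resume(...).
class WatermarkGate {
    int high            // pause when this many records are in flight
    int low             // resume when in-flight records drop to this level
    Closure onPause
    Closure onResume
    int inFlight = 0
    boolean paused = false

    void received() {   // call from consumer.handler for every record
        inFlight++
        if (!paused && inFlight >= high) {   // trigger pause only once
            paused = true
            onPause.call()
        }
    }

    void completed() {  // call when processing of one record has finished
        inFlight--
        if (paused && inFlight <= low) {
            paused = false
            onResume.call()
        }
    }
}

def events = []
def gate = new WatermarkGate(high: 3, low: 1,
        onPause: { events << "pause" }, onResume: { events << "resume" })
3.times { gate.received() }    // the third record reaches the high watermark
2.times { gate.completed() }   // in-flight drops back to 1, the low watermark
println events
```

Using two thresholds instead of one avoids toggling pause and resume on every single record.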
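To close a consumer, call the close method; it closes the connection to Kafka and releases the related resources.
Because close is asynchronous, you cannot know when the operation has completed or failed unless you register a Handler to be notified.
The registered Handler is called once closing has fully completed.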
consumer.close({ res ->
if (res.succeeded()) {
println("Consumer is now closed")
} else {
println("close failed")
}
})
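Use the write method to send messages (records) to a topic.
The simplest way is to specify only the destination topic and the value, omitting the key and the partition. In this case, messages are distributed round robin across all partitions of the topic.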
(0..<5).each { i ->
// only topic and message value are specified, round robin on destination partitions
def record = KafkaProducerRecord.create("test", "message_${i}")
producer.write(record)
}
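Round-robin distribution of keyless records can be pictured as a simple counter. The RoundRobinChooser class in this Groovy sketch is hypothetical; the real choice is made by the Kafka client's partitioner, and starting with Kafka 2.4 the default partitioner uses a sticky strategy for keyless records instead of strict round robin.

```groovy
// Round-robin choice of a partition for keyless records (hypothetical class; the Kafka
// client's partitioner makes this choice internally). Counter overflow is ignored.
class RoundRobinChooser {
    private int counter = 0

    int next(int partitionCount) {
        return (counter++) % partitionCount
    }
}

def chooser = new RoundRobinChooser()
// five keyless records written to a topic with three partitions
def partitions = (0..<5).collect { chooser.next(3) }
println partitions
```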
To receive the result of a send, pass a Handler. The result is the record metadata: the topic, the destination partition and the offset assigned to the message.
(0..<5).each { i ->
// only topic and message value are specified, round robin on destination partitions
def record = KafkaProducerRecord.create("test", "message_${i}")
producer.write(record, { done ->
if (done.succeeded()) {
def recordMetadata = done.result()
println("Message ${record.value()} written on topic=${recordMetadata.topic}, partition=${recordMetadata.partition}, offset=${recordMetadata.offset}")
}
})
}
To send messages to a specific partition, you can either specify the partition identifier or set a message key:
(0..<10).each { i ->
// a destination partition is specified
def record = KafkaProducerRecord.create("test", null, "message_${i}", 0)
producer.write(record)
}
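Because the producer hashes the message key to choose the partition, all messages with the same key are sent to the same partition, so their relative order is preserved (as long as the number of partitions does not change).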
(0..<10).each { i ->
// e.g. defining different keys for odd and even messages
def key = i % 2
// a key is specified, all messages with same key will be sent to the same partition
def record = KafkaProducerRecord.create("test", java.lang.String.valueOf(key), "message_${i}")
producer.write(record)
}
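The key-to-partition mapping is a hash of the key modulo the number of partitions. This Groovy sketch uses String.hashCode() purely as a stand-in: the Kafka client's default partitioner hashes the serialized key bytes with murmur2, so actual partition numbers will differ. The hypothetical partitionForKey helper only demonstrates the property the text relies on, namely that equal keys always map to the same partition.

```groovy
// Key-based partitioning: hash(key) mod partitionCount, so equal keys land together.
// ASSUMPTION: String.hashCode() is only a stand-in; Kafka's default partitioner applies
// murmur2 to the serialized key bytes, so real partition numbers differ.
int partitionForKey(String key, int partitionCount) {
    return (key.hashCode() & 0x7fffffff) % partitionCount   // clear the sign bit first
}

// the same odd/even keys as in the example above, on a topic with three partitions
def chosen = (0..<10).collect { i -> partitionForKey(String.valueOf(i % 2), 3) }
println chosen
```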
Note: A shared producer is created with the createShared method. Because it can be shared among multiple Verticle instances, its configuration must be defined when the producer is created.
Sometimes you want to share one producer across several Verticles or Vert.x contexts.
Use the KafkaProducer.createShared method to create a KafkaProducer instance that can be shared safely between Verticles:
// Create a shared producer identified by 'the-producer'
def producer1 = KafkaProducer.createShared(vertx, "the-producer", config)
// Sometimes later you can close it
producer1.close()
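The returned KafkaProducer instances share the same resources (threads, connections, and so on).
When you are done with a KafkaProducer, just call its close method; the resources are released automatically once all instances of the shared producer have been closed.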
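The sharing behaviour can be pictured as reference counting. The Groovy sketch below is a hypothetical illustration of that idea, not the Vert.x implementation: SharedRegistry, SharedHandle and FakeProducer are invented names. The first createShared call for a name builds the resource, later calls reuse it, and the resource is closed only when the last handle is closed.

```groovy
// Reference counting behind a named shared resource (hypothetical classes).
class SharedHandle {
    final Object resource
    private final Closure onClose
    private boolean closed = false

    SharedHandle(Object resource, Closure onClose) {
        this.resource = resource
        this.onClose = onClose
    }

    void close() {
        if (!closed) {          // closing the same handle twice has no extra effect
            closed = true
            onClose.call()
        }
    }
}

class SharedRegistry {
    private final Map<String, Map> entries = [:]

    SharedHandle createShared(String name, Closure factory) {
        if (!entries.containsKey(name)) {
            entries[name] = [resource: factory.call(), refs: 0]
        }
        Map entry = entries[name]
        entry.refs = entry.refs + 1
        return new SharedHandle(entry.resource, {
            entry.refs = entry.refs - 1
            if (entry.refs == 0) {   // last holder gone: release the real resource
                entries.remove(name)
                entry.resource.close()
            }
        })
    }
}

class FakeProducer {
    int closeCount = 0

    void close() { closeCount++ }
}

def registry = new SharedRegistry()
def h1 = registry.createShared("the-producer") { new FakeProducer() }
def h2 = registry.createShared("the-producer") { new FakeProducer() }  // reuses the first one
h1.close()   // h2 still holds a reference, nothing is released
h2.close()   // last reference gone, the FakeProducer is closed
```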
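As with the consumer, closing a producer only requires calling the close method; it closes the connection to Kafka and releases all related resources.
Because close is asynchronous, you cannot know when the operation has completed or failed unless you register a Handler to be notified.
The registered Handler is called once closing has fully completed.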
producer.close({ res ->
if (res.succeeded()) {
println("Producer is now closed")
} else {
println("close failed")
}
})
Use the partitionsFor method to get partition information for a given topic:
// asking partitions information about specific topic
producer.partitionsFor("test", { ar ->
if (ar.succeeded()) {
ar.result().each { partitionInfo ->
println(partitionInfo)
}
}
})
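You can handle errors that occur between a Kafka client (consumer or producer) and the Kafka cluster, such as timeouts, by registering a handler with the exceptionHandler method of KafkaConsumer or the exceptionHandler method of KafkaProducer.
For example: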
// setting handler for errors
consumer.exceptionHandler({ e ->
println("Error = ${e.getMessage()}")
})
If a consumer or producer is created inside a Verticle, it is closed automatically when that Verticle is undeployed.
The Vert.x Kafka Client ships with ready-made serializers and deserializers for Buffer, JsonObject and JsonArray.
In a KafkaConsumer you can use the deserializers as follows:
// Creating a consumer able to deserialize to buffers
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.BufferDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"
// Creating a consumer able to deserialize to json object
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonObjectDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"
// Creating a consumer able to deserialize to json array
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["value.deserializer"] = "io.vertx.kafka.client.serialization.JsonArrayDeserializer"
config["group.id"] = "my_group"
config["auto.offset.reset"] = "earliest"
config["enable.auto.commit"] = "false"
The same applies to KafkaProducer with the serializers:
// Creating a producer able to serialize to buffers
def config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.BufferSerializer"
config["acks"] = "1"
// Creating a producer able to serialize to json object
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonObjectSerializer"
config["acks"] = "1"
// Creating a producer able to serialize to json array
config = [:]
config["bootstrap.servers"] = "localhost:9092"
config["key.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["value.serializer"] = "io.vertx.kafka.client.serialization.JsonArraySerializer"
config["acks"] = "1"
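The six configurations above differ only in the (de)serializer class. A pair of hypothetical helpers, sketched in Groovy below, can build them from that single class name; the default values are copied from the examples above, and consumerConfig and producerConfig are not part of Vert.x.

```groovy
// Hypothetical helpers (not part of Vert.x) that build the maps above from one class name.
Map<String, String> consumerConfig(String deserializer, String groupId = "my_group") {
    return [
        "bootstrap.servers" : "localhost:9092",
        "key.deserializer"  : deserializer,
        "value.deserializer": deserializer,
        "group.id"          : groupId,
        "auto.offset.reset" : "earliest",
        "enable.auto.commit": "false"
    ]
}

Map<String, String> producerConfig(String serializer) {
    return [
        "bootstrap.servers": "localhost:9092",
        "key.serializer"   : serializer,
        "value.serializer" : serializer,
        "acks"             : "1"
    ]
}

def jsonConsumer = consumerConfig("io.vertx.kafka.client.serialization.JsonObjectDeserializer")
def bufferProducer = producerConfig("io.vertx.kafka.client.serialization.BufferSerializer")
// the maps would then be passed to KafkaConsumer.create(vertx, ...) / KafkaProducer.create(vertx, ...)
println jsonConsumer
```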
The Vert.x Kafka Client also provides an Rx-style API.
You can call createTopic to create a topic.
Parameters are: topic name, number of partitions, number of replicas, and the usual callback to handle the result.
It might return an error, e.g. if the number of requested replicas is greater than the number of brokers.
def adminUtils = AdminUtils.create(vertx, "localhost:2181", true)
// Create topic 'myNewTopic' with 2 partitions and 1 replica
adminUtils.createTopic("myNewTopic", 2, 1, { result ->
if (result.succeeded()) {
println("Creation of topic myNewTopic successful!")
} else {
println("Creation of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
}
})
You can call deleteTopic to delete a topic.
Parameters are: topic name, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
def adminUtils = AdminUtils.create(vertx, "localhost:2181", true)
// Delete topic 'myNewTopic'
adminUtils.deleteTopic("myNewTopic", { result ->
if (result.succeeded()) {
println("Deletion of topic myNewTopic successful!")
} else {
println("Deletion of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
}
})
If you need to update the configuration of a topic, e.g. to change its retention policy, call changeTopicConfig.
Parameters are: topic name, a Map (String → String) with the parameters to change, and the usual callback to handle the result.
It might return an error, e.g. if the topic does not exist.
def adminUtils = AdminUtils.create(vertx, "localhost:2181", true)
// Set retention to 1000 ms and max size of each topic partition to 1 KiB
def properties = [:]
properties["retention.ms"] = "1000"
properties["retention.bytes"] = "1024"
adminUtils.changeTopicConfig("myNewTopic", properties, { result ->
if (result.succeeded()) {
println("Configuration change of topic myNewTopic successful!")
} else {
println("Configuration change of topic myNewTopic failed: ${result.cause().getLocalizedMessage()}")
}
})
If you want to check whether a topic exists, call topicExists.
Parameters are: topic name, and the usual callback to handle the result.
The result is true or false; a missing topic yields false rather than an error, and an error is returned only if the check itself fails.
def adminUtils = AdminUtils.create(vertx, "localhost:2181", true)
adminUtils.topicExists("myNewTopic", { result ->
if (result.succeeded()) {
println("Topic myNewTopic exists: ${result.result()}")
} else {
println("Failed to check if topic myNewTopic exists: ${result.cause().getLocalizedMessage()}")
}
})