KafkaConsumer

Vert.x Kafka consumer.

You receive Kafka records by providing a handler via @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::handler. As messages arrive, the handler is called with the records.

@see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::pause and @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::resume provide global control over reading records from the consumer.

When called with a topic partition, @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::pause and @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::resume provide finer-grained control over reading records for specific topic partitions. These are Kafka-specific operations.
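As a rough illustration of the handler-based flow described above, the sketch below registers a record handler and then subscribes to a topic. It relies only on the handler and subscribe methods listed in this reference, both of which are documented as returning $this. The helper name startConsuming and the topic name used in the usage note are hypothetical, and registering the handler before subscribing is a design choice of this sketch rather than a documented requirement.

```php
<?php
/**
 * Registers a record handler and subscribes to a topic.
 *
 * $consumer is assumed to be a KafkaConsumer as documented here; only its
 * handler() and subscribe() methods are used.
 * startConsuming() is a hypothetical helper, not part of the API.
 */
function startConsuming($consumer, $topic, callable $onRecord)
{
    // Both calls return $this according to this reference, so they chain.
    // The handler is registered first so it is in place before records arrive.
    return $consumer
        ->handler($onRecord)
        ->subscribe($topic);
}
```

A caller would obtain the consumer from create and then call, for example, startConsuming($consumer, 'orders', $callback), where $callback receives each record.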

package

Default

Methods

__construct

__construct() 

Manually assign a partition to this consumer.

assign( $arg0,  $arg1 = null) : $this

param $topicPartition [TopicPartition | array] partition to assign

assign($topicPartition)

Manually assign a list of partitions to this consumer.

param $topicPartitions [array] partitions to assign

assign($topicPartitions)

Manually assign a partition to this consumer.

Due to internal buffering of messages, when reassigning, the old partition may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new partition.

param $topicPartition [TopicPartition | array] partition to assign
param $completionHandler [callable] handler called when the operation completes

assign($topicPartition, $completionHandler)

Manually assign a list of partitions to this consumer.

Due to internal buffering of messages, when reassigning, the old set of partitions may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new set of partitions.

param $topicPartitions [array] partitions to assign
param $completionHandler [callable] handler called when the operation completes

assign($topicPartitions, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

callable

Response

$this

current KafkaConsumer instance

Get the set of partitions currently assigned to this consumer.

assignment( $arg0) : $this

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::handler record handler.

batchHandler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance

Get the first offset for the given partitions.

beginningOffsets( $arg0,  $arg1) : void

Arguments

$arg0

array | TopicPartition

$arg1

callable

Close the consumer

close( $arg0 = null) : void

close()

Close the consumer

param $completionHandler [callable] handler called when the operation completes

close($completionHandler)

Arguments

$arg0

callable

Commit current offsets for all subscribed topics and partitions.

commit( $arg0 = null) : void

commit()

Commit current offsets for all subscribed topics and partitions.

param $completionHandler [callable] handler called when the operation completes

commit($completionHandler)

Arguments

$arg0

callable

Get the last committed offset for the given partition (whether the commit happened by this process or another).

committed( $arg0,  $arg1) : void

Arguments

$arg0

array | TopicPartition

$arg1

callable

Create a new KafkaConsumer instance

create( $arg0,  $arg1,  $arg2 = null,  $arg3 = null) : \io\vertx\jphp\kafka\client\consumer\KafkaConsumer<K,V>
static

param $vertx [Vertx] Vert.x instance to use
param $config [array] Kafka consumer configuration

create($vertx, $config)

Create a new KafkaConsumer instance

param $vertx [Vertx] Vert.x instance to use
param $config [array] Kafka consumer configuration
param $keyType [string] class type for the key deserialization
param $valueType [string] class type for the value deserialization

create($vertx, $config, $keyType, $valueType)

Arguments

$arg0

Vertx

$arg1

array

$arg2

string

$arg3

string

Response

\io\vertx\jphp\kafka\client\consumer\KafkaConsumer

an instance of the KafkaConsumer

endHandler

endHandler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

endOffsets( $arg0,  $arg1) : void

Arguments

$arg0

array | TopicPartition

$arg1

callable
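Because the end offset is defined as the last available offset plus one, it can be subtracted directly from another "next offset" value. The sketch below estimates how many records remain to be fetched from a partition, using the value reported by endOffsets and the value reported by position (described in this reference as the offset of the next record that will be fetched). The helper remainingRecords is hypothetical, and the result is only an exact count under the assumption that offsets in the partition are contiguous.

```php
<?php
/**
 * Estimates the number of records not yet fetched from a partition.
 *
 * $endOffset: value reported by endOffsets (last available offset + 1).
 * $position:  value reported by position (offset of the next record to fetch).
 * remainingRecords() is a hypothetical helper, not part of the API.
 */
function remainingRecords($endOffset, $position)
{
    // Both values are "next offset" style, so their difference is a count.
    // Clamp at zero in case the position is already at or past the end.
    return max(0, $endOffset - $position);
}

// Offsets 0..41 exist, so the end offset is 42; the next fetch is offset 40.
echo remainingRecords(42, 40), PHP_EOL; // prints 2
```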

exceptionHandler

exceptionHandler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

Fetch the specified <code>amount</code> of elements. If the <code>ReadStream</code> has been paused, reading will recommence with the specified <code>amount</code> of items, otherwise the specified <code>amount</code> will be added to the current stream demand.

fetch( $arg0) : $this

Arguments

$arg0

integer

Response

$this

a reference to this, so the API can be used fluently
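The two cases in the fetch description can be pictured with a toy counter. The class below is a hypothetical model of the demand bookkeeping only; it is not part of the Vert.x API, delivers no records, and assumes that pausing resets the outstanding demand to zero.

```php
<?php
/**
 * Toy model of the demand bookkeeping described for fetch().
 * DemandModel is hypothetical and not part of the Vert.x API.
 */
class DemandModel
{
    public $paused = false;
    public $demand = 0;

    public function pause()
    {
        // Assumption of this sketch: pausing drops any outstanding demand.
        $this->paused = true;
        $this->demand = 0;
        return $this;
    }

    public function fetch($amount)
    {
        if ($this->paused) {
            // Paused stream: reading recommences with exactly $amount items.
            $this->paused = false;
            $this->demand = $amount;
        } else {
            // Active stream: $amount is added to the current demand.
            $this->demand += $amount;
        }
        return $this;
    }
}
```

For example, (new DemandModel())->pause()->fetch(5) leaves a demand of 5, and a further fetch(3) on the same object raises it to 8.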

handler

handler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

Look up the offset for the given partition by timestamp. Note: the result may be null if no offset can be found for the given timestamp, e.g. when the timestamp refers to the future.

offsetsForTimes( $arg0,  $arg1,  $arg2) : void

Arguments

$arg0

array | TopicPartition

$arg1

integer

$arg2

callable

Set the handler called when topic partitions are assigned to the consumer

partitionsAssignedHandler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance

Get metadata about the partitions for a given topic.

partitionsFor( $arg0,  $arg1) : $this

Arguments

$arg0

string

$arg1

callable

Response

$this

current KafkaConsumer instance

Set the handler called when topic partitions are revoked from the consumer

partitionsRevokedHandler( $arg0) : $this

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance

pause()

pause( $arg0 = null,  $arg1 = null) : $this

Suspend fetching from the requested partition.

param $topicPartition [TopicPartition | array] topic partition from which to suspend fetching

pause($topicPartition)

Suspend fetching from the requested partitions.

param $topicPartitions [array] topic partitions from which to suspend fetching

pause($topicPartitions)

Suspend fetching from the requested partition.

Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartition until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will not see messages from the given topicPartition.

param $topicPartition [TopicPartition | array] topic partition from which to suspend fetching
param $completionHandler [callable] handler called when the operation completes

pause($topicPartition, $completionHandler)

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the record handler will continue to observe messages from the given topicPartitions until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will not see messages from the given topicPartitions.

param $topicPartitions [array] topic partitions from which to suspend fetching
param $completionHandler [callable] handler called when the operation completes

pause($topicPartitions, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

callable

Response

$this

Get the set of partitions that were previously paused by a call to pause.

paused( $arg0) : void

Arguments

$arg0

callable

Executes a poll for getting messages from Kafka

poll( $arg0,  $arg1) : void

Arguments

$arg0

integer

$arg1

callable

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000.

pollTimeout( $arg0) : \io\vertx\jphp\kafka\client\consumer\KafkaConsumer<K,V>

Setting timeout to a lower value results in a more 'responsive' client, because it will block for a shorter period if no data is available in the assigned partition and therefore allows subsequent actions to be executed with a shorter delay. At the same time, the client will poll more frequently and thus will potentially create a higher load on the Kafka Broker.

Arguments

$arg0

integer

Response

\io\vertx\jphp\kafka\client\consumer\KafkaConsumer

Get the offset of the next record that will be fetched (if a record with that offset exists).

position( $arg0,  $arg1) : void

Arguments

$arg0

array | TopicPartition

$arg1

callable

resume()

resume( $arg0 = null,  $arg1 = null) : $this

Resume the specified partition, which has been paused with pause.

param $topicPartition [TopicPartition | array] topic partition from which to resume fetching

resume($topicPartition)

Resume the specified partitions, which have been paused with pause.

param $topicPartitions [array] topic partitions from which to resume fetching

resume($topicPartitions)

Resume the specified partition, which has been paused with pause.

param $topicPartition [TopicPartition | array] topic partition from which to resume fetching
param $completionHandler [callable] handler called when the operation completes

resume($topicPartition, $completionHandler)

Resume the specified partitions, which have been paused with pause.

param $topicPartitions [array] topic partitions from which to resume fetching
param $completionHandler [callable] handler called when the operation completes

resume($topicPartitions, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

callable

Response

$this

Overrides the fetch offsets that the consumer will use on the next poll.

seek( $arg0,  $arg1,  $arg2 = null) : $this

param $topicPartition [TopicPartition | array] topic partition in which to seek
param $offset [integer] offset to seek to within the topic partition

seek($topicPartition, $offset)

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new offset.

param $topicPartition [TopicPartition | array] topic partition in which to seek
param $offset [integer] offset to seek to within the topic partition
param $completionHandler [callable] handler called when the operation completes

seek($topicPartition, $offset, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

integer

$arg2

callable

Response

$this

current KafkaConsumer instance

Seek to the first offset for the given partition.

seekToBeginning( $arg0,  $arg1 = null) : $this

param $topicPartition [TopicPartition | array] topic partition in which to seek

seekToBeginning($topicPartition)

Seek to the first offset for each of the given partitions.

param $topicPartitions [array] topic partitions in which to seek

seekToBeginning($topicPartitions)

Seek to the first offset for the given partition.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new offset.

param $topicPartition [TopicPartition | array] topic partition in which to seek
param $completionHandler [callable] handler called when the operation completes

seekToBeginning($topicPartition, $completionHandler)

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new offset.

param $topicPartitions [array] topic partitions in which to seek
param $completionHandler [callable] handler called when the operation completes

seekToBeginning($topicPartitions, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

callable

Response

$this

current KafkaConsumer instance

Seek to the last offset for the given partition.

seekToEnd( $arg0,  $arg1 = null) : $this

param $topicPartition [TopicPartition | array] topic partition in which to seek

seekToEnd($topicPartition)

Seek to the last offset for each of the given partitions.

param $topicPartitions [array] topic partitions in which to seek

seekToEnd($topicPartitions)

Seek to the last offset for the given partition.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new offset.

param $topicPartition [TopicPartition | array] topic partition in which to seek
param $completionHandler [callable] handler called when the operation completes

seekToEnd($topicPartition, $completionHandler)

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the record handler will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new offset.

param $topicPartitions [array] topic partitions in which to seek
param $completionHandler [callable] handler called when the operation completes

seekToEnd($topicPartitions, $completionHandler)

Arguments

$arg0

array | TopicPartition

$arg1

callable

Response

$this

current KafkaConsumer instance

Subscribe to the given topic to get dynamically assigned partitions.

subscribe( $arg0,  $arg1 = null) : $this

param $topic [string] topic to subscribe to

subscribe($topic)

Subscribe to the given list of topics to get dynamically assigned partitions.

param $topics [array] topics to subscribe to

subscribe($topics)

Subscribe to the given topic to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topic, the old topic may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new topic.

param $topic [string] topic to subscribe to
param $completionHandler [callable] handler called when the operation completes

subscribe($topic, $completionHandler)

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics, the old set of topics may remain in effect (as observed by the record handler) until some time after the given completionHandler is called. In contrast, once the given completionHandler is called, the @see \io\vertx\jphp\kafka\client\consumer\KafkaConsumer::batchHandler will only see messages consistent with the new set of topics.

param $topics [array] topics to subscribe to
param $completionHandler [callable] handler called when the operation completes

subscribe($topics, $completionHandler)

Arguments

$arg0

string | array

$arg1

callable

Response

$this

current KafkaConsumer instance

Get the current subscription.

subscription( $arg0) : $this

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance

Unsubscribe from topics currently subscribed with subscribe.

unsubscribe( $arg0 = null) : $this

unsubscribe()

Unsubscribe from topics currently subscribed with subscribe.

param $completionHandler [callable] handler called when the operation completes

unsubscribe($completionHandler)

Arguments

$arg0

callable

Response

$this

current KafkaConsumer instance