Class: VertxKafkaClient::KafkaConsumer

Inherits:
Object
  • Object
show all
Includes:
Vertx::ReadStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb

Overview

Vert.x Kafka consumer.

You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.

The #pause and #resume provides global control over reading the records from the consumer.

The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaConsumer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka consumer configuration
  • keyType (Nil) (defaults to: nil)
    class type for the key deserialization
  • valueType (Nil) (defaults to: nil)
    class type for the value deserialization

Returns:

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 49

def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil)
  elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given?
    return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType))
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }

Manually assign a list of partition to this consumer.

Due to internal buffering of messages, when reassigning the old set of partitions may remain in effect (as observed by the record handler)} until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of partitions.

Overloads:

  • - (self) assign(topicPartition)

    Parameters:

    • topicPartition (Hash)
      partition which want assigned
  • - (self) assign(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned
  • - (self) assign(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      partition which want assigned

    Yields:

    • handler called on operation completed
  • - (self) assign(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      partitions which want assigned

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 212

def assign(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assign(#{param_1})"
end

- (self) assignment { ... }

Get the set of partitions currently assigned to this consumer.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


231
232
233
234
235
236
237
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 231

def assignment
  if block_given?
    @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling assignment()"
end

- (self) batch_handler { ... }

Set the handler to be used when batches of messages are fetched from the Kafka server. Batch handlers need to take care not to block the event loop when dealing with large batches. It is better to process records individually using the record handler.

Yields:

  • handler called when batches of messages are fetched

Returns:

  • (self)

Raises:

  • (ArgumentError)


423
424
425
426
427
428
429
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 423

def batch_handler
  if block_given?
    @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling batch_handler()"
end

- (void) beginning_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the first offset for the given partitions.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the earliest offset.

Yields:

  • handler called on operation completed. Returns the earliest available offset for the given partition

Raises:

  • (ArgumentError)


467
468
469
470
471
472
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 467

def beginning_offsets(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})"
end

- (void) close { ... }

This method returns an undefined value.

Close the consumer

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


433
434
435
436
437
438
439
440
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 433

def close
  if !block_given?
    return @j_del.java_method(:close, []).call()
  elsif block_given?
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close()"
end

- (void) commit { ... }

This method returns an undefined value.

Commit current offsets for all the subscribed list of topics and partition.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


388
389
390
391
392
393
394
395
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 388

def commit
  if !block_given?
    return @j_del.java_method(:commit, []).call()
  elsif block_given?
    return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling commit()"
end

- (void) committed(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last committed offset for the given partition (whether the commit happened by this process or another).

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for getting last committed offset

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


400
401
402
403
404
405
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 400

def committed(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})"
end

- (self) end_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


149
150
151
152
153
154
155
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 149

def end_handler
  if block_given?
    @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield })
    return self
  end
  raise ArgumentError, "Invalid arguments when calling end_handler()"
end

- (void) end_offsets(topicPartition = nil) { ... }

This method returns an undefined value.

Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    the partition to get the end offset.

Yields:

  • handler called on operation completed. The end offset for the given partition.

Raises:

  • (ArgumentError)


478
479
480
481
482
483
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 478

def end_offsets(topicPartition=nil)
  if topicPartition.class == Hash && block_given?
    return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})"
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


59
60
61
62
63
64
65
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 59

def exception_handler
  if block_given?
    @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling exception_handler()"
end

- (self) fetch(amount = nil)

Fetch the specified amount of elements. If the ReadStream has been paused, reading will recommence with the specified amount of items, otherwise the specified amount will be added to the current stream demand.

Parameters:

  • amount (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)


36
37
38
39
40
41
42
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 36

def fetch(amount=nil)
  if amount.class == Fixnum && !block_given?
    @j_del.java_method(:fetch, [Java::long.java_class]).call(amount)
    return self
  end
  raise ArgumentError, "Invalid arguments when calling fetch(#{amount})"
end

- (self) handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


68
69
70
71
72
73
74
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 68

def handler
  if block_given?
    @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling handler()"
end

- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }

This method returns an undefined value.

Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    TopicPartition to query.
  • timestamp (Fixnum) (defaults to: nil)
    Timestamp to be used in the query.

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


457
458
459
460
461
462
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 457

def offsets_for_times(topicPartition=nil,timestamp=nil)
  if topicPartition.class == Hash && timestamp.class == Fixnum && block_given?
    return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),timestamp,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{timestamp})"
end

- (self) partitions_assigned_handler { ... }

Set the handler called when topic partitions are assigned to the consumer

Yields:

  • handler called on assigned topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


283
284
285
286
287
288
289
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 283

def partitions_assigned_handler
  if block_given?
    @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()"
end

- (self) partitions_for(topic = nil) { ... }

Get metadata about the partitions for a given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic partition for which getting partitions info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


410
411
412
413
414
415
416
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 410

def partitions_for(topic=nil)
  if topic.class == String && block_given?
    @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
end

- (self) partitions_revoked_handler { ... }

Set the handler called when topic partitions are revoked to the consumer

Yields:

  • handler called on revoked topic partitions

Returns:

  • (self)

Raises:

  • (ArgumentError)


273
274
275
276
277
278
279
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 273

def partitions_revoked_handler
  if block_given?
    @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()"
end

- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }

Suspend fetching from the requested partitions.

Due to internal buffering of messages, the will continue to observe messages from the given topicPartitions until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will not see messages from the given topicPartitions.

Overloads:

  • - (self) pause(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching
  • - (self) pause(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching
  • - (self) pause(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed
  • - (self) pause(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which suspend fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 96

def pause(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:pause, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling pause(#{param_1})"
end

- (void) paused { ... }

This method returns an undefined value.

Get the set of partitions that were previously paused by a call to pause(Set).

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


264
265
266
267
268
269
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 264

def paused
  if block_given?
    return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling paused()"
end

- (void) poll(timeout = nil) { ... }

This method returns an undefined value.

Executes a poll for getting messages from Kafka

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Yields:

  • handler called after the poll with batch of records (can be empty).

Raises:

  • (ArgumentError)


500
501
502
503
504
505
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 500

def poll(timeout=nil)
  if timeout.class == Fixnum && block_given?
    return @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling poll(#{timeout})"
end

- (::VertxKafkaClient::KafkaConsumer) poll_timeout(timeout = nil)

Sets the poll timeout (in ms) for the underlying native Kafka Consumer. Defaults to 1000. Setting timeout to a lower value results in a more 'responsive' client, because it will block for a shorter period if no data is available in the assigned partition and therefore allows subsequent actions to be executed with a shorter delay. At the same time, the client will poll more frequently and thus will potentially create a higher load on the Kafka Broker.

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the native Kafka consumer's buffer, else returns empty. Must not be negative.

Returns:

Raises:

  • (ArgumentError)


490
491
492
493
494
495
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 490

def poll_timeout(timeout=nil)
  if timeout.class == Fixnum && !block_given?
    return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout),::VertxKafkaClient::KafkaConsumer, nil, nil)
  end
  raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})"
end

- (void) position(partition = nil) { ... }

This method returns an undefined value.

Get the offset of the next record that will be fetched (if a record with that offset exists).

Parameters:

  • partition (Hash) (defaults to: nil)
    The partition to get the position for

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


445
446
447
448
449
450
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 445

def position(partition=nil)
  if partition.class == Hash && block_given?
    return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling position(#{partition})"
end

- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }

Resume specified partitions which have been paused with pause.

Overloads:

  • - (self) resume(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching
  • - (self) resume(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching
  • - (self) resume(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed
  • - (self) resume(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition from which resume fetching

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 128

def resume(param_1=nil)
  if !block_given? && param_1 == nil
    @j_del.java_method(:resume, []).call()
    return self
  elsif param_1.class == Hash && !block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling resume(#{param_1})"
end

- (self) seek(topicPartition = nil, offset = nil) { ... }

Overrides the fetch offsets that the consumer will use on the next poll.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Parameters:

  • topicPartition (Hash) (defaults to: nil)
    topic partition for which seek
  • offset (Fixnum) (defaults to: nil)
    offset to seek inside the topic partition

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


303
304
305
306
307
308
309
310
311
312
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 303

def seek(topicPartition=nil,offset=nil)
  if topicPartition.class == Hash && offset.class == Fixnum && !block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset)
    return self
  elsif topicPartition.class == Hash && offset.class == Fixnum && block_given?
    @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})"
end

- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

Seek to the first offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToBeginning(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToBeginning(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToBeginning(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 333

def seek_to_beginning(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})"
end

- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }

Seek to the last offset for each of the given partitions.

Due to internal buffering of messages, the will continue to observe messages fetched with respect to the old offset until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new offset.

Overloads:

  • - (self) seekToEnd(topicPartition)

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek
  • - (self) seekToEnd(topicPartitions)

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek
  • - (self) seekToEnd(topicPartition, completionHandler) { ... }

    Parameters:

    • topicPartition (Hash)
      topic partition for which seek

    Yields:

    • handler called on operation completed
  • - (self) seekToEnd(topicPartitions, completionHandler) { ... }

    Parameters:

    • topicPartitions (Set<Hash>)
      topic partition for which seek

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 369

def seek_to_end(param_1=nil)
  if param_1.class == Hash && !block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)))
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }))
    return self
  elsif param_1.class == Hash && block_given?
    @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})"
end

- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }

Subscribe to the given list of topics to get dynamically assigned partitions.

Due to internal buffering of messages, when changing the subscribed topics the old set of topics may remain in effect (as observed by the record handler}) until some time after the given completionHandler is called. In contrast, the once the given completionHandler is called the #batch_handler will only see messages consistent with the new set of topics.

Overloads:

  • - (self) subscribe(topic)

    Parameters:

    • topic (String)
      topic to subscribe to
  • - (self) subscribe(topics)

    Parameters:

    • topics (Set<String>)
      topics to subscribe to
  • - (self) subscribe(topic, completionHandler) { ... }

    Parameters:

    • topic (String)
      topic to subscribe to

    Yields:

    • handler called on operation completed
  • - (self) subscribe(topics, completionHandler) { ... }

    Parameters:

    • topics (Set<String>)
      topics to subscribe to

    Yields:

    • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 176

def subscribe(param_1=nil)
  if param_1.class == String && !block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1)
    return self
  elsif param_1.class == Set && !block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }))
    return self
  elsif param_1.class == String && block_given?
    @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  elsif param_1.class == Set && block_given?
    @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})"
end

- (self) subscription { ... }

Get the current subscription.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


254
255
256
257
258
259
260
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 254

def subscription
  if block_given?
    @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling subscription()"
end

- (self) unsubscribe { ... }

Unsubscribe from topics currently subscribed with subscribe.

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


241
242
243
244
245
246
247
248
249
250
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 241

def unsubscribe
  if !block_given?
    @j_del.java_method(:unsubscribe, []).call()
    return self
  elsif block_given?
    @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
    return self
  end
  raise ArgumentError, "Invalid arguments when calling unsubscribe()"
end