Class: VertxKafkaClient::KafkaConsumer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaConsumer
- Includes:
- Vertx::ReadStream
- Defined in:
- /Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb
Overview
You receive Kafka records by providing a #handler. As messages arrive the handler will be called with the records.
The #pause and #resume provides global control over reading the records from the consumer.
The #pause and #resume provides finer grained control over reading records for specific Topic/Partition, these are Kafka's specific operations.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaConsumer instance.
Instance Method Summary (collapse)
-
- (self) assign(param_1 = nil)
Manually assign a list of partition to this consumer.
-
- (self) assignment { ... }
Get the set of partitions currently assigned to this consumer.
-
- (self) batch_handler { ... }
Set the handler to be used when batches of messages are fetched from the Kafka server.
-
- (void) beginning_offsets(topicPartition = nil) { ... }
Get the first offset for the given partitions.
-
- (void) close { ... }
Close the consumer.
-
- (void) commit { ... }
Commit current offsets for all the subscribed list of topics and partition.
-
- (void) committed(topicPartition = nil) { ... }
Get the last committed offset for the given partition (whether the commit happened by this process or another).
- - (self) end_handler { ... }
-
- (void) end_offsets(topicPartition = nil) { ... }
Get the last offset for the given partition.
- - (self) exception_handler { ... }
-
- (self) fetch(amount = nil)
Fetch the specified amount of elements.
- - (self) handler { ... }
-
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
Look up the offset for the given partition by timestamp.
-
- (self) partitions_assigned_handler { ... }
Set the handler called when topic partitions are assigned to the consumer.
-
- (self) partitions_for(topic = nil) { ... }
Get metadata about the partitions for a given topic.
-
- (self) partitions_revoked_handler { ... }
Set the handler called when topic partitions are revoked to the consumer.
-
- (self) pause(param_1 = nil)
Suspend fetching from the requested partitions.
-
- (void) paused { ... }
Get the set of partitions that were previously paused by a call to pause(Set).
-
- (void) poll(timeout = nil) { ... }
Executes a poll for getting messages from Kafka.
-
- (::VertxKafkaClient::KafkaConsumer) poll_timeout(timeout = nil)
Sets the poll timeout (in ms) for the underlying native Kafka Consumer.
-
- (void) position(partition = nil) { ... }
Get the offset of the next record that will be fetched (if a record with that offset exists).
-
- (self) resume(param_1 = nil)
Resume specified partitions which have been paused with pause.
-
- (self) seek(topicPartition = nil, offset = nil) { ... }
Overrides the fetch offsets that the consumer will use on the next poll.
-
- (self) seek_to_beginning(param_1 = nil)
Seek to the first offset for each of the given partitions.
-
- (self) seek_to_end(param_1 = nil)
Seek to the last offset for each of the given partitions.
-
- (self) subscribe(param_1 = nil)
Subscribe to the given list of topics to get dynamically assigned partitions.
-
- (self) subscription { ... }
Get the current subscription.
-
- (self) unsubscribe { ... }
Unsubscribe from topics currently subscribed with subscribe.
Class Method Details
+ (::VertxKafkaClient::KafkaConsumer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
49 50 51 52 53 54 55 56 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 49 def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil) if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given? && keyType == nil && valueType == nil return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }]),::VertxKafkaClient::KafkaConsumer, nil, nil) elsif vertx.class.method_defined?(:j_del) && config.class == Hash && keyType.class == Class && valueType.class == Class && !block_given? return ::Vertx::Util::Utils.safe_create(Java::IoVertxKafkaClientConsumer::KafkaConsumer.java_method(:create, [Java::IoVertxCore::Vertx.java_class,Java::JavaUtil::Map.java_class,Java::JavaLang::Class.java_class,Java::JavaLang::Class.java_class]).call(vertx.j_del,Hash[config.map { |k,v| [k,v] }],::Vertx::Util::Utils.j_class_of(keyType),::Vertx::Util::Utils.j_class_of(valueType)),::VertxKafkaClient::KafkaConsumer, ::Vertx::Util::Utils.v_type_of(keyType), ::Vertx::Util::Utils.v_type_of(valueType)) end raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})" end |
Instance Method Details
- (self) assign(topicPartition) - (self) assign(topicPartitions) - (self) assign(topicPartition, completionHandler) { ... } - (self) assign(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages, when reassigning
the old set of partitions may remain in effect
(as observed by the record handler)}
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of partitions.
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 212 def assign(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:assign, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:assign, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assign(#{param_1})" end |
- (self) assignment { ... }
231 232 233 234 235 236 237 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 231 def assignment if block_given? @j_del.java_method(:assignment, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling assignment()" end |
- (self) batch_handler { ... }
423 424 425 426 427 428 429 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 423 def batch_handler if block_given? @j_del.java_method(:batchHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecords, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling batch_handler()" end |
- (void) beginning_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the first offset for the given partitions.
467 468 469 470 471 472 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 467 def beginning_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:beginningOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling beginning_offsets(#{topicPartition})" end |
- (void) close { ... }
This method returns an undefined value.
Close the consumer
433 434 435 436 437 438 439 440 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 433 def close if !block_given? return @j_del.java_method(:close, []).call() elsif block_given? return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling close()" end |
- (void) commit { ... }
This method returns an undefined value.
Commit current offsets for all the subscribed list of topics and partition.
388 389 390 391 392 393 394 395 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 388 def commit if !block_given? return @j_del.java_method(:commit, []).call() elsif block_given? return @j_del.java_method(:commit, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) end raise ArgumentError, "Invalid arguments when calling commit()" end |
- (void) committed(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last committed offset for the given partition (whether the commit happened by this process or another).
400 401 402 403 404 405 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 400 def committed(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:committed, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling committed(#{topicPartition})" end |
- (self) end_handler { ... }
149 150 151 152 153 154 155 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 149 def end_handler if block_given? @j_del.java_method(:endHandler, [Java::IoVertxCore::Handler.java_class]).call(Proc.new { yield }) return self end raise ArgumentError, "Invalid arguments when calling end_handler()" end |
- (void) end_offsets(topicPartition = nil) { ... }
This method returns an undefined value.
Get the last offset for the given partition. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
478 479 480 481 482 483 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 478 def end_offsets(topicPartition=nil) if topicPartition.class == Hash && block_given? return @j_del.java_method(:endOffsets, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling end_offsets(#{topicPartition})" end |
- (self) exception_handler { ... }
59 60 61 62 63 64 65 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 59 def exception_handler if block_given? @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.from_throwable(event)) })) return self end raise ArgumentError, "Invalid arguments when calling exception_handler()" end |
- (self) fetch(amount = nil)
amount
of elements. If the ReadStream
has been paused, reading will
recommence with the specified amount
of items, otherwise the specified amount
will
be added to the current stream demand.
36 37 38 39 40 41 42 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 36 def fetch(amount=nil) if amount.class == Fixnum && !block_given? @j_del.java_method(:fetch, [Java::long.java_class]).call(amount) return self end raise ArgumentError, "Invalid arguments when calling fetch(#{amount})" end |
- (self) handler { ... }
68 69 70 71 72 73 74 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 68 def handler if block_given? @j_del.java_method(:handler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.safe_create(event,::VertxKafkaClient::KafkaConsumerRecord, nil, nil)) })) return self end raise ArgumentError, "Invalid arguments when calling handler()" end |
- (void) offsets_for_times(topicPartition = nil, timestamp = nil) { ... }
This method returns an undefined value.
Look up the offset for the given partition by timestamp. Note: the result might be null in case for the given timestamp no offset can be found -- e.g., when the timestamp refers to the future
457 458 459 460 461 462 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 457 def offsets_for_times(topicPartition=nil,=nil) if topicPartition.class == Hash && .class == Fixnum && block_given? return @j_del.java_method(:offsetsForTimes, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::JavaLang::Long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil : nil) })) end raise ArgumentError, "Invalid arguments when calling offsets_for_times(#{topicPartition},#{})" end |
- (self) partitions_assigned_handler { ... }
283 284 285 286 287 288 289 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 283 def partitions_assigned_handler if block_given? @j_del.java_method(:partitionsAssignedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_assigned_handler()" end |
- (self) partitions_for(topic = nil) { ... }
410 411 412 413 414 415 416 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 410 def partitions_for(topic=nil) if topic.class == String && block_given? @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(topic,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})" end |
- (self) partitions_revoked_handler { ... }
273 274 275 276 277 278 279 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 273 def partitions_revoked_handler if block_given? @j_del.java_method(:partitionsRevokedHandler, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |event| yield(::Vertx::Util::Utils.to_set(event).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }) })) return self end raise ArgumentError, "Invalid arguments when calling partitions_revoked_handler()" end |
- (self) pause - (self) pause(topicPartition) - (self) pause(topicPartitions) - (self) pause(topicPartition, completionHandler) { ... } - (self) pause(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages from the given topicPartitions
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will not see messages
from the given topicPartitions
.
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 96 def pause(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:pause, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:pause, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:pause, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling pause(#{param_1})" end |
- (void) paused { ... }
This method returns an undefined value.
Get the set of partitions that were previously paused by a call to pause(Set).
264 265 266 267 268 269 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 264 def paused if block_given? return @j_del.java_method(:paused, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil } : nil) })) end raise ArgumentError, "Invalid arguments when calling paused()" end |
- (void) poll(timeout = nil) { ... }
This method returns an undefined value.
Executes a poll for getting messages from Kafka
500 501 502 503 504 505 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 500 def poll(timeout=nil) if timeout.class == Fixnum && block_given? return @j_del.java_method(:poll, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.safe_create(ar.result,::VertxKafkaClient::KafkaConsumerRecords, nil, nil) : nil) })) end raise ArgumentError, "Invalid arguments when calling poll(#{timeout})" end |
- (::VertxKafkaClient::KafkaConsumer) poll_timeout(timeout = nil)
490 491 492 493 494 495 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 490 def poll_timeout(timeout=nil) if timeout.class == Fixnum && !block_given? return ::Vertx::Util::Utils.safe_create(@j_del.java_method(:pollTimeout, [Java::long.java_class]).call(timeout),::VertxKafkaClient::KafkaConsumer, nil, nil) end raise ArgumentError, "Invalid arguments when calling poll_timeout(#{timeout})" end |
- (void) position(partition = nil) { ... }
This method returns an undefined value.
Get the offset of the next record that will be fetched (if a record with that offset exists).
445 446 447 448 449 450 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 445 def position(partition=nil) if partition.class == Hash && block_given? return @j_del.java_method(:position, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(partition)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ar.result : nil) })) end raise ArgumentError, "Invalid arguments when calling position(#{partition})" end |
- (self) resume - (self) resume(topicPartition) - (self) resume(topicPartitions) - (self) resume(topicPartition, completionHandler) { ... } - (self) resume(topicPartitions, completionHandler) { ... }
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 128 def resume(param_1=nil) if !block_given? && param_1 == nil @j_del.java_method(:resume, []).call() return self elsif param_1.class == Hash && !block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:resume, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:resume, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling resume(#{param_1})" end |
- (self) seek(topicPartition = nil, offset = nil) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
303 304 305 306 307 308 309 310 311 312 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 303 def seek(topicPartition=nil,offset=nil) if topicPartition.class == Hash && offset.class == Fixnum && !block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset) return self elsif topicPartition.class == Hash && offset.class == Fixnum && block_given? @j_del.java_method(:seek, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(topicPartition)),offset,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek(#{topicPartition},#{offset})" end |
- (self) seekToBeginning(topicPartition) - (self) seekToBeginning(topicPartitions) - (self) seekToBeginning(topicPartition, completionHandler) { ... } - (self) seekToBeginning(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 333 def seek_to_beginning(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToBeginning, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToBeginning, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_beginning(#{param_1})" end |
- (self) seekToEnd(topicPartition) - (self) seekToEnd(topicPartitions) - (self) seekToEnd(topicPartition, completionHandler) { ... } - (self) seekToEnd(topicPartitions, completionHandler) { ... }
Due to internal buffering of messages,
the will
continue to observe messages fetched with respect to the old offset
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new offset.
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 369 def seek_to_end(param_1=nil) if param_1.class == Hash && !block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1))) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) })) return self elsif param_1.class == Hash && block_given? @j_del.java_method(:seekToEnd, [Java::IoVertxKafkaClientCommon::TopicPartition.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(param_1)),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:seekToEnd, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| Java::IoVertxKafkaClientCommon::TopicPartition.new(::Vertx::Util::Utils.to_json_object(element)) }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling seek_to_end(#{param_1})" end |
- (self) subscribe(topic) - (self) subscribe(topics) - (self) subscribe(topic, completionHandler) { ... } - (self) subscribe(topics, completionHandler) { ... }
Due to internal buffering of messages, when changing the subscribed topics
the old set of topics may remain in effect
(as observed by the record handler})
until some time after the given completionHandler
is called. In contrast, the once the given completionHandler
is called the #batch_handler will only see messages
consistent with the new set of topics.
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 176 def subscribe(param_1=nil) if param_1.class == String && !block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class]).call(param_1) return self elsif param_1.class == Set && !block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element })) return self elsif param_1.class == String && block_given? @j_del.java_method(:subscribe, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class]).call(param_1,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self elsif param_1.class == Set && block_given? @j_del.java_method(:subscribe, [Java::JavaUtil::Set.java_class,Java::IoVertxCore::Handler.java_class]).call(Java::JavaUtil::LinkedHashSet.new(param_1.map { |element| element }),(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscribe(#{param_1})" end |
- (self) subscription { ... }
254 255 256 257 258 259 260 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 254 def subscription if block_given? @j_del.java_method(:subscription, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil, ar.succeeded ? ::Vertx::Util::Utils.to_set(ar.result).map! { |elt| elt } : nil) })) return self end raise ArgumentError, "Invalid arguments when calling subscription()" end |
- (self) unsubscribe { ... }
241 242 243 244 245 246 247 248 249 250 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_consumer.rb', line 241 def unsubscribe if !block_given? @j_del.java_method(:unsubscribe, []).call() return self elsif block_given? @j_del.java_method(:unsubscribe, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) })) return self end raise ArgumentError, "Invalid arguments when calling unsubscribe()" end |