Class: VertxKafkaClient::KafkaProducer
- Inherits:
-
Object
- Object
- VertxKafkaClient::KafkaProducer
- Includes:
- Vertx::WriteStream
- Defined in:
- /Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb
Overview
Vert.x Kafka producer.
The write method provides global control over writing a record.
Class Method Summary (collapse)
-
+ (::VertxKafkaClient::KafkaProducer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaProducer instance.
-
+ (::VertxKafkaClient::KafkaProducer) create_shared(vertx = nil, name = nil, config = nil, keyType = nil, valueType = nil)
Get or create a KafkaProducer instance which shares its stream with any other KafkaProducer created with the same name.
Instance Method Summary (collapse)
-
- (void) close(timeout = nil) { ... }
Close the producer.
- - (self) drain_handler { ... }
- - (void) end(kafkaProducerRecord = nil)
- - (self) exception_handler { ... }
-
- (self) flush { ... }
Invoking this method makes all buffered records immediately available to write.
-
- (self) partitions_for(topic = nil) { ... }
Get the partition metadata for the given topic.
- - (self) set_write_queue_max_size(i = nil)
-
- (self) write(record = nil) { ... }
Asynchronously write a record to a topic.
- - (true, false) write_queue_full?
Class Method Details
+ (::VertxKafkaClient::KafkaProducer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)
Create a new KafkaProducer instance
45 46 47 48 49 50 51 52 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 45
#
# Create a new KafkaProducer instance.
#
# Accepted call shapes (never with a block):
#   create(vertx, config)
#   create(vertx, config, keyType, valueType)
#
# @param vertx [Object] object whose class defines +j_del+ (the wrapped Java Vertx)
# @param config [Hash] producer configuration; must be exactly a Hash and is
#   shallow-copied before being handed to the Java factory
# @param keyType [Class, nil] key class; must be given together with valueType
# @param valueType [Class, nil] value class; must be given together with keyType
# @return [::VertxKafkaClient::KafkaProducer] the value built by Utils.safe_create
# @raise [ArgumentError] when the arguments match neither call shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  # Both overloads share these preconditions, so check them once.
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given?
    j_producer_class = Java::IoVertxKafkaClientProducer::KafkaProducer
    if keyType.nil? && valueType.nil?
      factory = j_producer_class.java_method(
        :create,
        [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class]
      )
      j_producer = factory.call(vertx.j_del, Hash[config.map { |key, value| [key, value] }])
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif keyType.class == Class && valueType.class == Class
      factory = j_producer_class.java_method(
        :create,
        [
          Java::IoVertxCore::Vertx.java_class,
          Java::JavaUtil::Map.java_class,
          Java::JavaLang::Class.java_class,
          Java::JavaLang::Class.java_class
        ]
      )
      j_producer = factory.call(
        vertx.j_del,
        Hash[config.map { |key, value| [key, value] }],
        ::Vertx::Util::Utils.j_class_of(keyType),
        ::Vertx::Util::Utils.j_class_of(valueType)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(keyType),
        ::Vertx::Util::Utils.v_type_of(valueType)
      )
    end
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end
+ (::VertxKafkaClient::KafkaProducer) create_shared(vertx = nil, name = nil, config = nil, keyType = nil, valueType = nil)
Get or create a KafkaProducer instance which shares its stream with any other KafkaProducer created with the same
name
31 32 33 34 35 36 37 38 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 31
#
# Get or create a KafkaProducer instance which shares its stream with any
# other KafkaProducer created with the same name.
#
# Accepted call shapes (never with a block):
#   create_shared(vertx, name, config)
#   create_shared(vertx, name, config, keyType, valueType)
#
# @param vertx [Object] object whose class defines +j_del+ (the wrapped Java Vertx)
# @param name [String] the shared producer name
# @param config [Hash] producer configuration; must be exactly a Hash and is
#   shallow-copied before being handed to the Java factory
# @param keyType [Class, nil] key class; must be given together with valueType
# @param valueType [Class, nil] value class; must be given together with keyType
# @return [::VertxKafkaClient::KafkaProducer] the value built by Utils.safe_create
# @raise [ArgumentError] when the arguments match neither call shape
def self.create_shared(vertx=nil,name=nil,config=nil,keyType=nil,valueType=nil)
  # Both overloads share these preconditions, so check them once.
  common_ok = vertx.class.method_defined?(:j_del) &&
              name.class == String &&
              config.class == Hash &&
              !block_given?
  if common_ok
    j_producer_class = Java::IoVertxKafkaClientProducer::KafkaProducer
    if keyType.nil? && valueType.nil?
      factory = j_producer_class.java_method(
        :createShared,
        [Java::IoVertxCore::Vertx.java_class, Java::java.lang.String.java_class, Java::JavaUtil::Map.java_class]
      )
      j_producer = factory.call(vertx.j_del, name, Hash[config.map { |key, value| [key, value] }])
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif keyType.class == Class && valueType.class == Class
      factory = j_producer_class.java_method(
        :createShared,
        [
          Java::IoVertxCore::Vertx.java_class,
          Java::java.lang.String.java_class,
          Java::JavaUtil::Map.java_class,
          Java::JavaLang::Class.java_class,
          Java::JavaLang::Class.java_class
        ]
      )
      j_producer = factory.call(
        vertx.j_del,
        name,
        Hash[config.map { |key, value| [key, value] }],
        ::Vertx::Util::Utils.j_class_of(keyType),
        ::Vertx::Util::Utils.j_class_of(valueType)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(keyType),
        ::Vertx::Util::Utils.v_type_of(valueType)
      )
    end
  end
  raise ArgumentError, "Invalid arguments when calling create_shared(#{vertx},#{name},#{config},#{keyType},#{valueType})"
end
Instance Method Details
- (void) close(timeout = nil) { ... }
This method returns an undefined value.
Close the producer
136 137 138 139 140 141 142 143 144 145 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 136
#
# Close the producer.
#
# Accepted call shapes:
#   close                      - no timeout, no completion block
#   close { |err| ... }        - completion block only
#   close(timeout) { |err| ... } - integer timeout plus completion block
#
# @param timeout [Integer, nil] passed to the Java +close(long, Handler)+
#   overload; presumably milliseconds - confirm against the Java API
# @yield [err] called on completion with the failure cause, or nil on success
# @return [void] the raw result of the Java call; callers should not rely on it
# @raise [ArgumentError] when the arguments match none of the call shapes
#   (including a timeout given without a block)
def close(timeout=nil)
  if !block_given? && timeout == nil
    return @j_del.java_method(:close, []).call()
  elsif block_given? && timeout == nil
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  # Integer replaces the Fixnum constant, which was deprecated in Ruby 2.4 and
  # removed in 3.2; referencing it there raises NameError before any dispatch.
  elsif timeout.is_a?(Integer) && block_given?
    return @j_del.java_method(:close, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close(#{timeout})"
end
- (self) drain_handler { ... }
104 105 106 107 108 109 110 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 104
#
# Register the given block as the Java delegate's +drainHandler+.
#
# @yield called with no arguments whenever the Java handler fires
# @return [self]
# @raise [ArgumentError] when no block is given
def drain_handler
  raise ArgumentError, "Invalid arguments when calling drain_handler()" unless block_given?

  # The proc ignores whatever argument Java passes and yields nothing.
  on_drain = proc { yield }
  @j_del.java_method(:drainHandler, [Java::IoVertxCore::Handler.java_class]).call(on_drain)
  self
end
- (void) end(kafkaProducerRecord = nil)
This method returns an undefined value.
78 79 80 81 82 83 84 85 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 78
#
# Call the Java delegate's +end+, optionally passing one final record.
#
# Accepted call shapes (never with a block):
#   end
#   end(kafkaProducerRecord)
#
# @param kafkaProducerRecord [Object, nil] object whose class defines +j_del+
#   (a wrapped KafkaProducerRecord), or nil for the no-argument overload
# @return [void] the raw result of the Java call; callers should not rely on it
# @raise [ArgumentError] when a block is given or the argument is neither nil
#   nor a +j_del+-bearing object
def end(kafkaProducerRecord=nil)
  unless block_given?
    return @j_del.java_method(:end, []).call() if kafkaProducerRecord.nil?

    if kafkaProducerRecord.class.method_defined?(:j_del)
      end_with_record = @j_del.java_method(:end, [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class])
      return end_with_record.call(kafkaProducerRecord.j_del)
    end
  end
  raise ArgumentError, "Invalid arguments when calling end(#{kafkaProducerRecord})"
end
- (self) exception_handler { ... }
55 56 57 58 59 60 61 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 55
#
# Register the given block as the Java delegate's +exceptionHandler+.
#
# @yield [error] the Java event converted via +::Vertx::Util::Utils.from_throwable+
# @return [self]
# @raise [ArgumentError] when no block is given
def exception_handler
  raise ArgumentError, "Invalid arguments when calling exception_handler()" unless block_given?

  on_error = proc { |event| yield(::Vertx::Util::Utils.from_throwable(event)) }
  @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class]).call(on_error)
  self
end
- (self) flush { ... }
Invoking this method makes all buffered records immediately available to write
125 126 127 128 129 130 131 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 125
#
# Invoking this method makes all buffered records immediately available to write.
#
# @yield called with no arguments when the Java flush handler fires
# @return [self]
# @raise [ArgumentError] when no block is given
def flush
  raise ArgumentError, "Invalid arguments when calling flush()" unless block_given?

  # The proc ignores whatever argument Java passes and yields nothing.
  on_flushed = proc { yield }
  @j_del.java_method(:flush, [Java::IoVertxCore::Handler.java_class]).call(on_flushed)
  self
end
- (self) partitions_for(topic = nil) { ... }
Get the partition metadata for the given topic.
115 116 117 118 119 120 121 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 115
#
# Get the partition metadata for the given topic.
#
# @param topic [String] the topic name; must be exactly a String
# @yield [err, partitions] on failure, err is the cause and partitions is nil;
#   on success, err is nil and partitions is an Array in which each non-nil
#   element has been converted with +JSON.parse(elt.toJson.encode)+
# @return [self]
# @raise [ArgumentError] when topic is not a String or no block is given
def partitions_for(topic=nil)
  unless topic.class == String && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  on_result = proc do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions =
      if ar.succeeded
        ar.result.to_a.map { |elt| elt != nil ? JSON.parse(elt.toJson.encode) : nil }
      end
    yield(failure, partitions)
  end
  lookup = @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class, Java::IoVertxCore::Handler.java_class])
  lookup.call(topic, on_result)
  self
end
- (self) set_write_queue_max_size(i = nil)
88 89 90 91 92 93 94 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 88
#
# Forward the given size to the Java delegate's +setWriteQueueMaxSize(int)+.
#
# @param i [Integer] the maximum write queue size
# @return [self]
# @raise [ArgumentError] when i is not an Integer or a block is given
def set_write_queue_max_size(i=nil)
  # Integer replaces the Fixnum constant, which was deprecated in Ruby 2.4 and
  # removed in 3.2; referencing it there raises NameError before any dispatch.
  if i.is_a?(Integer) && !block_given?
    @j_del.java_method(:setWriteQueueMaxSize, [Java::int.java_class]).call(i)
    return self
  end
  raise ArgumentError, "Invalid arguments when calling set_write_queue_max_size(#{i})"
end
- (self) write(record = nil) { ... }
Asynchronously write a record to a topic
66 67 68 69 70 71 72 73 74 75 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 66
#
# Asynchronously write a record to a topic.
#
# Accepted call shapes:
#   write(record)
#   write(record) { |err, metadata| ... }
#
# @param record [Object] object whose class defines +j_del+ (a wrapped
#   KafkaProducerRecord)
# @yield [err, metadata] on failure, err is the cause and metadata is nil;
#   on success, err is nil and metadata is +JSON.parse(result.toJson.encode)+,
#   or nil when the result itself is nil
# @return [self]
# @raise [ArgumentError] when record's class does not define +j_del+
def write(record=nil)
  # Both overloads require a wrapped record; only the handler differs.
  raise ArgumentError, "Invalid arguments when calling write(#{record})" unless record.class.method_defined?(:j_del)

  j_record_class = Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class
  if block_given?
    on_written = proc do |ar|
      failure = ar.failed ? ar.cause : nil
      metadata = nil
      if ar.succeeded
        metadata = JSON.parse(ar.result.toJson.encode) if ar.result != nil
      end
      yield(failure, metadata)
    end
    @j_del.java_method(:write, [j_record_class, Java::IoVertxCore::Handler.java_class]).call(record.j_del, on_written)
  else
    @j_del.java_method(:write, [j_record_class]).call(record.j_del)
  end
  self
end
- (true, false) write_queue_full?
96 97 98 99 100 101 |
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 96
#
# Ask the Java delegate whether its write queue is full.
#
# @return [true, false] the result of the Java +writeQueueFull()+ call
# @raise [ArgumentError] when a block is given
def write_queue_full?
  raise ArgumentError, "Invalid arguments when calling write_queue_full?()" if block_given?

  @j_del.java_method(:writeQueueFull, []).call()
end