Class: VertxKafkaClient::KafkaProducer

Inherits:
Object
  • Object
show all
Includes:
Vertx::WriteStream
Defined in:
/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb

Overview

Vert.x Kafka producer.

The write method provides global control over writing a record.

Class Method Summary (collapse)

Instance Method Summary (collapse)

Class Method Details

+ (::VertxKafkaClient::KafkaProducer) create(vertx = nil, config = nil, keyType = nil, valueType = nil)

Create a new KafkaProducer instance

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • config (Hash{String => String}) (defaults to: nil)
    Kafka producer configuration
  • keyType (Class) (defaults to: nil)
    class type for the key serialization
  • valueType (Class) (defaults to: nil)
    class type for the value serialization

Returns:

Raises:

  • (ArgumentError)


45
46
47
48
49
50
51
52
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 45

# Create a new KafkaProducer instance.
#
# Two call shapes are accepted; passing a block is always rejected:
# * create(vertx, config)                     - keyType and valueType both nil
# * create(vertx, config, keyType, valueType) - keyType and valueType both Class objects
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use (its class must define +j_del+)
# @param [Hash{String => String}] config Kafka producer configuration (must be exactly a Hash)
# @param [Class] keyType class type for the key serialization
# @param [Class] valueType class type for the value serialization
# @return [::VertxKafkaClient::KafkaProducer] whatever Utils.safe_create builds around the Java producer
# @raise [ArgumentError] when the arguments match neither call shape
def self.create(vertx=nil,config=nil,keyType=nil,valueType=nil)
  # Both overloads share the same requirements on vertx, config and the absence of a block.
  if vertx.class.method_defined?(:j_del) && config.class == Hash && !block_given?
    if keyType == nil && valueType == nil
      j_factory = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :create,
        [Java::IoVertxCore::Vertx.java_class, Java::JavaUtil::Map.java_class]
      )
      # The config is rebuilt into a fresh Hash before being handed to Java.
      j_producer = j_factory.call(vertx.j_del, Hash[config.map { |key, value| [key, value] }])
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif keyType.class == Class && valueType.class == Class
      j_factory = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :create,
        [
          Java::IoVertxCore::Vertx.java_class,
          Java::JavaUtil::Map.java_class,
          Java::JavaLang::Class.java_class,
          Java::JavaLang::Class.java_class
        ]
      )
      j_producer = j_factory.call(
        vertx.j_del,
        Hash[config.map { |key, value| [key, value] }],
        ::Vertx::Util::Utils.j_class_of(keyType),
        ::Vertx::Util::Utils.j_class_of(valueType)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(keyType),
        ::Vertx::Util::Utils.v_type_of(valueType)
      )
    end
  end
  raise ArgumentError, "Invalid arguments when calling create(#{vertx},#{config},#{keyType},#{valueType})"
end

+ (::VertxKafkaClient::KafkaProducer) create_shared(vertx = nil, name = nil, config = nil, keyType = nil, valueType = nil)

Get or create a KafkaProducer instance which shares its stream with any other KafkaProducer created with the same name

Parameters:

  • vertx (::Vertx::Vertx) (defaults to: nil)
    Vert.x instance to use
  • name (String) (defaults to: nil)
    the producer name to identify it
  • config (Hash{String => String}) (defaults to: nil)
    Kafka producer configuration
  • keyType (Class) (defaults to: nil)
    class type for the key serialization
  • valueType (Class) (defaults to: nil)
    class type for the value serialization

Returns:

Raises:

  • (ArgumentError)


31
32
33
34
35
36
37
38
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 31

# Get or create a KafkaProducer instance identified by +name+.
#
# Two call shapes are accepted; passing a block is always rejected:
# * create_shared(vertx, name, config)                     - keyType and valueType both nil
# * create_shared(vertx, name, config, keyType, valueType) - keyType and valueType both Class objects
#
# @param [::Vertx::Vertx] vertx Vert.x instance to use (its class must define +j_del+)
# @param [String] name the producer name to identify it
# @param [Hash{String => String}] config Kafka producer configuration (must be exactly a Hash)
# @param [Class] keyType class type for the key serialization
# @param [Class] valueType class type for the value serialization
# @return [::VertxKafkaClient::KafkaProducer] whatever Utils.safe_create builds around the Java producer
# @raise [ArgumentError] when the arguments match neither call shape
def self.create_shared(vertx=nil,name=nil,config=nil,keyType=nil,valueType=nil)
  # Both overloads share the same requirements on vertx, name, config and the absence of a block.
  if vertx.class.method_defined?(:j_del) && name.class == String && config.class == Hash && !block_given?
    if keyType == nil && valueType == nil
      j_factory = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :createShared,
        [Java::IoVertxCore::Vertx.java_class, Java::java.lang.String.java_class, Java::JavaUtil::Map.java_class]
      )
      # The config is rebuilt into a fresh Hash before being handed to Java.
      j_producer = j_factory.call(vertx.j_del, name, Hash[config.map { |key, value| [key, value] }])
      return ::Vertx::Util::Utils.safe_create(j_producer, ::VertxKafkaClient::KafkaProducer, nil, nil)
    elsif keyType.class == Class && valueType.class == Class
      j_factory = Java::IoVertxKafkaClientProducer::KafkaProducer.java_method(
        :createShared,
        [
          Java::IoVertxCore::Vertx.java_class,
          Java::java.lang.String.java_class,
          Java::JavaUtil::Map.java_class,
          Java::JavaLang::Class.java_class,
          Java::JavaLang::Class.java_class
        ]
      )
      j_producer = j_factory.call(
        vertx.j_del,
        name,
        Hash[config.map { |key, value| [key, value] }],
        ::Vertx::Util::Utils.j_class_of(keyType),
        ::Vertx::Util::Utils.j_class_of(valueType)
      )
      return ::Vertx::Util::Utils.safe_create(
        j_producer,
        ::VertxKafkaClient::KafkaProducer,
        ::Vertx::Util::Utils.v_type_of(keyType),
        ::Vertx::Util::Utils.v_type_of(valueType)
      )
    end
  end
  raise ArgumentError, "Invalid arguments when calling create_shared(#{vertx},#{name},#{config},#{keyType},#{valueType})"
end

Instance Method Details

- (void) close(timeout = nil) { ... }

This method returns an undefined value.

Close the producer

Parameters:

  • timeout (Fixnum) (defaults to: nil)
    timeout to wait for closing

Yields:

  • handler called on operation completed

Raises:

  • (ArgumentError)


136
137
138
139
140
141
142
143
144
145
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 136

# Close the producer.
#
# Accepted call shapes:
# * close                   - no timeout, no block
# * close { |err| ... }     - no timeout, completion block
# * close(timeout) { ... }  - integer timeout plus completion block
#
# The block receives +ar.cause+ when the async result reports failure, otherwise nil.
#
# @param [Integer] timeout timeout to wait for closing
# @yield [error] handler called on operation completed
# @return [void] the value returned by the underlying Java call
# @raise [ArgumentError] for any other argument combination (e.g. a timeout without a block)
def close(timeout=nil)
  if timeout == nil
    return @j_del.java_method(:close, []).call() unless block_given?
    return @j_del.java_method(:close, [Java::IoVertxCore::Handler.java_class]).call((Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  elsif timeout.is_a?(Integer) && block_given?
    # is_a?(Integer) instead of `.class == Fixnum`: the Fixnum constant was deprecated in
    # Ruby 2.4 and removed in 3.2, while Integer covers Fixnum on older runtimes as well.
    return @j_del.java_method(:close, [Java::long.java_class,Java::IoVertxCore::Handler.java_class]).call(timeout,(Proc.new { |ar| yield(ar.failed ? ar.cause : nil) }))
  end
  raise ArgumentError, "Invalid arguments when calling close(#{timeout})"
end

- (self) drain_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


104
105
106
107
108
109
110
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 104

# Register the given block through the Java +drainHandler+ method.
# The block is invoked with no arguments, whatever the Java side passes to the handler.
#
# @yield invoked by the registered handler
# @return [self]
# @raise [ArgumentError] when no block is given
def drain_handler
  raise ArgumentError, "Invalid arguments when calling drain_handler()" unless block_given?

  j_register = @j_del.java_method(:drainHandler, [Java::IoVertxCore::Handler.java_class])
  j_register.call(Proc.new { yield })
  self
end

- (void) end(kafkaProducerRecord = nil)

This method returns an undefined value.

Parameters:

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
85
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 78

# Call the Java +end+ method, optionally passing a final record.
#
# Accepted call shapes (a block is never accepted):
# * end          - no record
# * end(record)  - an object whose class defines +j_del+
#
# @param [#j_del] kafkaProducerRecord record wrapper whose +j_del+ is forwarded to Java
# @return [void] the value returned by the underlying Java call
# @raise [ArgumentError] when a block is given or the argument is neither nil nor a wrapper
def end(kafkaProducerRecord=nil)
  unless block_given?
    return @j_del.java_method(:end, []).call() if kafkaProducerRecord == nil

    if kafkaProducerRecord.class.method_defined?(:j_del)
      j_end = @j_del.java_method(:end, [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class])
      return j_end.call(kafkaProducerRecord.j_del)
    end
  end
  raise ArgumentError, "Invalid arguments when calling end(#{kafkaProducerRecord})"
end

- (self) exception_handler { ... }

Yields:

Returns:

  • (self)

Raises:

  • (ArgumentError)


55
56
57
58
59
60
61
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 55

# Register the given block through the Java +exceptionHandler+ method.
# Each event delivered to the handler is passed through Utils.from_throwable
# before being yielded to the block.
#
# @yield [error] the converted event
# @return [self]
# @raise [ArgumentError] when no block is given
def exception_handler
  raise ArgumentError, "Invalid arguments when calling exception_handler()" unless block_given?

  j_register = @j_del.java_method(:exceptionHandler, [Java::IoVertxCore::Handler.java_class])
  j_register.call((Proc.new { |j_error| yield(::Vertx::Util::Utils.from_throwable(j_error)) }))
  self
end

- (self) flush { ... }

Invoking this method makes all buffered records immediately available to write

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


125
126
127
128
129
130
131
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 125

# Invoke the Java +flush+ method with a handler wrapping the given block.
# The block is invoked with no arguments when the handler fires.
#
# @yield handler called on operation completed
# @return [self]
# @raise [ArgumentError] when no block is given
def flush
  raise ArgumentError, "Invalid arguments when calling flush()" unless block_given?

  j_flush = @j_del.java_method(:flush, [Java::IoVertxCore::Handler.java_class])
  j_flush.call(Proc.new { yield })
  self
end

- (self) partitions_for(topic = nil) { ... }

Get the partition metadata for the given topic.

Parameters:

  • topic (String) (defaults to: nil)
    topic for which to get partition info

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


115
116
117
118
119
120
121
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 115

# Get the partition metadata for the given topic.
#
# The block receives two values built from the async result:
# * the failure cause when the result reports failure, otherwise nil
# * on success, an Array with one entry per result element, each parsed from the
#   element's +toJson.encode+ output (nil elements stay nil); otherwise nil
#
# @param [String] topic topic name (must be exactly a String)
# @yield [error, partitions] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when topic is not a String or no block is given
def partitions_for(topic=nil)
  unless topic.class == String && block_given?
    raise ArgumentError, "Invalid arguments when calling partitions_for(#{topic})"
  end

  j_partitions_for = @j_del.java_method(:partitionsFor, [Java::java.lang.String.java_class,Java::IoVertxCore::Handler.java_class])
  on_result = Proc.new do |ar|
    failure = ar.failed ? ar.cause : nil
    partitions =
      if ar.succeeded
        ar.result.to_a.map { |info| info != nil ? JSON.parse(info.toJson.encode) : nil }
      else
        nil
      end
    yield(failure, partitions)
  end
  j_partitions_for.call(topic, on_result)
  self
end

- (self) set_write_queue_max_size(i = nil)

Parameters:

  • i (Fixnum) (defaults to: nil)

Returns:

  • (self)

Raises:

  • (ArgumentError)


88
89
90
91
92
93
94
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 88

# Forward +i+ to the Java +setWriteQueueMaxSize(int)+ method.
#
# @param [Integer] i value passed to the Java method
# @return [self]
# @raise [ArgumentError] when +i+ is not an Integer or a block is given
def set_write_queue_max_size(i=nil)
  # is_a?(Integer) instead of `.class == Fixnum`: the Fixnum constant was deprecated in
  # Ruby 2.4 and removed in 3.2, while Integer covers Fixnum on older runtimes as well.
  if i.is_a?(Integer) && !block_given?
    @j_del.java_method(:setWriteQueueMaxSize, [Java::int.java_class]).call(i)
    return self
  end
  raise ArgumentError, "Invalid arguments when calling set_write_queue_max_size(#{i})"
end

- (self) write(record = nil) { ... }

Asynchronously write a record to a topic

Parameters:

Yields:

  • handler called on operation completed

Returns:

  • (self)

Raises:

  • (ArgumentError)


66
67
68
69
70
71
72
73
74
75
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 66

# Asynchronously write a record to a topic.
#
# Without a block the record is passed to the single-argument Java +write+.
# With a block a handler is also passed; the block then receives:
# * the failure cause when the async result reports failure, otherwise nil
# * on success with a non-nil result, the result's +toJson.encode+ output parsed
#   as JSON; otherwise nil
#
# @param [#j_del] record record wrapper whose +j_del+ is forwarded to Java
# @yield [error, metadata] handler called on operation completed
# @return [self]
# @raise [ArgumentError] when the record's class does not define +j_del+
def write(record=nil)
  unless record.class.method_defined?(:j_del)
    raise ArgumentError, "Invalid arguments when calling write(#{record})"
  end

  if block_given?
    j_write = @j_del.java_method(:write, [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class,Java::IoVertxCore::Handler.java_class])
    on_result = Proc.new do |ar|
      failure = ar.failed ? ar.cause : nil
      metadata = nil
      if ar.succeeded
        metadata = ar.result != nil ? JSON.parse(ar.result.toJson.encode) : nil
      end
      yield(failure, metadata)
    end
    j_write.call(record.j_del, on_result)
  else
    j_write = @j_del.java_method(:write, [Java::IoVertxKafkaClientProducer::KafkaProducerRecord.java_class])
    j_write.call(record.j_del)
  end
  self
end

- (true, false) write_queue_full?

Returns:

  • (true, false)

Raises:

  • (ArgumentError)


96
97
98
99
100
101
# File '/Users/julien/java/vertx-aggregator/modules/vertx-lang-ruby/vertx-lang-ruby/target/classes/vertx-kafka-client/kafka_producer.rb', line 96

# Return the result of the Java +writeQueueFull+ method.
#
# @return [true, false]
# @raise [ArgumentError] when a block is given
def write_queue_full?
  raise ArgumentError, "Invalid arguments when calling write_queue_full?()" if block_given?

  @j_del.java_method(:writeQueueFull, []).call()
end