要使用 Vert.x 熔断器,只需要在依赖中增加以下代码片段:
Maven (在 pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-circuit-breaker</artifactId>
<version>3.6.2</version>
</dependency>
Gradle (在 build.gradle
文件中):
compile 'io.vertx:vertx-circuit-breaker:3.6.2'
为了使用熔断器我们需要以下的步骤:
创建一个熔断器,并配置成你所需要的(超时,最大故障次数)
使用熔断器执行代码
以下是例子:
require 'vertx-circuit-breaker/circuit_breaker'
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx, {
'maxFailures' => 5,
'timeout' => 2000,
'fallbackOnFailure' => true,
'resetTimeout' => 10000
})
# ---
# Store the circuit breaker in a field and access it as follows
# ---
breaker.execute() { |future|
# some code executing with the breaker
# the code reports failures or success on the given future.
# if this future is marked as failed, the breaker increased the
# number of failures
}.set_handler() { |ar_err,ar|
# Get the operation result.
}
执行块中接收 Future
作为参数,以表示操作和结果的成功或失败。 例如在下面的例子中,对应的结果就是REST调用的输出:
require 'vertx-circuit-breaker/circuit_breaker'
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx, {
'maxFailures' => 5,
'timeout' => 2000
})
# ---
# Store the circuit breaker in a field and access it as follows
# ---
breaker.execute() { |future|
vertx.create_http_client().get_now(8080, "localhost", "/") { |response|
if (response.status_code() != 200)
future.fail("HTTP error")
else
response.exception_handler(&future.method(:fail)).body_handler() { |buffer|
future.complete(buffer.to_string())
}
end
}
}.set_handler() { |ar_err,ar|
# Do something with the result
}
操作的结果以下面的方式提供:
也可以提供一个失败时回调方法(fallback):
require 'vertx-circuit-breaker/circuit_breaker'
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx, {
'maxFailures' => 5,
'timeout' => 2000
})
# ---
# Store the circuit breaker in a field and access it as follows
# ---
breaker.execute_with_fallback(lambda { |future|
vertx.create_http_client().get_now(8080, "localhost", "/") { |response|
if (response.status_code() != 200)
future.fail("HTTP error")
else
response.exception_handler(&future.method(:fail)).body_handler() { |buffer|
future.complete(buffer.to_string())
}
end
}
}, lambda { |v|
# Executed when the circuit is opened
return "Hello"
}).set_handler() { |ar_err,ar|
# Do something with the result
}
熔断状态中都会调用失败回调(fallback),或者设置
isFallbackOnFailure
,其结果是失败回调函数的输出。失败回调函数将
Throwable
对象作为参数,并返回预期类型的对象。
失败回调可以直接设置在 CircuitBreaker
上:
require 'vertx-circuit-breaker/circuit_breaker'
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx, {
'maxFailures' => 5,
'timeout' => 2000
}).fallback(lambda { |v|
# Executed when the circuit is opened.
return "hello"
})
breaker.execute() { |future|
vertx.create_http_client().get_now(8080, "localhost", "/") { |response|
if (response.status_code() != 200)
future.fail("HTTP error")
else
response.exception_handler(&future.method(:fail)).body_handler() { |buffer|
future.complete(buffer.to_string())
}
end
}
}
可以指定熔断器在生效之前的尝试次数,使用
maxRetries
.
。如果将其设置为高于0的值,则您的代码在最终失败之前进行尝试多次执行。如果代码在其中一个重试中成功,则处理程序将得到通知,并且跳过剩余的重试。此配置仅当熔断器未生效时工作。
Notice that is you set maxRetries
to 2 for instance, your operation may be called 3 times: the initial attempt
and 2 retries.
你能够配置熔断生效/关闭时回调。
require 'vertx-circuit-breaker/circuit_breaker'
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx, {
'maxFailures' => 5,
'timeout' => 2000
}).open_handler() { |v|
puts "Circuit opened"
}.close_handler() { |v|
puts "Circuit closed"
}
breaker.execute() { |future|
vertx.create_http_client().get_now(8080, "localhost", "/") { |response|
if (response.status_code() != 200)
future.fail("HTTP error")
else
# Do something with the response
future.complete()
end
}
}
当熔断器决定尝试复位的时候( half-open 状态),我们也可以注册
halfOpenHandler
的回调从而得到回调通知。
每次熔断器状态发生变化时,会在Event Bus上发布事件。事件发送的地址可以使用
notificationAddress
进行配置。如果将 null
传递给此方法,则通知将被禁用。默认情况下,使用的地址是 vertx.circuit-breaker
。
每次事件信息包含以下:
state
: 熔断器的新状态 ( OPEN
, CLOSED
, HALF_OPEN
)
name
: 熔断器的名字
failures
: 故障的数量
node
: 节点的标志符(如果运行在单节点模式是 local
)
当熔断器在熔断状态中,对其调用会立即失败,不会执行实际操作。经过适当的时间(
resetTimeout
设置),熔断器决定是否恢复状态,此时进入半开启状态(half-open state)。
在这种状态下,允许下一次熔断器的调用实际调用如果成功,熔断器将复位并返回到关闭状态,回归正常的模式;但是如果这次调用失败,则熔断器返回到熔断状态,直到下次半开状态。
The fallback receives a:
OpenCircuitException
when the circuit breaker is opened
TimeoutException
when the operation timed out
Netflix Hystrix
带有一个仪表板(dashboard),用于显示熔断器的当前状态。 Vert.x 熔断器可以发布其指标(metric),以供Hystrix 仪表板使用。 Hystrix 仪表板需要一个发送指标的SSE流,此流由
HystrixMetricHandler
这个 Vert.x Web Handler 提供:
require 'vertx-circuit-breaker/circuit_breaker'
require 'vertx-web/router'
require 'vertx-circuit-breaker/hystrix_metric_handler'
# Create the circuit breaker as usual.
breaker = VertxCircuitBreaker::CircuitBreaker.create("my-circuit-breaker", vertx)
breaker2 = VertxCircuitBreaker::CircuitBreaker.create("my-second-circuit-breaker", vertx)
# Create a Vert.x Web router
router = VertxWeb::Router.router(vertx)
# Register the metric handler
router.get("/hystrix-metrics").handler(&VertxCircuitBreaker::HystrixMetricHandler.create(vertx).method(:handle))
# Create the HTTP server using the router to dispatch the requests
vertx.create_http_server().request_handler(&router.method(:accept)).listen(8080)
在Hystrix 仪表板中,配置流网址(stream url),如: http://localhost:8080/metrics
。仪表板将使用Vert.x熔断器的指标。
请注意,这些指标量是由 Vert.x Web Handler 使用 Event Bus 事件通知收集的。如果您不使用默认通知地址,则需要在创建时指定。