vert-x3 / vertx-circuit-breaker
A circuit breaker for Vert.x
License: Apache License 2.0
Getting a deadlock in vertx event loop thread in class HystrixMetricEventStream
3.9.2
Following is the stack trace of the deadlock.
"vert.x-eventloop-thread-33":
waiting to lock monitor 0x00007fb2dc0ab5b8 (object 0x00000005decc1d00, a java.util.Collections$SynchronizedList),
which is held by "vert.x-eventloop-thread-32"
"vert.x-eventloop-thread-32":
waiting to lock monitor 0x00007fb5e40175c8 (object 0x00000006217cd9c8, a io.vertx.core.http.impl.Http1xServerConnection),
which is held by "vert.x-eventloop-thread-33"
"vert.x-eventloop-thread-33":
at java.util.Collections$SynchronizedCollection.remove(Collections.java:2038)
- waiting to lock <0x00000005decc1d00> (a java.util.Collections$SynchronizedList)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream.lambda$handle$3(HystrixMetricEventStream.java:114)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream$$Lambda$940/1263525672.handle(Unknown Source)
at io.vertx.core.net.impl.ConnectionBase.handleException(ConnectionBase.java:332)
- locked <0x00000006217cd9c8> (a io.vertx.core.http.impl.Http1xServerConnection)
at io.vertx.core.http.impl.Http1xServerConnection.handleException(Http1xServerConnection.java:423)
at io.vertx.core.net.impl.VertxHandler.lambda$exceptionCaught$3(VertxHandler.java:150)
at io.vertx.core.net.impl.VertxHandler$$Lambda$922/107884447.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
at io.vertx.core.net.impl.VertxHandler.exceptionCaught(VertxHandler.java:143)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268)
at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1389)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"vert.x-eventloop-thread-32":
at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:677)
- waiting to lock <0x00000006217cd9c8> (a io.vertx.core.http.impl.Http1xServerConnection)
at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:325)
at io.vertx.core.http.impl.HttpServerResponseImpl.write(HttpServerResponseImpl.java:64)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream.lambda$null$0(HystrixMetricEventStream.java:39)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream$$Lambda$654/1232465312.accept(Unknown Source)
at java.lang.Iterable.forEach(Iterable.java:75)
at java.util.Collections$SynchronizedCollection.forEach(Collections.java:2062)
- locked <0x00000005decc1d00> (a java.util.Collections$SynchronizedList)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream.lambda$new$1(HystrixMetricEventStream.java:38)
at io.vertx.circuitbreaker.impl.HystrixMetricEventStream$$Lambda$648/1196496258.handle(Unknown Source)
at io.vertx.core.eventbus.impl.HandlerRegistration.deliver(HandlerRegistration.java:278)
at io.vertx.core.eventbus.impl.HandlerRegistration.handle(HandlerRegistration.java:264)
at io.vertx.core.eventbus.impl.EventBusImpl$InboundDeliveryContext.next(EventBusImpl.java:573)
at io.vertx.core.eventbus.impl.EventBusImpl.lambda$deliverToHandler$4(EventBusImpl.java:533)
at io.vertx.core.eventbus.impl.EventBusImpl$$Lambda$653/1091785879.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.vertx.core.impl.EventLoopContext$$Lambda$385/991402324.run(Unknown Source)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
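The two traces above show a lock-order inversion: thread-32 holds the monitor of the synchronized list (inside forEach) and waits for the connection monitor, while thread-33 holds the connection monitor and waits for the list monitor (inside remove). One possible way to break the cycle, offered as a minimal sketch and not as the project's actual fix, is to iterate over a snapshot so that no list lock is held while calling into a connection. ConnectionRegistry and its methods are hypothetical names; the type parameter stands in for the response objects.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the list of open responses; not Vert.x code.
final class ConnectionRegistry<C> {
  // Every mutation copies the backing array, so readers never need a lock.
  private final List<C> connections = new CopyOnWriteArrayList<>();

  void add(C connection) {
    connections.add(connection);
  }

  void remove(C connection) {
    connections.remove(connection);
  }

  // Walks a snapshot of the list; no list monitor is held while `action` runs,
  // so `action` may take other locks or even call remove() without blocking.
  void broadcast(Consumer<C> action) {
    connections.forEach(action);
  }

  int size() {
    return connections.size();
  }
}
```

The trade-off is that a connection removed during a broadcast may still receive that one broadcast, so the write path would have to tolerate an already closed response.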
Retry policies can be configured using the retryPolicy method.
It's common to use well-known retry policies such as constant, linear, or exponential backoff delays for this policy.
Who should implement this feature? Are you volunteering to implement it, or do you know someone who is able and willing to implement it?
Yes, I can implement this feature.
Hi Team,
Please help me check the setTimeout option; it does not seem to work. I am on Vert.x 3.8.1. A brief version of my code is below:
CircuitBreakerOptions circuitBreakerOptions = new CircuitBreakerOptions()
    .setMaxFailures(5)
    .setTimeout(100)
    .setFallbackOnFailure(true)
    .setResetTimeout(10_000);
circuit = CircuitBreaker
    .create("handler-circuit-breaker", vertx, circuitBreakerOptions);
circuit.fallback(v -> {
  // Executed when the circuit is opened
  LOG.info("fallback not work for timeout"); // execution never reaches this line
  return "";
});
circuit.execute(promise -> {
  try {
    Thread.sleep(1000); // blocks far longer than the 100 ms configured in the options
  } catch (InterruptedException e) {
    // ignored in this brief example
  }
  promise.complete("OK");
});
Thanks,
Huy Bui.
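A likely explanation, stated as an assumption and not as a confirmed diagnosis: if the command handler and the timeout check both run on the same event-loop thread, Thread.sleep keeps that thread busy, so the timeout cannot be processed until the handler has already completed the promise. The plain-Java analogy below uses a single-threaded scheduler as a stand-in for an event loop; BlockedLoopDemo is a hypothetical name and none of this is Vert.x code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BlockedLoopDemo {
  // Returns the order in which the two callbacks ran on the single "loop" thread.
  static List<String> run() {
    List<String> events = new CopyOnWriteArrayList<>();
    ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();

    // The "command": occupies the only thread for 200 ms, like Thread.sleep in a handler.
    loop.execute(() -> {
      sleepQuietly(200);
      events.add("command completed");
    });

    // The "timeout": due after 50 ms, but it needs the same thread to run.
    loop.schedule(() -> { events.add("timeout fired"); }, 50, TimeUnit.MILLISECONDS);

    // Already scheduled delayed tasks still run after shutdown() by default.
    loop.shutdown();
    try {
      loop.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return events;
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

In this sketch the timeout callback only runs once the blocking command has finished. If the same mechanism applies here, keeping the command non-blocking (or moving the blocking call off the event loop, for example with executeBlocking) would give the timeout a chance to fire first.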
Currently the circuit breaker only has methods that return a Future, and therefore the generated Rx API is not friendly. Changing
<T> Future<T> execute(Handler<Future<T>> command);
to
@GenIgnore
<T> Future<T> execute(Handler<Future<T>> command);
default <T> void execute(Handler<Future<T>> command, Handler<AsyncResult<T>> handler) {
Future<T> fut = execute(command);
fut.setHandler(handler);
}
allows the following to be generated:
public <T> Single<T> rxExecute(Handler<Future<T>> command) {
return Single.create(new io.vertx.rx.java.SingleOnSubscribeAdapter<>(fut -> {
execute(command, fut);
}));
}
After a command executes successfully, rollingCounter is reset. I think this should happen only when the state is HALF_OPEN, not CLOSED, because otherwise every successful result clears the failures previously counted within rollingCounter's configured time window.
In the constructor for HystrixMetricEventStream, when setting up the handler for the event bus consumer, the body is first built into a JsonObject, but this JsonObject is then encoded once per connection. Since the encoded string does not change, would it not be better to move the encoding outside the per-connection loop?
vertx.eventBus().<JsonObject>consumer(address)
.handler(message -> {
JsonObject json = build(message.body());
int id = counter.incrementAndGet();
connections.forEach(resp -> {
String chunk = json.encode() + "\n\n";
resp.write("id" + ": " + id + "\n");
resp.write("data:" + chunk);
});
});
The EventBus notifications are enabled by default, causing quite a lot of unnecessary traffic on the event bus. We should probably make them optional.
https://travis-ci.org/vert-x3/vertx-circuit-breaker/jobs/461176637
Tests in error:
HystrixMetricEventStreamTest.test » IllegalState Response is closed
Running io.vertx.circuitbreaker.impl.CircuitBreakerMetricsTest
Apr 05, 2018 10:39:30 AM io.vertx.core.impl.ContextImpl
SEVERE: Unhandled exception
java.lang.AssertionError:
Expecting:
<1000>
to be less than:
<1000>
at io.vertx.circuitbreaker.impl.CircuitBreakerMetricsTest.lambda$testEviction$6(CircuitBreakerMetricsTest.java:220)
at io.vertx.core.impl.CompositeFutureImpl.lambda$all$1(CompositeFutureImpl.java:49)
at io.vertx.core.impl.FutureImpl.tryComplete(FutureImpl.java:121)
at io.vertx.core.impl.FutureImpl.complete(FutureImpl.java:83)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl.lambda$null$3(CircuitBreakerImpl.java:220)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:333)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:39)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
4.36
This is caused by the shift operation used for the exponential delay, which results in a value of 0 after 63 retries. This cannot be resolved simply by using delay <= 0, as this results in the exponential delay restarting from initialDelay.
long delay = initialDelay * (1L << retryCount);
return random.nextLong(0, delay < 0 ? maxDelay : min(maxDelay, delay));
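One way to avoid both the overflow and the wrap-around is to bound the shift before applying it. The sketch below only computes the upper bound of the delay window (the random jitter would be drawn below that bound, as in the snippet above); SafeBackoff and cappedExponentialDelay are hypothetical names, and the method assumes initialDelay > 0, maxDelay > 0 and retryCount >= 0.

```java
final class SafeBackoff {
  // Upper bound of the delay for a given retry; never overflows and never wraps around.
  // Assumption: initialDelay > 0, maxDelay > 0, retryCount >= 0.
  static long cappedExponentialDelay(long initialDelay, long maxDelay, int retryCount) {
    // Largest shift for which initialDelay << shift is still a positive long.
    int maxShift = Long.numberOfLeadingZeros(initialDelay) - 1;
    if (retryCount > maxShift) {
      // Shifting further would overflow, so the delay is clamped to the maximum.
      return maxDelay;
    }
    return Math.min(maxDelay, initialDelay << retryCount);
  }
}
```

Because the retry count is compared against the largest safe shift first, the result stays at maxDelay for every later retry instead of dropping back to initialDelay.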
Presently, the retryPolicy method specifies how often the circuit breaker should try before failing; however, in certain cases (e.g. expected failures) it is not desirable to continue retrying.
(If this feature exists, I couldn't find any mention of this in the documentation.)
Something like a retryOrCancelPolicy would be desirable, where returning -1L would stop the circuit breaker from retrying:
CircuitBreaker.create("my-circuit-breaker", vertx)
.retryOrCancelPolicy { retryCount: Int, cause: Throwable? ->
if (cause is MyExpectedException) {
-1L
} else {
retryCount * 1000L
}
}
I tend to use michaelbull/kotlin-retry for retrying, which has a pretty elegant way of handling retry policies.
For example:
val retryTimeouts: RetryPolicy<Throwable> = {
if (reason is SQLDataException) ContinueRetrying else StopRetrying
}
suspend fun printExchangeBetween(a: Long, b: Long) {
val customer1 = customers.nameFromId(a)
val customer2 = customers.nameFromId(b)
println("$customer1 exchanged with $customer2")
}
fun main() = runBlocking {
retry(retryTimeouts + limitAttempts(5) + constantDelay(20)) {
printExchangeBetween(1L, 2L)
}
}
I will try to realize this if I get the time; right now I'm holed up with my work laptop, which does not allow me to push externally... even posting this issue was a pain. :|
The number of retries indicates how many retry attempts must be performed before the interaction is considered a failure. So, if maxRetries is set to 1, the circuit breaker should execute the "command" twice (the initial attempt and one retry). If set to 2, the circuit breaker will try 3 times, and so on.
This is currently not the case.
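The expected counting can be sketched in a few lines of plain Java. This only illustrates the semantics described above (one initial attempt plus at most maxRetries retries); RetryCounter and executeWithRetries are hypothetical names, not part of the circuit breaker API.

```java
import java.util.function.BooleanSupplier;

final class RetryCounter {
  // Runs `attempt` until it reports success or the retries are exhausted,
  // and returns how many times it was executed in total.
  static int executeWithRetries(int maxRetries, BooleanSupplier attempt) {
    int executions = 0;
    // i == 0 is the initial attempt; i == 1..maxRetries are the retries.
    for (int i = 0; i <= maxRetries; i++) {
      executions++;
      if (attempt.getAsBoolean()) {
        break; // success: no further retries
      }
    }
    return executions;
  }
}
```

With a command that always fails, maxRetries = 1 gives two executions and maxRetries = 2 gives three.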
Is there an easy way to take worker-thread queue delay into account when deciding the circuit breaker's flow?
The value of "metricsRollingWindow" is not being copied from the other object in the CircuitBreakerOptions' copy constructor.
CircuitBreaker.create(name, vertx, null) will throw the following exception:
java.lang.NullPointerException
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics.<init>(CircuitBreakerMetrics.java:45)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl.<init>(CircuitBreakerImpl.java:68)
After checking the source code, the root cause is the following code in the constructor of io.vertx.circuitbreaker.impl.CircuitBreakerImpl, which I have marked:
public CircuitBreakerImpl(String name, Vertx vertx, CircuitBreakerOptions options) {
Objects.requireNonNull(name);
Objects.requireNonNull(vertx);
this.vertx = vertx;
this.name = name;
if (options == null) {
this.options = new CircuitBreakerOptions();
} else {
this.options = new CircuitBreakerOptions(options);
}
this.metrics = new CircuitBreakerMetrics(vertx, this, options); // <== should be this.options
As a further improvement, in io.vertx.circuitbreaker.CircuitBreaker:
static CircuitBreaker create(String name, Vertx vertx, CircuitBreakerOptions options) {
return (options == null)? new CircuitBreakerImpl(name, vertx): new CircuitBreakerImpl(name, vertx, options);
}
Thanks
Jian
Version: 3.8.4
I use a CircuitBreaker to talk to a backend, so I want to probe its health whenever the breaker becomes half-open to give it an opportunity to go back to closed, because when the breaker is open that backend is marked as unhealthy and never selected to serve a request.
I set a timeout on my circuit breaker.
Whenever the half-open handler executes on the breaker and the execution completes (either success/fail/timeout), an IllegalStateException is logged:
SEVERE: Unhandled exception
java.lang.IllegalStateException: Result is already complete: failed
at io.vertx.core.impl.FutureImpl.fail(FutureImpl.java:128)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl.lambda$null$4(CircuitBreakerImpl.java:225)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518)
at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
https://gist.github.com/Feder1co5oave/8d70b3383fd6c2011619f38e17d19d46#file-cbtest-java
Step by step:
JVM is OpenJDK 11 on Ubuntu Linux.
This is because when the breaker is in the HALF_OPEN state, two handlers are added to the resultFuture in executeAndReportWithFallback, and both take care of updating the breaker's state and metrics. Only the second handler should be set when the breaker is HALF_OPEN.
Additionally, I think a runOnContext should be added to the handler in the HALF_OPEN case, to ensure that the result handler is run on the same context as the caller, similarly to what was done in 25ac02c.
In Vert.x 4, executeCommandXXX methods are renamed executeXXX for futurization purposes. In the 3.x branch we deprecate these methods and add the new methods as deprecation replacements.
// Now this is deprecated
breaker.executeCommand(promise -> ..., res -> ...);
// Instead you should use
breaker.execute(promise -> ..., res -> ...);
Hi, guys.
Is it possible to implement resilience4j?
thx
Hello,
I noticed something and don't know whether it is the intended behavior: the metrics published by the HystrixMetricHandler when using the circuit breaker aren't consistent with Hystrix. For example, the request count sent is a global one, whereas in Hystrix it is the count for the rolling window, so we can't measure requests per second, since the Vert.x count will always be increasing.
In the example below, I'm showing the same thing executed by Hystrix and vertx-circuit-breaker: 4 requests per second, all on the same verticle, same JVM, same everything. Hystrix accurately shows 4.0 req/s, but Vert.x shows 79.1 req/s, and the success count is different too:
If this is not the intended behavior, I can try to write a PR to make it more consistent.
By the way, the test I did is available here: Example
You can easily get a running hystrix-dashboard using standalone-hystrix-dashboard
Please implement OSGi manifest headers, so that this can be deployed directly to an OSGi container and does not need to be embedded in a bundle.
Looking at the source code and the available configuration options, it looks like retries are currently executed immediately, but it's usually recommended to retry with exponential backoff (with optional jitter) so as not to overload a failing host even more.
I could implement this feature, but would require some guidance on how to implement it correctly with Vert.x.
Getting vertx thread blocked in background CircuitBreakerMetrics operation. Potential deadlock issue.
4.1.8
We have a blue green deployment of Vertx microservices. In one of the microservices' blue environment which is not serving any application traffic at the time of incident, we observed CPU utilization spike up to 100% due to thread blocked issues. As resolving the incident and restoring health of the service was priority, our Incident Management team immediately replaced the affected node and did not perform a thread dump.
The following is the stack trace of the error:
Thread Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 733703 ms, time limit is 2000 ms io.vertx.core.VertxException: Thread blocked
at org.HdrHistogram.AbstractHistogram.getBucketsNeededToCoverValue(AbstractHistogram.java:2212)
at org.HdrHistogram.AbstractHistogram.determineArrayLengthNeeded(AbstractHistogram.java:374)
at org.HdrHistogram.AbstractHistogram.establishSize(AbstractHistogram.java:361)
at org.HdrHistogram.AbstractHistogram.init(AbstractHistogram.java:333)
at org.HdrHistogram.AbstractHistogram.<init>(AbstractHistogram.java:273)
at org.HdrHistogram.Histogram.<init>(Histogram.java:228)
at org.HdrHistogram.Histogram.<init>(Histogram.java:206)
at org.HdrHistogram.Histogram.<init>(Histogram.java:170)
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics$RollingWindow$Summary.<init>(CircuitBreakerMetrics.java:311)
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics$RollingWindow$Summary.<init>(CircuitBreakerMetrics.java:293)
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics$RollingWindow.windowSummary(CircuitBreakerMetrics.java:245)
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics$RollingWindow.totalSummary(CircuitBreakerMetrics.java:239)
at io.vertx.circuitbreaker.impl.CircuitBreakerMetrics.toJson(CircuitBreakerMetrics.java:128)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl.sendUpdateOnEventBus(CircuitBreakerImpl.java:160)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl.lambda$new$2(CircuitBreakerImpl.java:83)
at io.vertx.circuitbreaker.impl.CircuitBreakerImpl$$Lambda$672/0x00000001006e2440.handle(Unknown Source)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:948)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:919)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:53)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:942)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at [email protected]/java.lang.Thread.run(Unknown Source)
.. this leads to the scenario that one failure can trip the circuit, even if no failures have occurred for days and .setMaxFailures is set to a large number.
I assume this isn't a desired scenario for most use cases? Hystrix solves this by keeping the failure count in a rolling window.
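A rolling-window failure count can be sketched as follows. This is a minimal illustration of the idea and not Hystrix's or Vert.x's implementation; RollingFailureCounter is a hypothetical name, timestamps are passed in explicitly to keep it deterministic, and it assumes single-threaded use (for example from one event loop).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: counts only the failures that happened within the last windowMillis.
final class RollingFailureCounter {
  private final long windowMillis;
  private final Deque<Long> failureTimes = new ArrayDeque<>();

  RollingFailureCounter(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  // Records a failure at nowMillis and returns the number of failures still inside the window.
  int recordFailure(long nowMillis) {
    failureTimes.addLast(nowMillis);
    return count(nowMillis);
  }

  // Evicts failures that are windowMillis old or older, then returns what is left.
  int count(long nowMillis) {
    while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() >= windowMillis) {
      failureTimes.removeFirst();
    }
    return failureTimes.size();
  }
}
```

A breaker built on such a counter would open only when the returned count reaches the configured maximum, so a failure from days ago no longer contributes.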
We've found that exceptions raised due to open circuit state end up overloading CPUs. In cases where application is handling a huge amount of errors and a circuit opens, this plain "RuntimeException" is used to fail the operation future: https://github.com/vert-x3/vertx-circuit-breaker/blob/master/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java#L304
We've discovered that this high CPU usage could be avoided if the exception does not have the "writableStackTrace" flag as true, like "NoStackTraceThrowable" does. Also, it might be convenient to use a custom exception for this scenario, probably a "RuntimeException" subclass to keep backward compatibility.
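A stackless RuntimeException subclass can be written with the protected four-argument constructor of RuntimeException. The class name OpenCircuitException below is a hypothetical choice for illustration, not necessarily what the library uses.

```java
// Hypothetical exception type for rejected calls while the circuit is open.
final class OpenCircuitException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  OpenCircuitException(String message) {
    // cause = null, enableSuppression = false, writableStackTrace = false:
    // the stack trace is never captured, which avoids the stack walk on every rejection.
    super(message, null, false, false);
  }
}
```

Since it still extends RuntimeException, existing handlers that catch or inspect RuntimeException keep working; the visible difference is that getStackTrace() returns an empty array.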
In our setup, the CircuitBreakerMetrics is leaking Operation objects.
They are collected in the window array.
This array is only cleared when the toJson operation is called.
As far as we can see, this method is only called when the notification address is set and when the state of the circuit breaker changes.
In our situation this is not the case, and it causes this array to grow until it claims gigabytes of memory.
To fix this, we see some options:
CircuitBreakerMetrics
Can you confirm this is a bug, do you have a preference for a solution?
3.9.2
A heap dump of our application showed that there are suspiciously many duplicated Strings related to the Vert.x circuit breaker (see screenshot).
This seems to be related to the fact that the names of the different metrics are not compile-time constants but are generated dynamically at every toJson() call of CircuitBreakerMetrics. This call is performed periodically in CircuitBreakerImpl. This probably leads to the duplicated Strings in the heap, as every call produces another String object with the same content but at a different memory location.
This can probably be fixed by turning the names of the different metrics into compile-time constants or calling String#intern() at each creation.
No
Used Java 8
We suggest enhancing the API so that the criteria for "failure" are more flexible than the success/failure of the Promise, perhaps similar to Failsafe: https://failsafe.dev/policies/#failure-handling
When using the Circuit Breaker for either EventBus request-reply or a Service Proxy, the reply may be an "application level" error that the CircuitBreaker should not consider a failure and should return to the caller to handle. Indeed, ServiceException provides a failureCode presumably for that purpose (that is how we use it).
For example, a Service Proxy service might return an application-level "not found" error by replying with a ServiceException. In this case, the CircuitBreaker should not consider this a failure. We probably only want the CircuitBreaker to consider a ServiceException to be a failure when failureType is either ReplyFailure.TIMEOUT or ReplyFailure.NO_HANDLERS. For example, perhaps using a Predicate to indicate when the operation has failed:
Predicate<AsyncResult<?>> isFailed = ar -> {
if (ar.failed() && ar.cause() instanceof ServiceException) {
ReplyFailure rf = ((ServiceException)ar.cause()).failureType();
return rf == ReplyFailure.TIMEOUT || rf == ReplyFailure.NO_HANDLERS;
}
return false;
};
CircuitBreakerOptions options = new CircuitBreakerOptions().setFailurePredicate(isFailed);
CircuitBreaker breaker = CircuitBreaker.create("test", vertx, options);
// Predicate from options:
breaker.execute(promise -> myServiceProxy.myOperation("arg1", promise)).onComplete(ar -> {
  // ....
});
// or specify failure predicate on each call
breaker.execute(promise -> myServiceProxy.myOperation("arg1", promise), isFailed).onComplete(ar -> {
  // ....
});
If the failure predicate is not specified, the default is simply ar -> ar.failed(), which is equivalent to the current API.
The main value in this is that it makes using the CircuitBreaker with Service Proxy services and EventBus request-reply easier. That is significant since the EventBus is a best-effort delivery system and thus often requires retries to handle system failures and even routine system downtime for maintenance.
We may be able to volunteer to provide this feature at some point in the future, but currently do not have the resources. We have already handled this situation in our applications, but the solution is not very clean.
Two recent failures
com.jayway.awaitility.core.ConditionTimeoutException: Condition with lambda expression in io.vertx.circuitbreaker.impl.CircuitBreakerImplTest was not fulfilled within 10 seconds.
at com.jayway.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:122)
at com.jayway.awaitility.core.CallableCondition.await(CallableCondition.java:79)
at com.jayway.awaitility.core.CallableCondition.await(CallableCondition.java:27)
at com.jayway.awaitility.core.ConditionFactory.until(ConditionFactory.java:764)
at com.jayway.awaitility.core.ConditionFactory.until(ConditionFactory.java:741)
at io.vertx.circuitbreaker.impl.CircuitBreakerImplTest.testResetAttemptThatFailsOnTimeout(CircuitBreakerImplTest.java:505)
com.jayway.awaitility.core.ConditionTimeoutException: com.jayway.awaitility.core.ConditionFactory.untilAtomic Callable expected <false> but was <true> within 10 seconds.
at com.jayway.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:122)
at com.jayway.awaitility.core.AbstractHamcrestCondition.await(AbstractHamcrestCondition.java:87)
at com.jayway.awaitility.core.ConditionFactory.until(ConditionFactory.java:764)
at com.jayway.awaitility.core.ConditionFactory.untilAtomic(ConditionFactory.java:664)
at io.vertx.circuitbreaker.impl.CircuitBreakerImplTest.testResetAttemptThatFailsOnTimeout(CircuitBreakerImplTest.java:537)
Version
3.9.5
Context
I use the circuit breaker with the retry and timeout options (3 retries, 2000 ms timeout), with the service deployed on k8s with CPU < 1000m.
Sometimes the circuit breaker times out before the function has executed, throwing a TimeoutException.
It seems the context has too much work to do, so the operation has not executed yet when the timeout timer fires:
// Execute the operation
if (options.getTimeout() != -1) {
vertx.setTimer(options.getTimeout(), (l) -> {
context.runOnContext(v -> {
// Check if the operation has not already been completed
if (!operationResult.future().isComplete()) {
if (call != null) {
call.timeout();
}
operationResult.fail(TimeoutException.INSTANCE);
}
// Else Operation has completed
});
});
}
Do you have a reproducer?
No
Steps to reproduce
In the examples (as in https://vertx.io/docs/vertx-circuit-breaker/java/), the creation of the CircuitBreaker and the call to execute appear back to back, but the breaker should be created only once and reused for multiple calls.
A typical case would be creation in a constructor.
For example, if the following code is copied exactly, the fallback will never be used because a new CB is created each time.
CircuitBreaker breaker = CircuitBreaker.create("my-circuit-breaker", vertx,
new CircuitBreakerOptions().setMaxFailures(5).setTimeout(2000)
);
breaker.executeWithFallback(
future -> {
vertx.createHttpClient().getNow(8080, "localhost", "/", response -> {
if (response.statusCode() != 200) {
future.fail("HTTP error");
} else {
response
.exceptionHandler(future::fail)
.bodyHandler(buffer -> {
future.complete(buffer.toString());
});
}
});
}, v -> {
// Executed when the circuit is opened
return "Hello";
})
.setHandler(ar -> {
// Do something with the result
});
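One way to make the "create once, reuse for every call" pattern explicit is a small registry keyed by breaker name. The sketch below is generic plain Java with hypothetical names (BreakerRegistry, get); the type parameter B stands in for the circuit breaker type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper that hands out one shared breaker instance per name.
final class BreakerRegistry<B> {
  private final Map<String, B> breakers = new ConcurrentHashMap<>();
  private final Function<String, B> factory;

  BreakerRegistry(Function<String, B> factory) {
    this.factory = factory;
  }

  // Creates the breaker on first use and returns the same instance afterwards,
  // so failures accumulate on one breaker instead of being lost with each new one.
  B get(String name) {
    return breakers.computeIfAbsent(name, factory);
  }
}
```

With Vert.x, the factory could be something like name -> CircuitBreaker.create(name, vertx, options), built once in a constructor or in the verticle's start method.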
Hello,
I followed the tutorial at http://vertx.io/docs/vertx-circuit-breaker/java/ to learn how to use Hystrix with Vert.x. But as far as I can see, only the metrics from the Vert.x Circuit Breaker are displayed in the Hystrix Dashboard, while the metrics corresponding to HystrixCommand aren't. Am I doing something wrong? Here is the sample code:
https://gist.github.com/tuan3w/de7cbc82572bbdf235bbb326becb21c8 .
Thanks
I am getting high memory usage with the circuit breaker. I am just using httpbin.org to get all success responses. For individual requests, it works fine. While running a load test the JVM old gen utilization is spiking up.
I have the main verticle code, pasting it here itself:
`
public class CleanServer extends AbstractVerticle {
Logger logger = Logger.getLogger(CleanServer.class.getName());
@Override
public void start(Promise<Void> startPromise) throws Exception {
Router router = Router.router(vertx);
CircuitBreakerCache cbc = new CircuitBreakerCache(vertx);
router.route(HttpMethod.GET, "/get").handler(context -> {
List<String> domains = context.queryParam("user");
String domain = domains.get(0);
CircuitBreaker cb = cbc.getCircuitBreaker(domain + context.request().path());
HttpServerResponse serverResponse =
context.response().setChunked(true);
cb.executeWithFallback(promise -> {
WebClientOptions options = new WebClientOptions().setTryUseCompression(true).setTcpNoDelay(true).setTcpCork(true).setReceiveBufferSize(128).setConnectTimeout(400);
WebClient client = WebClient.create(vertx, options);
client.get(80, "httpbin.org", "/status/200")
.timeout(2000)
.send(ar -> {
if (ar.succeeded()) {
HttpResponse<Buffer> response = ar.result();
int statusCode = response.statusCode();
if (statusCode != 200) {
promise.fail(response.statusMessage());
} else {
serverResponse.end("Hello!!");
promise.complete();
}
} else {
promise.fail(ar.cause().getMessage());
}
});
}, v -> {
// Executed when the circuit is opened
logger.log(Level.INFO, domain + " Failed " + cb.state().toString() + " Error: Circuit open");
serverResponse.setStatusCode(200).setStatusMessage("Circuit Open").end("Circuit Open");
return context;
});
});
// Create the HTTP server
vertx.createHttpServer(new HttpServerOptions().setMaxInitialLineLength(10000))
// Handle every request using the router
.requestHandler(router)
// Start listening
.listen(8080)
// Print the port
.onSuccess(server ->
System.out.println(
"HTTP server started on port " + server.actualPort()
)
);
}
}
`
Circuit breaker options:
CircuitBreakerOptions() .setMaxFailures(50) .setTimeout(5000) .setFallbackOnFailure(true) .setResetTimeout(10000)));
<vertx.version>4.2.6</vertx.version>
JVM params used:
-XX:+UseG1GC -Xms4g -Xmx4g -XX:InitiatingHeapOccupancyPercent=70 -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
JVM memory with the load test.
Error:
WARNING: Thread Thread[vert.x-eventloop-thread-3,5,main] has been blocked for 3050 ms, time limit is 2000 ms
I think I am blocking the thread somewhere but not sure where exactly as the code seems pretty simple as given in the documentation.
A common use case for a circuit breaker is to retry or circuit break remote HTTP calls done by a web/HTTP client.
HTTP servers use the Retry-After
header to dynamically suggest to clients to retry after a defined period of time during their unavailability.
The current circuit breaker API allows defining the retry period at creation time only, so it's not possible to define it dynamically based on whatever the downstream service suggests.
(covered above)
Who should implement this feature? Are you volunteering to implement it, or do you know someone who is able and willing to implement it?
I can help but I'm not entirely sure about a Vert.x idiomatic API for exposing this.
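Whatever shape the API takes, the header value first has to be turned into a delay. The sketch below parses the two forms a Retry-After value can take (a number of seconds or an HTTP date) using only the JDK; RetryAfter and toDelayMillis are hypothetical names and this is not an existing circuit breaker API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.OptionalLong;

final class RetryAfter {
  // Converts a Retry-After header value into a delay in milliseconds relative to `now`.
  // Returns an empty result when the header is missing or cannot be parsed.
  static OptionalLong toDelayMillis(String headerValue, Instant now) {
    if (headerValue == null) {
      return OptionalLong.empty();
    }
    String value = headerValue.trim();
    try {
      if (!value.isEmpty() && value.chars().allMatch(Character::isDigit)) {
        // delay-seconds form, e.g. "120"
        return OptionalLong.of(Math.multiplyExact(Long.parseLong(value), 1000L));
      }
      // HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
      Instant retryAt = ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant();
      // A date in the past means "retry now", so the delay is clamped at zero.
      return OptionalLong.of(Math.max(0L, Duration.between(now, retryAt).toMillis()));
    } catch (DateTimeParseException | NumberFormatException | ArithmeticException e) {
      return OptionalLong.empty();
    }
  }
}
```

A dynamic retry policy could then fall back to its static delay whenever the result is empty.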
Every time the circuit state changes, an event can be published on the event bus. But how can this work together across different instances?
Copied from vert-x3/issues#294
The problem we're seeing is that the circuit breaker closes immediately when success() is called on a future from the breaker handler, even if the breaker opened just moments before. This means it does not respect the closing timeout, nor does it pass through the half-open state.
Our use case:
In the circuit breaker, we're doing some async work. This means a bunch of events can be queued inside the breaker handler.
At a certain moment, most of the async events start failing, causing the breaker to open.
From this moment, no new work is scheduled on the breaker anymore; it is passed to the fallback instead.
However, since there are async tasks inside the breaker handler, some of them might still complete after the breaker opened. If one of them succeeds, it will call success() on the breaker future.
-> I would expect in this case that the breaker stays open, as the timeout period has not ended yet. However, the breaker will close immediately.
With this logic, I don't see how you can handle async events inside the circuit breaker.
I believe the problem is in CircuitBreakerImpl.java on line 203 where the reset() is called:
https://github.com/vert-x3/vertx-circuit-breaker/blob/master/src/main/java/io/vertx/circuitbreaker/impl/CircuitBreakerImpl.java#L203
I've added a testcase to illustrate the behavior.
When a circuit breaker reaches a timeout it marks a future as "failed". However, this future may be completed (successfully or not) by the user code.
This can be easily fixed using an intermediate future.
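The intermediate-future idea can be illustrated with the JDK's CompletableFuture as a stand-in for the Vert.x future: the breaker owns the intermediate result, and both the user code and the timeout try to complete it, with the first one winning and later attempts ignored. GuardedResult and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper: whoever completes first decides the outcome, later attempts are no-ops.
final class GuardedResult<T> {
  // Intermediate future owned by the breaker, never completed directly by user code.
  private final CompletableFuture<T> result = new CompletableFuture<>();

  // Called when the user code succeeds; returns false if the outcome was already decided.
  boolean completeFromUser(T value) {
    return result.complete(value);
  }

  // Called when the user code fails; returns false if the outcome was already decided.
  boolean failFromUser(Throwable cause) {
    return result.completeExceptionally(cause);
  }

  // Called by the timeout timer; returns false if the operation had already finished.
  boolean timeout() {
    return result.completeExceptionally(new TimeoutException("operation timed out"));
  }

  CompletableFuture<T> future() {
    return result;
  }
}
```

The point of the design is that a late completion returns false instead of throwing, so a user future completed after the timeout does not produce an "already complete" error.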