reactor / reactor-addons
Additional optional modules for the Reactor project
In StepVerifier#create() and StepVerifier#withVirtualTime(), the position of the long n parameter should be consistent (currently it is in last position in create(), and in first position in withVirtualTime()).
I suggest deprecating in 3.0 StepVerifier#withVirtualTime(long n, Supplier scenarioSupplier) and StepVerifier#withVirtualTime(long n, Supplier scenarioSupplier, Supplier vtsLookup), and adding new StepVerifier#withVirtualTime(Supplier scenarioSupplier, long n) and StepVerifier#withVirtualTime(Supplier scenarioSupplier, long n, Supplier vtsLookup) static methods.
The deprecated methods should be removed in Reactor 3.1.
The idea for an AssertPublisher would be to offer an alternative to using DirectProcessor in tests. It would expose:
- next, complete and error signals on demand
- a way to emit onNext and terminate with onComplete in one call
Optionally, it could allow some "misbehaviors" with respect to the Reactive Streams spec. For instance, contrary to DirectProcessor, it could allow calling next(T) despite the requested amount being 0.
Publish the Javadoc of each project automatically
Having the StepVerifier layer, as currently returned by methods from FinalStep (expectComplete, expectError, thenCancel), is still necessary:
Duration
log()
gingSubscriber
rather than triggering subscription and verification...
That said, concern has been raised that the verify() step is easy to forget. Adding a convenience sibling method to each of these FinalStep methods that also triggers the verification might help. It would only cover the case where log() is not called and no timeout is used.
For example, verifyComplete() would call .expectComplete().verify()...
Hello,
I've developed a CircuitBreaker alternative to Hystrix which is designed for functional programming.
https://github.com/RobWin/javaslang-circuitbreaker
I'm currently developing a custom RxJava operator for this library, so that I can attach a CircuitBreaker to any Single/Mono or Flowable/Flux.
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName");
Flowable.fromCallable(backendService::doSomething)
    .lift(CircuitBreakerOperator.of(circuitBreaker));
I would like to develop a custom operator for Spring Reactor, but I need your advice. I've spoken about my CircuitBreaker operator implementation with @smaldini and I think it still has some issues.
Kind regards,
Robert
When trying to use Reactor in our Spring-based project, I had to convert from ListenableFuture to CompletableFuture to leverage the async part of Reactor. It would be handy to have some sort of adapter. Any plans for this?
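A minimal sketch of what such an adapter could look like, using only JDK types. CallbackFuture is a hypothetical stand-in for a callback-style future such as Spring's ListenableFuture (which registers a success and a failure callback); FutureAdapterSketch and toCompletable are illustrative names, not an existing Reactor or Spring API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class FutureAdapterSketch {

    // Hypothetical stand-in for a callback-style future (assumption, not a real library type).
    interface CallbackFuture<T> {
        void addCallback(Consumer<? super T> onSuccess, Consumer<? super Throwable> onFailure);
    }

    // Bridges the callback-style future to a CompletableFuture:
    // success completes the target, failure completes it exceptionally.
    static <T> CompletableFuture<T> toCompletable(CallbackFuture<T> source) {
        CompletableFuture<T> target = new CompletableFuture<>();
        source.addCallback(target::complete, target::completeExceptionally);
        return target;
    }

    public static void main(String[] args) {
        // A fake source that succeeds immediately, for demonstration only.
        CallbackFuture<String> ok = (onSuccess, onFailure) -> onSuccess.accept("done");
        System.out.println(toCompletable(ok).join());
    }
}
```

The resulting CompletableFuture could then be handed to Mono.fromFuture on the Reactor side.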
Tried to put together an example.
Right now, the following test succeeds:
@Test
public void expectNextCount2elements() {
    Flux<String> flux = Flux.just("foo", "bar");
    StepVerifier.create(flux)
                .expectNext("foo", "bar")
                .expectNextCount(2)
                .verifyComplete();
}
It means that expectNextCount counts the "foo" and "bar" values already asserted by expectNext.
But the following test fails:
@Test
public void expectNextCount4elements() {
    Flux<String> flux = Flux.just("foo", "bar", "foz", "baz");
    StepVerifier.create(flux)
                .expectNext("foo", "bar")
                .expectNextCount(2)
                .verifyComplete();
}
My gut feeling is that the inverse should be true: expectNextCount could then be used in conjunction with expectNext (or other onNext expectations) in order to "skip" over a number of onNext notifications, e.g. asserting the first 5 elements and the last element of a 20-element sequence in detail, while only caring about the number of elements in the middle:
@Test
public void expectNextCountRange() {
    Flux<Integer> flux = Flux.range(1, 20);
    StepVerifier.create(flux)
                .expectNext(1, 2, 3, 4, 5) // <- assert first five
                .expectNextCount(14)       // <- only care that elements 6-19 are there
                .expectNext(20)            // <- assert last element
                .verifyComplete();
}
Some expectations could have a delay factor e.g.
ScriptedSubscriber.create()
    .expectValues(Duration.ofSeconds(1), 1, 2, 3)
    .expectComplete(Duration.ofSeconds(1))
    .verify(flux);
Hi,
If the subscriber is unable to write logs for any reason, the logger's doAppend gets stuck in an infinite waiting loop because the writer cannot overtake the reader. Below is the thread dump captured at that moment.
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:338)
at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1850)
at reactor.core.publisher.MultiProducerRingBuffer.next(RingBuffer.java:1816)
at reactor.core.publisher.UnsafeRingBuffer.next(RingBuffer.java:1576)
at reactor.core.publisher.EventLoopProcessor.onNext(EventLoopProcessor.java:445)
at reactor.logback.AsyncAppender.queueLoggingEvent(AsyncAppender.java:238)
at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:104)
at reactor.logback.AsyncAppender.doAppend(AsyncAppender.java:46)
at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:48)
at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:273)
at ch.qos.logback.classic.Logger.callAppenders(Logger.java:260)
at ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:442)
at ch.qos.logback.classic.Logger.filterAndLog_0_Or3Plus(Logger.java:396)
at ch.qos.logback.classic.Logger.debug(Logger.java:503)
at com.non.reactive.bank.Application.main(Application.java:18)
Also, below is the code snippet where the requesting thread gets stuck. It is taken from Reactor's RingBuffer:
@Override
public long next(int n)
{
    if (n < 1)
    {
        throw new IllegalArgumentException("n must be > 0");
    }
    long current;
    long next;
    do
    {
        current = cursor.getAsLong();
        next = current + n;
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.getAsLong();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
        {
            long gatingSequence = RingBuffer.getMinimumSequence(gatingSequences, current);
            if (wrapPoint > gatingSequence)
            {
                if(spinObserver != null) {
                    spinObserver.run();
                }
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
            gatingSequenceCache.set(gatingSequence);
        }
        else if (cursor.compareAndSet(current, next))
        {
            break;
        }
    }
    while (true);
    return next;
}
I can see that the wait strategy is not being used to handle this situation. Is there any way to get out of it, either by throwing a timeout exception or by some other reasonable means? I know we may lose logs for some time, but at least the requesting threads would not hang because of this issue.
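To illustrate the kind of escape hatch being asked for, here is a simplified sketch of a claim loop that gives up after a deadline instead of parking forever. It is not Reactor's RingBuffer: BoundedClaimSketch, tryNext and markConsumed are hypothetical names, and the model assumes a single consumer and one slot claimed at a time.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

class BoundedClaimSketch {
    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1);   // last claimed sequence
    private final AtomicLong consumed = new AtomicLong(-1); // last sequence read by the consumer

    BoundedClaimSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Claims the next slot, or fails once timeoutNanos have elapsed without free space.
    long tryNext(long timeoutNanos) throws TimeoutException {
        long deadline = System.nanoTime() + timeoutNanos;
        while (true) {
            long current = cursor.get();
            long next = current + 1;
            long wrapPoint = next - bufferSize;
            if (wrapPoint > consumed.get()) {
                // Buffer is full: wait briefly, but never past the deadline.
                if (System.nanoTime() - deadline >= 0) {
                    throw new TimeoutException("no free slot within " + timeoutNanos + "ns");
                }
                LockSupport.parkNanos(1);
            } else if (cursor.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    // Called by the consumer after it has processed a sequence.
    void markConsumed(long sequence) {
        consumed.set(sequence);
    }

    public static void main(String[] args) {
        BoundedClaimSketch ring = new BoundedClaimSketch(1);
        try {
            ring.tryNext(1_000_000L); // succeeds: one free slot
            ring.tryNext(1_000_000L); // buffer full and nothing consumed: times out
        } catch (TimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

With such a bound, an appender could choose to drop the event when the claim times out, trading lost log lines for a caller that never hangs.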
In Reactor v1.x we had a nice notify with a callback function which has been removed in v2.x. I'm sure there's a reason for this, however, as someone migrating to v2 I found myself lost.
Here's a unit test Gist to show what I tried, what works, and what doesn't:
https://gist.github.com/connollyst/da63998f65c309a442d8228ef61f7453
The gist of the gist..
// In reactor v1.x, we used to have..
reactor.notify("say.hi", Event.wrap("Hello World!"), call::back);
// In reactor v2.x it's gone, we have something similar in..
bus.sendAndReceive("say.hi", Event.wrap("Hello World!"), call::back);
// .. but this only works if my callback is set up with bus.receive, mine is bus.on
// events have a 'replyTo', let's try that..
bus.on($("callback"), call::back);
bus.notify("say.hi", Event.wrap("Hello World!", "callback"));
// .. nope, doesn't get called
Any thoughts?
I'm afraid I can't maintain pipes anymore. Do you think it's worth deprecating and removing it? It never really took off, either, so it won't be that big of a loss...
The name is a bit redundant, due to the fact that it is a child class of StepVerifier
The stacktrace printed by default by DefaultStepVerifierBuilder#fail() seems neither relevant nor useful.
In my use case my test is (Kotlin code):
Hooks.onOperator<Any> { h -> h.operatorStacktrace() }
StepVerifier.create(response.flatMap { r -> r.bodyToFlux(User::class) })
    .consumeNextWith { assert(it == User(1L, "Robert")) }
    .consumeNextWith { assert(it == User(2L, "Raide")) }
    .consumeNextWith { assert(it == User(3L, "Ford")) }
    .expectComplete()
    .verify()
I get that output:
java.lang.AssertionError: expectation "consumeNextWith" failed (expected: onNext(); actual: onError(java.lang.ArrayIndexOutOfBoundsException: -1))
at reactor.test.DefaultStepVerifierBuilder.failPrefix(DefaultStepVerifierBuilder.java:1653)
at reactor.test.DefaultStepVerifierBuilder.fail(DefaultStepVerifierBuilder.java:1649)
at reactor.test.DefaultStepVerifierBuilder.lambda$consumeNextWith$1(DefaultStepVerifierBuilder.java:139)
at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:1409)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1050)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:996)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:740)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber$InnerSubscriber.onError(MonoFlatMap.java:196)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:140)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:322)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:558)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:538)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onError(FluxFlatMap.java:517)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:356)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:185)
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:291)
at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:284)
at reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:493)
at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at java.lang.Thread.run(Thread.java:745)
And to get the relevant stacktrace that shows me the origin of the error, I have to call signal.getThrowable().printStackTrace() from consumeNextWith():
java.lang.ArrayIndexOutOfBoundsException: -1
at io.netty.buffer.HeapByteBufUtil.getByte(HeapByteBufUtil.java:24)
at io.netty.buffer.UnpooledHeapByteBuf._getByte(UnpooledHeapByteBuf.java:323)
at io.netty.buffer.UnpooledHeapByteBuf.getByte(UnpooledHeapByteBuf.java:318)
at org.springframework.http.codec.json.JsonObjectDecoder$1.apply(JsonObjectDecoder.java:140)
at org.springframework.http.codec.json.JsonObjectDecoder$1.apply(JsonObjectDecoder.java:103)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:353)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:316)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:185)
at reactor.ipc.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:291)
at reactor.ipc.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:284)
at reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:493)
at reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
at java.lang.Thread.run(Thread.java:745)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxFlatMap] :
reactor.core.publisher.Flux.flatMap(Flux.java:3401)
reactor.core.publisher.Flux.flatMap(Flux.java:3378)
reactor.core.publisher.Flux.flatMap(Flux.java:3335)
org.springframework.http.codec.json.JsonObjectDecoder.decode(JsonObjectDecoder.java:103)
org.springframework.http.codec.json.Jackson2JsonDecoder.decodeInternal(Jackson2JsonDecoder.java:114)
org.springframework.http.codec.json.Jackson2JsonDecoder.decode(Jackson2JsonDecoder.java:82)
org.springframework.http.codec.DecoderHttpMessageReader.read(DecoderHttpMessageReader.java:75)
org.springframework.http.codec.BodyExtractors.lambda$null$2(BodyExtractors.java:91)
java.util.Optional.map(Optional.java:215)
org.springframework.http.codec.BodyExtractors.readWithMessageReaders(BodyExtractors.java:120)
org.springframework.http.codec.BodyExtractors.lambda$toFlux$3(BodyExtractors.java:89)
org.springframework.web.client.reactive.DefaultClientResponse.body(DefaultClientResponse.java:72)
org.springframework.web.client.reactive.DefaultClientResponse.bodyToPublisher(DefaultClientResponse.java:102)
org.springframework.web.client.reactive.DefaultClientResponse.bodyToFlux(DefaultClientResponse.java:87)
mixit.support.ExtensionsKt.bodyToFlux(Extensions.kt:53)
mixit.integration.UserIntegrationTests$findAll$1.apply(UserIntegrationTests.kt:31)
mixit.integration.UserIntegrationTests$findAll$1.apply(UserIntegrationTests.kt:16)
reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:121)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:173)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:71)
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:124)
reactor.core.publisher.FluxRetryPredicate$RetryPredicateSubscriber.onNext(FluxRetryPredicate.java:78)
reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:95)
reactor.ipc.netty.channel.PooledClientContextHandler.fireContextActive(PooledClientContextHandler.java:73)
reactor.ipc.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:474)
reactor.ipc.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:112)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:351)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:373)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:129)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:651)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:574)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:488)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:450)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873)
Composition chain until failing Operator :
|_ Flux.flatMap(JsonObjectDecoder.java:103)
|_ Flux.map(Jackson2JsonDecoder.java:115)
It would be nice to have that information by default in the console after a failed test.
Hello,
Recently, I started using StepVerifier to test Flux and Mono in order to have less manual "waiting" during tests (and also to speed up the tests). However, I am confused about how to make it work with processors combined with a buffer. I always get a similar issue with most of the processors:
java.util.concurrent.RejectedExecutionException: Scheduler unavailable
@Test
public void testWithoutBuffer_whichPasses() {
    final UnicastProcessor<String> processor = UnicastProcessor.create();
    final Publisher<Integer> publisher = processor.map(String::length);
    processor.onNext("foo");
    processor.onNext("foobar");
    StepVerifier.create(publisher)
                .expectSubscription()
                .expectNext(3)
                .expectNext(6)
                .thenCancel()
                .log().verify();
}
@Test
public void testWithBuffer_whichDoesNotPass() {
    final UnicastProcessor<String> processor = UnicastProcessor.create();
    final Publisher<Integer> publisher = processor
        .bufferTimeout(5, Duration.ofSeconds(1))
        .map(words -> words.stream().map(String::length).collect(toList()))
        .flatMapIterable(Function.identity());
    processor.onNext("foo");
    processor.onNext("foobar");
    StepVerifier.withVirtualTime(() -> publisher)
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(2))
                .expectNext(3)
                .expectNext(6)
                .thenCancel()
                .log().verify();
}
The first test passes; however, the second one with the buffer doesn't. I am wondering what the correct usage of StepVerifier withVirtualTime is.
Full example here:
https://github.com/chengchen/rx-smoker/blob/master/src/test/java/com/edgelab/marketdata/consumer/UnicastProcessorTest.java
Thank you very much for the help!
TestPublisher ts = TestPublisher.createNonCompliant(Violation.CLEANUP_ON_TERMINATE);
ts.flux().subscribe();
ts.complete();
ts.complete(); //should try duplicate complete
Either check that the requested amount >= the expected amount during conflation, or add a scheduled task that logs when a particular expectation has been hung for n seconds (cancelled in the finally block of onExpectations).
Example of hanging: request(2).expectNextCount(3)
Consider adding arbitrary scripted tasks for use cases like:
DirectProcessor<Void> p = DirectProcessor.create();
ScriptedSubscriber.create(0)
    .doRequest(3)
    .expectValues(1, 2, 3)
    .then(p::onComplete)
    .expectComplete()
    .verify(flux.takeUntil(p));
if ((m & expectedFusionMode) != m) //fail
instead of
if ((m & expectedFusionMode) != expectedFusionMode) //fail
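To see how the two conditions differ, here is a small sketch. The SYNC, ASYNC and ANY constants are illustrative bit flags assumed to follow the usual layout (ANY being SYNC combined with ASYNC); FusionMaskSketch and its method names are hypothetical.

```java
class FusionMaskSketch {
    // Illustrative bit flags (assumed values, defined locally for the sketch).
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;

    // First form: fails only if the negotiated mode m has bits outside the expected mask.
    static boolean failsFirstForm(int m, int expectedFusionMode) {
        return (m & expectedFusionMode) != m;
    }

    // Second form: fails unless every expected bit is present in the negotiated mode m.
    static boolean failsSecondForm(int m, int expectedFusionMode) {
        return (m & expectedFusionMode) != expectedFusionMode;
    }

    public static void main(String[] args) {
        // Expecting ANY while SYNC was negotiated:
        System.out.println(failsFirstForm(SYNC, ANY));  // false: SYNC is within ANY
        System.out.println(failsSecondForm(SYNC, ANY)); // true: ASYNC bit is missing
    }
}
```

Under these assumptions, the first form accepts any negotiated mode that is a subset of the expected mask, whereas the second rejects a SYNC result whenever ANY was expected.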
With the following test:
StepVerifier.create(response.flatMap { r -> r.bodyToFlux(Event::class) })
    .consumeNextWith { assertEquals(2012, it.year) }
    .consumeNextWith { assertEquals(2013, it.year) }
    .consumeNextWith { assertEquals(2014, it.year) }
    .consumeNextWith { assertEquals(2015, it.year) }
    .consumeNextWith { assertEquals(2016, it.year) }
    .consumeNextWith { assertEquals(2017, it.year) }
    .expectComplete()
    .verify()
I get the following not very useful error message:
Expectation failed (failed running expectation on signal [onNext(FluxError)] with [java.lang.ClassCastException]:
reactor.core.publisher.FluxError cannot be cast to mixit.model.sponsor.Event)
java.lang.AssertionError: expectation failed (failed running expectation on signal [onNext(FluxError)] with [java.lang.ClassCastException]:
reactor.core.publisher.FluxError cannot be cast to mixit.model.sponsor.Event)
at reactor.test.DefaultStepVerifierBuilder.failPrefix(DefaultStepVerifierBuilder.java:1679)
at reactor.test.DefaultStepVerifierBuilder.fail(DefaultStepVerifierBuilder.java:1675)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1017)
at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onNext(DefaultStepVerifierBuilder.java:764)
at reactor.core.publisher.MonoFlatMap$FlattenSubscriber$InnerSubscriber.onNext(MonoFlatMap.java:191)
I would expect the message and stacktrace of the underlying error instead of this class cast error.
The StepVerifier is getting more and more complex, and assumes that it should perform a few sanity checks internally. In some scenarios, these assumptions don't apply and the checks should be skipped.
The first of these checks is the "under-request" one, which prevents the verification if it thinks the request amount is too low compared to the expectations, but can sometimes be wrong about it.
A good way to do that is to introduce a StepVerifierOptions that would replace the current long initialRequest parameter, superseding it and additionally allowing such checks to be disabled.
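One possible shape for such an options holder is sketched below. VerifierOptionsSketch, its fluent setters and the rejects helper are hypothetical names chosen for illustration, and the defaults (unbounded initial request, check enabled) are assumptions rather than a description of the actual implementation.

```java
class VerifierOptionsSketch {
    private long initialRequest = Long.MAX_VALUE; // assumed default: unbounded demand
    private boolean checkUnderRequesting = true;  // assumed default: sanity check enabled

    static VerifierOptionsSketch create() {
        return new VerifierOptionsSketch();
    }

    // Supersedes the standalone long initialRequest parameter.
    VerifierOptionsSketch initialRequest(long n) {
        this.initialRequest = n;
        return this;
    }

    // Lets a scenario opt out of the "under-request" sanity check.
    VerifierOptionsSketch checkUnderRequesting(boolean enabled) {
        this.checkUnderRequesting = enabled;
        return this;
    }

    // True when the check is enabled and the request cannot cover the expectations.
    boolean rejects(long expectedSignals) {
        return checkUnderRequesting && initialRequest < expectedSignals;
    }

    public static void main(String[] args) {
        // Requesting 2 while expecting 3 signals is flagged unless the check is disabled.
        System.out.println(create().initialRequest(2).rejects(3));
        System.out.println(create().initialRequest(2).checkUnderRequesting(false).rejects(3));
    }
}
```

Grouping the settings this way means further checks could later be toggled without adding more overloads to the factory methods.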
I am waiting for the bus release to adopt it in my project. Although core has been out for some weeks now, bus is still a snapshot. What are your plans?
FinalStep only proposes to trigger the cancellation via thenCancel(), but cases where the subscription is captured and cancelled externally (e.g. in a then() step) aren't covered.
Much like there is an expectComplete() terminal expectation, an expectCancelled() method should be offered.
Looks like this is outdated, and setup.gradle might not be entirely relevant either.
After upgrading reactor-logback to the latest version last week, I found a strange behaviour in reactor-logback's appender: reactor.logback.AsyncAppender creates some non-daemon threads for log consuming but doesn't close them properly.
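For context on why this matters, a JVM does not exit while non-daemon threads are still alive. The sketch below shows two common mitigations with plain JDK executors: creating the workers as daemon threads, and shutting the executor down explicitly. DaemonLoggerThreadsSketch and daemonFactory are hypothetical names, and this is not how AsyncAppender is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DaemonLoggerThreadsSketch {

    // Creates named daemon threads, so idle workers cannot keep the JVM alive.
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2, daemonFactory("logger"));
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        // Explicit shutdown (for example from an appender's stop() hook) releases the workers.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Daemon threads alone can lose buffered events at exit, so an explicit shutdown that drains pending work is usually the safer of the two.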
After calling Tomcat's shutdown script, printing the thread stacks with jstack gives this:
"logger-2" #19 prio=5 os_prio=0 tid=0x00007f2602874800 nid=0x20c9 waiting on condition [0x00007f268248f000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000827799a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy.waitFor(LiteBlockingWaitStrategy.java:56)
at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
at reactor.core.processor.RingBufferWorkProcessor$WorkSignalProcessor.run(RingBufferWorkProcessor.java:765)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"logger-1" #18 prio=5 os_prio=0 tid=0x00007f2602874000 nid=0x20c8 waiting on condition [0x00007f2682590000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008277a008> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at reactor.jarjar.com.lmax.disruptor.LiteBlockingWaitStrategy.waitFor(LiteBlockingWaitStrategy.java:56)
at reactor.jarjar.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
at reactor.core.processor.RingBufferWorkProcessor$WorkSignalProcessor.run(RingBufferWorkProcessor.java:765)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"Attach Listener" #24910 daemon prio=9 os_prio=0 tid=0x00007f2640001000 nid=0x3975 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"DestroyJavaVM" #24909 prio=5 os_prio=0 tid=0x00007f26a4178000 nid=0x2086 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Abandoned connection cleanup thread" #20 daemon prio=5 os_prio=0 tid=0x00007f2617320800 nid=0x20ce in Object.wait() [0x00007f268175c000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x0000000087abb8f8> (a java.lang.ref.ReferenceQueue$Lock)
at com.mysql.jdbc.AbandonedConnectionCleanupThread.run(AbandonedConnectionCleanupThread.java:43)
"GC Daemon" #13 daemon prio=2 os_prio=0 tid=0x00007f26a43f5000 nid=0x209c in Object.wait() [0x00007f26901cd000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000825760a0> (a sun.misc.GC$LatencyLock)
at sun.misc.GC$Daemon.run(GC.java:117)
- locked <0x00000000825760a0> (a sun.misc.GC$LatencyLock)
"AsyncFileHandlerWriter-1304836502" #12 daemon prio=5 os_prio=0 tid=0x00007f26a4156000 nid=0x2099 waiting on condition [0x00007f2690a45000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008209fbf8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingDeque.pollFirst(LinkedBlockingDeque.java:522)
at java.util.concurrent.LinkedBlockingDeque.poll(LinkedBlockingDeque.java:684)
at org.apache.juli.AsyncFileHandler$LoggerThread.run(AsyncFileHandler.java:145)
The version of reactor: 2.0.8.RELEASE
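To illustrate why the two "logger-N" threads above keep the JVM alive, here is a minimal stdlib-only sketch. It assumes nothing about reactor-logback internals: `DaemonLoggerThreads`, `newPool` and `stop` are hypothetical names. It shows the two usual remedies, marking worker threads as daemon and shutting the pool down explicitly when the appender stops.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of reactor-logback: names its threads
// "logger-N" and marks them as daemon so they cannot keep the JVM alive.
class DaemonLoggerThreads implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public Thread newThread(Runnable task) {
        Thread t = new Thread(task, "logger-" + counter.incrementAndGet());
        t.setDaemon(true);
        return t;
    }

    // Creates a fixed-size pool backed by daemon threads.
    static ExecutorService newPool(int size) {
        return Executors.newFixedThreadPool(size, new DaemonLoggerThreads());
    }

    // Stops the pool and waits briefly for in-flight work to finish.
    // Returns true if the pool terminated within the timeout.
    static boolean stop(ExecutorService pool) {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Either remedy alone would let Tomcat's shutdown complete; calling something like `stop` from the appender's own stop hook is the cleaner one, since daemon threads can be killed with log events still queued.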
I started to learn Reactor with this.
Here I got stuck with StepVerifier tests. In this example I modified the original test a bit just to make it quicker.
@Test
public void countWithVirtualTime() {
expect3600Elements(() -> Flux.interval(Duration.ofMillis(1)).take(5));
}
void expect3600Elements(Supplier<Flux<Long>> supplier) {
StepVerifier.withVirtualTime(supplier)
.expectNextCount(5)
.expectComplete()
.verify();
}
Maybe I'm doing something wrong here, but it goes into an infinite loop in DefaultVerifySubscriber#pollTaskEventOrComplete.
The analogous test just works fine:
@Test
public void count() {
stepVerifierTesting.expect10Elements(Flux.interval(Duration.ofSeconds(1)).take(10));
}
void expect10Elements(Flux<Long> flux) {
StepVerifier.create(flux)
.expectNextCount(10)
.expectComplete()
.verify();
}
Maybe I misunderstood StepVerifier#withVirtualTime, but it looks like an issue to me.
This could set up the relevant hooks at the beginning of verify, then always reset them correctly at the end.
The MVP is probably expectErrorDropped(Throwable...) and expectValueDropped(Object...), with the hook capturing to a collection (as several drops can happen in a row, at least for values).
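The install-then-always-reset idea can be sketched without Reactor at all. In the sketch below, `DroppedHookSketch` and all of its methods are hypothetical stand-ins, not Reactor's `Hooks` class: a global hook is swapped for one that captures into a collection, and a `finally` block restores the default even if the scenario throws.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a global "value dropped" hook.
class DroppedHookSketch {
    private static final Consumer<Object> DEFAULT = v -> { };
    private static volatile Consumer<Object> onDropped = DEFAULT;

    // What an operator would call when it has to drop a value.
    static void drop(Object value) {
        onDropped.accept(value);
    }

    static boolean isHookInstalled() {
        return onDropped != DEFAULT;
    }

    // Installs a capturing hook, runs the scenario, and always resets the
    // hook afterwards, even if the scenario throws.
    static List<Object> verifyCapturingDrops(Runnable scenario) {
        List<Object> dropped = new CopyOnWriteArrayList<>();
        onDropped = dropped::add;
        try {
            scenario.run();
        } finally {
            onDropped = DEFAULT;
        }
        return dropped;
    }
}
```

An expectation such as expectValueDropped(Object...) would then compare its arguments with the returned list.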
In StepVerifier, if a scenario wraps RequestEvent in SubscriptionTaskEvent, these request events are not taken into account for requested tracking. This results in errors about request overflow being thrown.
E.g. this test fails:
@Test
@SuppressWarnings("unchecked")
public void requestBufferDoesntOverflow() {
LongAdder requestCallCount = new LongAdder();
LongAdder totalRequest = new LongAdder();
Flux<Integer> source = Flux.range(1, 10).hide()
.doOnRequest(r -> requestCallCount.increment())
.doOnRequest(totalRequest::add);
StepVerifier.withVirtualTime(//start with a request for 1 buffer
() -> source.bufferUntil(i -> i % 3 == 0), 1)
.expectSubscription()
.expectNext(Arrays.asList(1, 2, 3))
.expectNoEvent(Duration.ofSeconds(1))
.thenRequest(2)
.expectNext(Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9))
.expectNoEvent(Duration.ofSeconds(1))
.thenRequest(3)
.expectNext(Collections.singletonList(10))
.expectComplete()
.verify();
assertThat(requestCallCount.intValue()).isEqualTo(11); //10 elements then the completion
assertThat(totalRequest.longValue()).isEqualTo(11L); //ignores the main requests
}
(this test could serve as issue validation in StepVerifierTests)
Add a reactor-math extension that supports mathematical operations like sum, average, max, min etc. A similar extension is already available for RxJava2.
When using multiple tests with VirtualTimeScheduler and StepVerifier, a problem with the way the resources are cleaned up has been discovered.
The first test would instantiate a dedicated VirtualTimeScheduler and immediately dispose() it.
The second test would not instantiate any specific vts, but would use withVirtualTime.
Despite that, the second test would see REJECTED executions, coming from the same vts as the previous test.
This is due to the fact that StepVerifier enables the first vts, but never cleans it up. It relies on the shutdown method doing it, but in the first test this is prevented because the vts has already been shut down once before the StepVerifier enables it:
VirtualTimeScheduler, which in turn calls VirtualTimeScheduler.enable, which returns the CURRENT.
Encouraged by https://twitter.com/projectreactor/status/788678127606173696 I'd like to start an open discussion as an outsider to the project who's interested in using the proposed API.
Like @smaldini my background is rooted in the Groovy community, so I was a bit confused when I saw the name ScriptedSubscriber. I thought it was related to scripting capabilities (Groovy, BSF, JSR-223, etc).
Per https://github.com/reactor/reactor-addons/blob/master/reactor-test/src/test/java/reactor/test/subscriber/ScriptedSubscriberIntegrationTests.java#L134-L142 the usage of @Test(expected = AssertionError.class) is too coarse IMHO. We're expecting a test to fail, so it can fail for any reason, not necessarily the intended one. The linked test case may fail because the developer added more expectedValue() calls, changed expectedValue('foo') to expectedValue('bar'), or simply added assertTrue(false) at the top of the test method.
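One finer-grained alternative is to capture the failure and inspect it. The sketch below uses only the JDK; `ExpectFailure` and `assertionErrorFrom` are hypothetical names, not part of JUnit or reactor-test.

```java
// Hypothetical helper: runs a block that is expected to fail and hands the
// failure back so the test can check *why* it failed.
class ExpectFailure {
    // Returns the AssertionError thrown by the body; complains with a
    // different exception type if the body did not fail at all.
    static AssertionError assertionErrorFrom(Runnable body) {
        try {
            body.run();
        } catch (AssertionError expected) {
            return expected;
        }
        throw new IllegalStateException("expected an AssertionError, but none was thrown");
    }
}
```

A test would then wrap only the verify call and assert on the returned error's message, so an unrelated assertTrue(false) earlier in the method still fails the test.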
I'd like to see additional expect methods that take a Hamcrest Matcher as argument.
Would it make sense to rename verify to verifyOn?
Would it make sense to turn around the start/termination methods (create and verify) such that the script reads as follows:
ScriptedSubscriber.subscribeOn(flux)
.expectValue("foo")
.expectComplete()
.verify();
Or is it the intention that a "script" may be applied to multiple observable sources? If so then verify(src) makes more sense.
Hi again,
Another v1 to v2 migration issue here.
I'm using eventBus.notify(event) where my eventBus is using a ThreadPoolExecutorDispatcher (Spring configuration at the end). I see that my events are being consumed and processed by the 5 threads in my thread pool. However, every so often the consumer is being executed in the event origin thread.
This is not the expected behavior and seems to be new to v2. What's going on here?
I think this might be related to this unresolved conversation from last year: "THREAD POOL is not working with different threads"
I can't figure out the takeaway from that discussion though.
To be honest, we can't figure out what's going on with Reactor. This v1 -> v2 rewrite seems to have made a mess of things. Changing the APIs without providing the documentation isn't really how things should be done. One of your main entry points is that Spring promoted this project as an asynchronous event bus, but that example code no longer works, eek!
I hate to say it but our company will surely be moving away from Reactor going forward. Still, perhaps this is a bug that can be fixed, or a "feature" that should be documented. Hopefully this issue report helps someone.
Good luck.
private static final int THREAD_POOL_SIZE = 5;
private static final int BACKLOG_SIZE = 1024;
private static final String THREAD_NAME = "my-reactor";
@Bean
public EventBus reactor(Environment env) {
LOG.debug("Creating reactor..");
return new EventBusSpec()
.env(env)
.dispatcher(reactorDispatcher())
.consumerNotFoundHandler(notificationKey -> {
if(notificationKey != null) {
String key = notificationKey.toString();
if(key.startsWith(ExternalDataEvent.SELECTOR)) {
LOG.error("No consumer found for {}", key);
}
}
})
.broadcastEventRouting()
.uncaughtErrorHandler(error -> LOG.error(
"An error occurred while processing event: {}", error.getMessage(), error))
.get();
}
@Bean
public Dispatcher reactorDispatcher() {
LOG.debug("Creating thread pool dispatcher with thread prefix '{}'", THREAD_NAME);
return new ThreadPoolExecutorDispatcher(THREAD_POOL_SIZE, BACKLOG_SIZE, THREAD_NAME);
}
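One plain-JDK mechanism that produces exactly this symptom is a rejection handler that runs overflow tasks on the submitting thread. Whether ThreadPoolExecutorDispatcher is configured this way is an assumption, not something confirmed in this thread. The sketch below (hypothetical class `CallerRunsDemo`) only shows how a bounded pool with `CallerRunsPolicy` ends up executing work in the origin thread once its workers and backlog are full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    // Saturates a 1-thread pool with a 1-slot backlog, submits one more task,
    // and returns the name of the thread that ran that overflow task.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        String[] ranOn = new String[1];
        try {
            pool.execute(blocker); // occupies the only worker thread
            pool.execute(blocker); // fills the only backlog slot
            // Pool and backlog are full: the policy runs this on the caller.
            pool.execute(() -> ranOn[0] = Thread.currentThread().getName());
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn[0];
    }
}
```

If something similar is at play here, the origin-thread executions would coincide with bursts that fill the 1024-entry backlog.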
I came here looking for reactor-alloc as the main reactor project lists reactor-alloc under reactor-addons. I was going to ask about a RingBuffer based lock-free implementation of an allocator (for object pooling) but was surprised to see that reactor-alloc doesn't exist.
My interest in a lock-free allocator is related to my discussion here:
https://groups.google.com/d/msg/reactor-framework/h2Dsm0USZjw/bGNotRyNCQAJ
If it's not possible to avoid object creation when using reactor-based messaging, then a lock-free allocator would be useful to mitigate GC issues.
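For illustration only, here is a minimal object pool built on the JDK's non-blocking `ConcurrentLinkedQueue`. It is not the RingBuffer-based design asked about, and `LockFreePool` is a hypothetical name that does not reflect any reactor-alloc API.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical sketch of a pool whose acquire/release never take a lock.
class LockFreePool<T> {
    private final ConcurrentLinkedQueue<T> free = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    LockFreePool(Supplier<T> factory, int preallocate) {
        this.factory = factory;
        for (int i = 0; i < preallocate; i++) {
            free.offer(factory.get());
        }
    }

    // Reuses a pooled instance when one is available, otherwise allocates.
    T acquire() {
        T pooled = free.poll();
        return pooled != null ? pooled : factory.get();
    }

    // Hands an instance back for reuse; the caller must reset its state first.
    void release(T instance) {
        free.offer(instance);
    }
}
```

Note that `ConcurrentLinkedQueue` allocates a small node per `offer`, so this reduces but does not eliminate garbage; a preallocated ring buffer would avoid that at the cost of a fixed capacity.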
E.g.:
Flux
.range(1, 10)
.hide()
.reduce((a, b) -> a + b)
.doOnSuccess(v -> { });
If the operator (like doOnSuccess above) always implements Fuseable, expectNoFusionSupport will always fail despite the operator's requestFusion(ANY) possibly returning Fuseable.NONE.
As of now the Verifier can be passed a Supplier<VirtualTimeScheduler>, but it doesn't entirely manage it, meaning that it is still the responsibility of the developer to correctly call VirtualTimeScheduler#enable rather than the simpler factory method VirtualTimeScheduler#create.
enable could be reworked to accept an existing VirtualTimeScheduler instance though, and it could be made idempotent if called twice with the same instance.
In turn, this would let the default verifier implementation capture a VirtualTimeScheduler in its verify method and enable it (if the scheduler was already supplied through enable, idempotency kicks in; otherwise the scheduler is correctly enabled and used by the factories).
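The idempotent enable proposed above could look roughly like the following sketch. `SchedulerRegistry` is a hypothetical stand-in and not VirtualTimeScheduler's actual code: enabling the instance that is already current is a no-op, while enabling a different instance replaces it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the static "current scheduler" slot.
class SchedulerRegistry<S> {
    private final AtomicReference<S> current = new AtomicReference<>();

    // Enables the given instance. Returns true if it was newly installed,
    // false if that exact instance was already enabled (the idempotent case).
    boolean enable(S scheduler) {
        S previous = current.getAndSet(scheduler);
        return previous != scheduler;
    }

    S current() {
        return current.get();
    }

    // Clears the slot, as a verifier would do at the end of verify.
    void reset() {
        current.set(null);
    }
}
```

With this shape, a verifier can call enable unconditionally on whatever the supplier returns, regardless of whether the developer obtained it through create or enable.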
(wrong project)
advanceTimeXxx