streaming-processor's Issues
Consume stream info from bindings
Processor crashes if invoker stops
Exception in thread "main" io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
at io.grpc.Status.asRuntimeException(Status.java:526)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:434)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:678)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:403)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
reactor.core.publisher.Flux.concatMap(Flux.java:3425)
io.projectriff.processor.Processor.lambda$run$6(Processor.java:180)
Error has been observed by the following operator(s):
|_ Flux.concatMap ⇢ io.projectriff.processor.Processor.lambda$run$6(Processor.java:180)
|_ Flux.concatMap ⇢ io.projectriff.processor.Processor.run(Processor.java:179)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Flux.blockLast(Flux.java:2389)
at io.projectriff.processor.Processor.run(Processor.java:187)
at io.projectriff.processor.Processor.main(Processor.java:118)
Error on stream processor with no output stream
Some processors consume one or more input streams without producing an output stream. This is currently not allowed (observed with the latest v0.5 snapshot):
$ riff streaming processor create echo-out \
> --function-ref echo \
> --input out
Created processor "echo-out"
$ kubectl logs -f echo-out-processor-krjj2-64589d9684-pvqhx -c processor
Missing one of the following environment variables: [INPUTS, OUTPUTS, OUTPUT_CONTENT_TYPES, FUNCTION, GROUP]
INPUTS = franz-kafka-gateway-dpn2x.default:6565/default_out
OUTPUTS =
OUTPUT_CONTENT_TYPES = null
FUNCTION = localhost:8081
GROUP = echo-out
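One way the check could tolerate an empty OUTPUTS value is sketched below. This is only an illustration, not the processor's actual code: the class `ProcessorEnvCheck`, the split into required and optional variables, and the comma-separated format of OUTPUTS are all assumptions; only the variable names come from the log above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not taken from the processor's source.
final class ProcessorEnvCheck {
    // Assumption: only these three must be non-blank; OUTPUTS and
    // OUTPUT_CONTENT_TYPES may be empty for a processor without outputs.
    private static final List<String> REQUIRED = List.of("INPUTS", "FUNCTION", "GROUP");

    /** Returns the names of required variables that are absent or blank. */
    static List<String> missing(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** Splits a comma-separated value (assumed format); absent or blank yields an empty list. */
    static List<String> parseList(String value) {
        if (value == null || value.trim().isEmpty()) {
            return List.of();
        }
        return List.of(value.split(","));
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (!missing.isEmpty()) {
            System.err.println("Missing required environment variables: " + missing);
            System.exit(1);
        }
        System.out.println("Outputs: " + parseList(System.getenv("OUTPUTS")));
    }
}
```

With the environment shown in the log, `missing` would return an empty list and `parseList` would yield no outputs, so startup could proceed.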
Make sure INPUT_START_OFFSETS and INPUT_NAMES have the same size
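A minimal sketch of such a check follows. The class `InputConfigCheck` is hypothetical, and it assumes both variables hold comma-separated lists, which the issue does not state.

```java
import java.util.List;

// Hypothetical validation helper; the comma-separated format is an assumption.
final class InputConfigCheck {

    /** Splits a comma-separated value; absent or blank yields an empty list. */
    static List<String> split(String value) {
        if (value == null || value.trim().isEmpty()) {
            return List.of();
        }
        return List.of(value.split(","));
    }

    /** Throws if the two lists do not have the same number of entries. */
    static void requireSameSize(List<String> inputNames, List<String> startOffsets) {
        if (inputNames.size() != startOffsets.size()) {
            throw new IllegalArgumentException(
                "INPUT_NAMES has " + inputNames.size()
                    + " entries but INPUT_START_OFFSETS has " + startOffsets.size());
        }
    }

    public static void main(String[] args) {
        requireSameSize(
            split(System.getenv("INPUT_NAMES")),
            split(System.getenv("INPUT_START_OFFSETS")));
        System.out.println("INPUT_NAMES and INPUT_START_OFFSETS have matching sizes");
    }
}
```

Failing fast at startup with a message naming both variables would make a mismatch easier to diagnose than an index error later on.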
Consume OffsetReset option(s)
Process crashes when the gRPC invocation is cancelled by the invoker
Reproduced with processor @ 5884ec5 (master at the time).
Tweaked the Node invoker to cancel the invocation as soon as a signal is received.
Result:
Connected to localhost:8081, after 599 ms
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN: Invoker: Unexpected Error: oopsie
at io.grpc.Status.asRuntimeException(Status.java:533)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
reactor.core.publisher.Flux.concatMap(Flux.java:3622)
io.projectriff.processor.Processor.lambda$run$8(Processor.java:266)
Error has been observed at the following site(s):
|_ Flux.concatMap ⇢ at io.projectriff.processor.Processor.lambda$run$8(Processor.java:266)
|_ Flux.concatMap ⇢ at io.projectriff.processor.Processor.run(Processor.java:265)
Stack trace:
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:449)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2483)
at io.projectriff.processor.Processor.run(Processor.java:273)
at io.projectriff.processor.Processor.main(Processor.java:181)
Process finished with exit code 1
Pass input/outputNames in StartFrame
Following projectriff/system#159 and 856d8e8, pass those values in StartFrame
streaming processor crashes
I have seen the processor crash with the below stack trace a few times now:
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
reactor.core.publisher.Flux.concatMap(Flux.java:3425)
io.projectriff.processor.Processor.lambda$run$7(Processor.java:188)
Error has been observed by the following operator(s):
|_ Flux.concatMap ⇢ io.projectriff.processor.Processor.lambda$run$7(Processor.java:188)
|_ Flux.concatMap ⇢ io.projectriff.processor.Processor.run(Processor.java:187)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Flux.blockLast(Flux.java:2389)
at io.projectriff.processor.Processor.run(Processor.java:195)
at io.projectriff.processor.Processor.main(Processor.java:127)
Steps to reproduce:
riff function create fats-repeater --git-repo https://github.com/projectriff/fats --git-revision 56fbd4f8b49af7bcf3e0fd1616c6cbf2a899c4b3 --sub-path functions/repeater/java --tail
riff streaming stream create nums --content-type 'application/json' --provider franz-kafka-provisioner
riff streaming stream create rezs --content-type 'application/json' --provider franz-kafka-provisioner
riff streaming stream create lettrs --content-type 'text/plain' --provider franz-kafka-provisioner
riff streaming processor create fats-repeater --function-ref fats-repeater --input lettrs --input nums --output rezs --tail
Processor sometimes gets stuck after sending Liiklus subscription requests
This has happened to me a lot locally when testing the Node streaming invoker.
Because the processor seems to get stuck, it does not receive any inputs and therefore does not forward any to the invoker.
Gracefully Complete() input stream on shutdown
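A rough sketch of how this might be wired with a JVM shutdown hook is shown below. `GracefulShutdown` and `InputStreamHandle` are hypothetical stand-ins for whatever object exposes the completion call on the invoker's input stream; only `Runtime.addShutdownHook` is a real JDK API here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the processor's actual shutdown logic.
final class GracefulShutdown {

    /** Stand-in for the request side of the streaming call (assumed shape). */
    interface InputStreamHandle {
        void complete();
    }

    /** Wraps the handle so that complete() is invoked at most once. */
    static Runnable completeOnce(InputStreamHandle handle) {
        AtomicBoolean done = new AtomicBoolean(false);
        return () -> {
            if (done.compareAndSet(false, true)) {
                handle.complete();
            }
        };
    }

    /** Registers a JVM shutdown hook that completes the input stream. */
    static Thread install(InputStreamHandle handle) {
        Thread hook = new Thread(completeOnce(handle), "complete-input-on-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        install(() -> System.out.println("input stream completed"));
        System.out.println("running; the hook fires when the JVM exits");
    }
}
```

The at-most-once guard matters if the same completion can also be triggered on the normal path, for example when the inputs end before a termination signal arrives.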