streaming-processor's People

Contributors

dependabot-preview[bot], dependabot[bot], ericbottard, fbiville, scothis

streaming-processor's Issues

Processor crashes if invoker stops

Exception in thread "main" io.grpc.StatusRuntimeException: UNAVAILABLE: Network closed for unknown reason
	at io.grpc.Status.asRuntimeException(Status.java:526)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:434)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:678)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:403)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:459)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:63)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:546)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:467)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:584)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
	reactor.core.publisher.Flux.concatMap(Flux.java:3425)
	io.projectriff.processor.Processor.lambda$run$6(Processor.java:180)
Error has been observed by the following operator(s):
	|_	Flux.concatMap ⇢ io.projectriff.processor.Processor.lambda$run$6(Processor.java:180)
	|_	Flux.concatMap ⇢ io.projectriff.processor.Processor.run(Processor.java:179)

	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Flux.blockLast(Flux.java:2389)
		at io.projectriff.processor.Processor.run(Processor.java:187)
		at io.projectriff.processor.Processor.main(Processor.java:118)

Error on stream processor with no output stream

Some processors consume one or more input streams without producing an output stream. This is currently not supported: the processor reports missing environment variables when no output is declared (observed with the latest v0.5 snapshot).

$ riff streaming processor create echo-out \
>   --function-ref echo \
>   --input out
Created processor "echo-out"

$ kubectl logs -f echo-out-processor-krjj2-64589d9684-pvqhx -c processor
Missing one of the following environment variables: [INPUTS, OUTPUTS, OUTPUT_CONTENT_TYPES, FUNCTION, GROUP]
  INPUTS = franz-kafka-gateway-dpn2x.default:6565/default_out
  OUTPUTS = 
  OUTPUT_CONTENT_TYPES = null
  FUNCTION = localhost:8081
  GROUP = echo-out
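
The log above shows `OUTPUTS` empty and `OUTPUT_CONTENT_TYPES` unset, and the startup check appears to reject that combination. A minimal sketch of a check that would tolerate it is given here. It is not the processor's actual code: the class `EnvCheck`, its methods, the choice of which variables are mandatory, and the comma-separated list format are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not the processor's actual startup code.
class EnvCheck {
    // Assumption: only these variables must always be set and non-blank.
    // OUTPUTS and OUTPUT_CONTENT_TYPES are treated as optional.
    static final List<String> REQUIRED = List.of("INPUTS", "FUNCTION", "GROUP");

    // Returns the names of required variables that are unset or blank.
    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                missing.add(name);
            }
        }
        return missing;
    }

    // Assumption: list-valued variables are comma-separated.
    // An unset or blank variable yields an empty list instead of an error.
    static List<String> listValue(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            return List.of();
        }
        return List.of(value.split(","));
    }
}
```

With a check along these lines, `EnvCheck.missingVariables(System.getenv())` would return an empty list for the environment printed above, and `EnvCheck.listValue(System.getenv(), "OUTPUTS")` would return an empty list of output streams.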

Process crashes when the gRPC invocation is cancelled by the invoker

Reproduced with processor @ 5884ec5 (master at the time).
Tweaked the Node invoker to cancel the invocation as soon as a signal is received.

Result:

Connected to localhost:8081, after 599 ms
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN: Invoker: Unexpected Error: oopsie
	at io.grpc.Status.asRuntimeException(Status.java:533)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
	reactor.core.publisher.Flux.concatMap(Flux.java:3622)
	io.projectriff.processor.Processor.lambda$run$8(Processor.java:266)
Error has been observed at the following site(s):
	|_ Flux.concatMap ⇢ at io.projectriff.processor.Processor.lambda$run$8(Processor.java:266)
	|_ Flux.concatMap ⇢ at io.projectriff.processor.Processor.run(Processor.java:265)
Stack trace:
		at io.grpc.Status.asRuntimeException(Status.java:533)
		at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:449)
		at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
		at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
		at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
		at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
		at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
		at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
		at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
		at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Flux.blockLast(Flux.java:2483)
		at io.projectriff.processor.Processor.run(Processor.java:273)
		at io.projectriff.processor.Processor.main(Processor.java:181)

Process finished with exit code 1
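
In the trace above, the error raised by the blocking call propagates out of `Processor.main`, so the JVM exits. One possible mitigation, sketched here purely as an assumption and not as the project's fix, is to wrap the blocking call in a bounded retry loop. The class `RetryingRunner` and its parameters are hypothetical. Whether retrying is appropriate at all depends on the gRPC status received, which this sketch does not inspect.

```java
import java.util.concurrent.Callable;

// Hypothetical wrapper; the real processor may handle this differently.
class RetryingRunner {
    // Runs the task, retrying on RuntimeException up to maxAttempts times.
    // Waits backoffMillis * attempt between attempts (linear backoff).
    static <T> T runWithRetries(Callable<T> task, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (RuntimeException e) {
                // io.grpc.StatusRuntimeException is a RuntimeException,
                // so a failure like the one above would be caught here.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        // All attempts failed: rethrow the last error to the caller.
        throw last;
    }
}
```

A caller would pass the blocking pipeline invocation as the task, for example a lambda that builds the pipeline and blocks until it completes. Rebuilding the pipeline on every attempt matters, because a terminated reactive stream cannot simply be resumed.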

streaming processor crashes

I have seen the processor crash with the stack trace below a few times now:

Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN
	at io.grpc.Status.asRuntimeException(Status.java:533)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxConcatMap] :
	reactor.core.publisher.Flux.concatMap(Flux.java:3425)
	io.projectriff.processor.Processor.lambda$run$7(Processor.java:188)
Error has been observed by the following operator(s):
	|_	Flux.concatMap ⇢ io.projectriff.processor.Processor.lambda$run$7(Processor.java:188)
	|_	Flux.concatMap ⇢ io.projectriff.processor.Processor.run(Processor.java:187)

	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Flux.blockLast(Flux.java:2389)
		at io.projectriff.processor.Processor.run(Processor.java:195)
		at io.projectriff.processor.Processor.main(Processor.java:127)

Steps to reproduce:

riff function create fats-repeater --git-repo https://github.com/projectriff/fats --git-revision 56fbd4f8b49af7bcf3e0fd1616c6cbf2a899c4b3 --sub-path functions/repeater/java --tail
riff streaming stream create nums --content-type 'application/json' --provider franz-kafka-provisioner
riff streaming stream create rezs --content-type 'application/json' --provider franz-kafka-provisioner
riff streaming stream create lettrs --content-type 'text/plain' --provider franz-kafka-provisioner

riff streaming processor create fats-repeater --function-ref fats-repeater --input lettrs --input nums --output rezs --tail
