projectriff / riff
riff is for functions
Home Page: https://projectriff.io
License: Apache License 2.0
The java function invoker has a "-Xmx..." limiting the Java heap, which seems sensible, but probably needs to be parameterized.
Saw this error in the event-dispatcher logs when trying the echo sample
on master HEAD 93a5d59
[event-dispatcher-3155758373-hk39t] Exception in thread "Thread-2" java.lang.IllegalArgumentException: You can only check the position for partitions assigned to this consumer.
[event-dispatcher-3155758373-hk39t] at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1239)
[event-dispatcher-3155758373-hk39t] at io.sk8s.event.dispatcher.KafkaConsumerMonitor.lambda$null$5(KafkaConsumerMonitor.java:112)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
[event-dispatcher-3155758373-hk39t] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
[event-dispatcher-3155758373-hk39t] at io.sk8s.event.dispatcher.KafkaConsumerMonitor.lambda$compute$6(KafkaConsumerMonitor.java:116)
[event-dispatcher-3155758373-hk39t] at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
[event-dispatcher-3155758373-hk39t] at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
[event-dispatcher-3155758373-hk39t] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[event-dispatcher-3155758373-hk39t] at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
[event-dispatcher-3155758373-hk39t] at io.sk8s.event.dispatcher.KafkaConsumerMonitor.compute(KafkaConsumerMonitor.java:100)
[event-dispatcher-3155758373-hk39t] at io.sk8s.event.dispatcher.EventDispatchingHandler$ScalingMonitor.run(EventDispatchingHandler.java:161)
[event-dispatcher-3155758373-hk39t] at java.lang.Thread.run(Thread.java:745)
If we use a function that takes numeric input we get a class cast exception
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
at io.sk8s.invoker.java.function.FunctionConfigurationTests$Doubler.apply(FunctionConfigurationTests.java:118) ~[na:na]
at io.sk8s.invoker.java.server.JavaFunctionInvokerController.invoke(JavaFunctionInvokerController.java:85) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111-internal]
...
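One possible direction, sketched under the assumption that the invoker receives the payload as a String and knows the function's declared input type: convert the payload before calling apply. InputConverter, convert and invoke are hypothetical names, not part of the riff code base, and only a few target types are handled.

```java
import java.util.function.Function;

// Hypothetical helper, not part of the riff code base: converts the raw
// String payload to the input type a function declares before invoking it.
final class InputConverter {

    // Converts the payload to the requested type; only a few types are
    // covered in this sketch.
    static <T> T convert(String payload, Class<T> type) {
        Object value;
        if (type == String.class) {
            value = payload;
        } else if (type == Integer.class) {
            value = Integer.valueOf(payload.trim());
        } else if (type == Long.class) {
            value = Long.valueOf(payload.trim());
        } else if (type == Double.class) {
            value = Double.valueOf(payload.trim());
        } else {
            throw new IllegalArgumentException("Unsupported input type: " + type.getName());
        }
        return type.cast(value);
    }

    // Applies the function after converting the payload to its input type.
    static <T, R> R invoke(Function<T, R> function, Class<T> inputType, String payload) {
        return function.apply(convert(payload, inputType));
    }

    public static void main(String[] args) {
        Function<Integer, Integer> doubler = x -> 2 * x;
        System.out.println(invoke(doubler, Integer.class, "21"));
    }
}
```

With a conversion step like this, a function such as the Doubler in the stack trace would receive an Integer rather than the raw String.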
This will make it easier to demo since we can see when components are ready
monitor a topic's partitions and per-function consumer lag, via sliding windows over the joined streams of produced messages (using a separate "monitor" consumer group) and consumer offsets (for each inactive function's consumer group), and based on thresholds for "queue depth" and/or time, trigger the deployment of an active subscriber as the function handler... at that point the monitor can relinquish control for that particular function/partition-range, but after a configurable idle-time that handler could deactivate itself, at which point the monitor would need to resume control
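The core of the lag check described above could look roughly like the following. This is a minimal sketch, not the actual design: LagMonitor and its methods are hypothetical names, offsets are passed in as plain maps instead of being read from Kafka, and the sliding windows and time-based thresholds are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: offsets are supplied as plain maps keyed by partition
// number; a real monitor would read them from Kafka.
final class LagMonitor {

    // Lag for each partition: produced (end) offset minus committed offset.
    // A partition with no committed offset is treated as fully unconsumed.
    static Map<Integer, Long> lagByPartition(Map<Integer, Long> endOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    // True when the summed lag ("queue depth") reaches the threshold.
    static boolean shouldActivate(Map<Integer, Long> lag, long queueDepthThreshold) {
        long total = 0;
        for (long partitionLag : lag.values()) {
            total += partitionLag;
        }
        return total >= queueDepthThreshold;
    }
}
```

When shouldActivate returns true, the monitor would trigger the deployment of an active subscriber and hand over control for that function.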
Assigned: @trisberg
we should continue to rely on just our CRDs until things stabilize a bit, and then explore API aggregation:
https://kubernetes.io/docs/concepts/api-extension/apiserver-aggregation/
given how constrained our environment is, we should be able to avoid a lot of unnecessary initialization including classpath scanning, post-processors, etc. (we could also reduce dependencies)
We should add a flag to indicate that we want Grafana/Prometheus installed, maybe metricsDashboard, and also add a tracingDashboard flag for Zipkin.
I get this:
[ERROR] Failed to execute goal on project sk8s-kubernetes-client: Could not resolve dependencies for project io.sk8s:sk8s-kubernetes-client:jar:1.0.0.BUILD-SNAPSHOT: Could not find artifact io.sk8s:kubernetes-model:jar:1.1-SNAPSHOT -> [Help 1]
We should build and publish that artifact in a repo or make it part of the build process.
We see some restarts because Zookeeper is not available while Kafka and other components start up. There is currently no way of controlling the order/success of a set of deployments.
Helm has lifecycle hooks like "pre-install", but anything using them becomes unmanaged and won't be removed by a helm delete, so the user would have to delete them manually.
We would have to write some "code" to perform this operation - it's unclear what this exactly means:
helm/helm#2777
So, for now, it doesn't look like the lifecycle hooks will help us.
I think we can use <dns_sd_config> to query for dns hostnames to include
See: https://prometheus.io/docs/operating/configuration/#dns_sd_config
For the HandlerPool dispatch strategy, we currently scale from 0 to 1 upon first invocation of a Function. We also need to scale from 1..N (and down) according to configured parameters (this could possibly happen via HPA in cases where that granularity/latency is sufficient). Also the running Functions should be able to shut themselves down after a configurable idle period (and/or possibly a max-messages threshold). The dispatcher would handle the scaled-to-0 case exactly as it handles the initial invocation case. We'll need to take topic partitions into account as well.
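As a rough illustration of the 1..N decision and the idle shutdown, here is a minimal sketch. ReplicaCalculator and all of its parameters are assumptions made for this example. The cap at the partition count reflects that extra consumers in a Kafka consumer group are not assigned any partitions.

```java
// Hypothetical sketch of the scaling decision; all names and parameters are
// assumptions for illustration.
final class ReplicaCalculator {

    // Desired replicas: enough to keep the lag per replica at or under the
    // target, bounded by maxReplicas and by the number of topic partitions.
    // Returns 0 when there is no lag; whether to actually scale down would
    // also depend on the idle check below.
    static int desiredReplicas(long totalLag, long targetLagPerReplica,
                               int maxReplicas, int partitions) {
        if (targetLagPerReplica <= 0) {
            throw new IllegalArgumentException("targetLagPerReplica must be positive");
        }
        if (totalLag <= 0) {
            return 0;
        }
        // Integer ceiling of totalLag / targetLagPerReplica.
        long needed = (totalLag + targetLagPerReplica - 1) / targetLagPerReplica;
        return (int) Math.min(needed, Math.min(maxReplicas, partitions));
    }

    // True when no message has been handled for at least idleTimeoutMillis.
    static boolean idleExpired(long lastMessageMillis, long nowMillis, long idleTimeoutMillis) {
        return nowMillis - lastMessageMillis >= idleTimeoutMillis;
    }
}
```

A function instance could call idleExpired periodically and shut itself down, after which the dispatcher treats the next event like an initial invocation.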
We should ignore tests that depend on k8s if it is not available
(from #267) There must be a clear contract between the function-sidecar and the *-function-invokers. As the protocol is formalized between the sidecar and invoker (whether based on HTTP, gRPC or something else) we should have confidence that the invoker is correctly invoking functions. An acceptance test suite will make it easier for us to ensure each provided invoker is functioning correctly, and will let third-party invokers verify their compatibility.
The gateway should accept a callback request header that could then be invoked when a downstream Function has no output binding (end of a pipeline). This would enable 2 styles of request/reply clients of the gateway:
I noticed that if I allow the 10 second linger to expire for a function that was deployed as a single instance, and then I immediately send an event (via /request) so that it scales to 1 again, the reply from that request seems delayed until just a moment before the terminating function pod's status shows 0/2. If instead I wait until that status is 0/2 or the "Terminating" function pod disappears altogether, then a new event (via /request) is handled much more quickly.
The Operator creates and manages the CRD - see https://coreos.com/blog/introducing-operators.html
Samples are using Dockerfiles pointing to invoker base images like projectriff/node-function-invoker:0.0.1-SNAPSHOT
After a full local build of all riff components, the matching base images are included in the docker registry cache. This is not the case when deploying riff using helm.
Applies to #35.
kubectl create -f samples/greetings-topic.yaml
kubectl create -f samples/echo-function.yaml
./publish greetings hello
#should work
kubectl create -f samples/grpc-function.yaml
#both echo and grpc pods start crashing
$ kubectl create -f config/function-resource.yaml
error: unable to recognize "config/function-resource.yaml": no matches for apiextensions.k8s.io/, Kind=CustomResourceDefinition
The latest Spotify plugin simplifies the configuration and avoids using assembly.xml
Assigned: @dturanski
Currently we have two class loaders, one for the app (the one the JVM was launched on), and one for the functions. The function classloader has a null parent, so it is nicely isolated, and you can use whatever types you like inside it (they get marshalled and unmarshalled into JSON by the main app using their own classloader). But you can't pass types from functions over to the main app, in particular Flux. We want to support users writing Function<Flux<...>, Flux<...>>.
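The isolation that a null parent gives can be seen in a small standalone sketch. IsolationDemo is a hypothetical name and this is not the invoker's actual code. A loader created this way still resolves JDK bootstrap classes, but nothing from the application classpath, which is why a type such as Flux loaded on one side is not the same class on the other side.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Sketch of the isolation described above: a class loader with a null parent
// sees JDK bootstrap classes but not classes on the application classpath.
final class IsolationDemo {

    // Returns true if the given loader can load the named class.
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Creates an isolated loader with no URLs and no parent.
    static ClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }

    public static void main(String[] args) {
        ClassLoader isolated = isolatedLoader();
        // A bootstrap class is visible to the isolated loader.
        System.out.println(canLoad(isolated, "java.lang.String"));
        // An application class is not visible to the isolated loader.
        System.out.println(canLoad(isolated, IsolationDemo.class.getName()));
        // The same class is visible to the loader that defined it.
        System.out.println(canLoad(IsolationDemo.class.getClassLoader(), IsolationDemo.class.getName()));
    }
}
```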
There are various options here, some of which might have compromises (e.g. to do with startup time, and flexibility of the programming model for users).
Option 3. is a terrible idea because it means that user code cannot use any of the classes in the app (all of Spring Boot for instance). Some users might not need that, but we restrict their options a lot.
Option 4. is probably also a terrible idea because user functions could easily break the invoker app, or vice versa, by providing features or versions of features that are incompatible. It's also really hard (maybe impossible) to support runtime function registration, which we will need for a warm start. It's really simple though.
Both of the other approaches will work, and there is no performance penalty.
Option 1. is quick and dirty and relies on a -X option in the JVM. It is hard to test (e.g. you won't be able to test it from the IDE without setting up a custom launcher that creates the right classpath).
Option 2. is more code and it's harder to debug, but seems like less of a hack, possibly, except that when you get down to it you find that you have to do things like reflective hacks in java.net.URL, for instance.
In both approaches the user has to be coding against the same version of reactor (or at least one which is compatible with the version that the function invoker uses and installs in the class loader hierarchy).
The other feature of the current code that we might want to change is that it has a very dumb instantiation strategy. Each function class is loaded and then instantiated reflectively (default constructor). No dependency injection, no Spring Boot goodness. To change that we need another layer of code in both approaches, e.g. to create a Spring Boot application and extract the functions from it. That might also be by name, instead of by class. The function.uri has to change its format to accommodate the additional options. Code in the invoker gets more complicated and has more branches.
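For reference, the "load the class, call the default constructor" strategy described above amounts to something like the following sketch. FunctionLoader and its nested Upper function are hypothetical names used only for illustration. The invoker's real code may differ.

```java
import java.util.function.Function;

// Sketch of reflective instantiation via the default constructor.
// FunctionLoader is a hypothetical name.
final class FunctionLoader {

    // Example function with an implicit default constructor.
    public static final class Upper implements Function<String, String> {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    // Loads the named class with the given loader and instantiates it via
    // its no-argument constructor; no dependency injection takes place.
    @SuppressWarnings("unchecked")
    static Function<Object, Object> instantiate(String className, ClassLoader loader) {
        try {
            Class<?> type = Class.forName(className, true, loader);
            Object instance = type.getDeclaredConstructor().newInstance();
            if (!(instance instanceof Function)) {
                throw new IllegalArgumentException(className + " is not a Function");
            }
            return (Function<Object, Object>) instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```

Anything beyond a no-argument constructor, such as extracting a function bean from a Spring Boot application by name, would need an additional branch next to this one.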
I suggest we open another issue to track changes required to hook up an infinite stream (as opposed to an HTTP POST). Leave this one to be about just the class loading and possibly instantiation strategies.
We need a JUnit Kafka Rule to avoid failing tests when Kafka is not available
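The availability check such a rule needs could be as simple as a TCP connection attempt. The sketch below leaves out JUnit itself to stay self-contained, and BrokerAvailability is a hypothetical name. A JUnit 4 rule built on it could call Assume.assumeTrue with the result so that tests are reported as skipped rather than failed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether something is listening on host:port.
final class BrokerAvailability {

    // True if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 9092 is Kafka's default port; host and port here are assumptions.
        System.out.println(isReachable("localhost", 9092, 500));
    }
}
```

A successful connection only shows that a listener exists on the port, not that the broker is healthy, so this is a coarse guard.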
e.g. its ENTRYPOINT would be something like:
['while read X; do $SCRIPT $X; done</pipes/input >/pipes/output']
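The shell loop above reads one line at a time from /pipes/input, runs the script on it, and writes the result to /pipes/output. The same contract could be sketched in Java as follows. PipeLoop is a hypothetical name, and the streams are passed in so the loop can be exercised without real pipes.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.function.Function;

// Hypothetical Java counterpart of the "while read" loop: one input line in,
// one output line out.
final class PipeLoop {

    // Reads lines until end of input, applies the function to each line, and
    // writes one result line per input line, flushing after each.
    static void run(Reader input, Writer output, Function<String, String> function) {
        try {
            BufferedReader reader = new BufferedReader(input);
            PrintWriter writer = new PrintWriter(output);
            String line;
            while ((line = reader.readLine()) != null) {
                writer.println(function.apply(line));
                writer.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a container this could be wired to a FileReader on /pipes/input and a FileWriter on /pipes/output.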
In the samples, function invokers are inconsistent in how FUNCTION_URI is used. They should all use similar conventions and patterns.
I managed to get the following NPE several times when using ./faas delete -n echo:
[function-controller-6b98fcd676-htzt8] 2017-11-15 14:26:10.384 WARN 1 --- [default.svc/...] i.f.k.c.d.i.WatchConnectionManager : Exec Failure
[function-controller-6b98fcd676-htzt8]
[function-controller-6b98fcd676-htzt8] java.lang.NullPointerException: null
[function-controller-6b98fcd676-htzt8] at io.fabric8.kubernetes.client.dsl.internal.DeploymentOperationsImpl$DeploymentReaper.reap(DeploymentOperationsImpl.java:178) ~[kubernetes-client-2.5.9.jar!/:na]
[function-controller-6b98fcd676-htzt8] at io.fabric8.kubernetes.client.dsl.base.BaseOperation.delete(BaseOperation.java:609) ~[kubernetes-client-2.5.9.jar!/:na]
[function-controller-6b98fcd676-htzt8] at io.fabric8.kubernetes.client.dsl.base.BaseOperation.delete(BaseOperation.java:69) ~[kubernetes-client-2.5.9.jar!/:na]
[function-controller-6b98fcd676-htzt8] at io.sk8s.function.controller.FunctionDeployer.undeploy(FunctionDeployer.java:117) ~[classes!/:0.0.1-SNAPSHOT]
[function-controller-6b98fcd676-htzt8] at io.sk8s.function.controller.FunctionMonitor.onFunctionUnregistered(FunctionMonitor.java:101) ~[classes!/:0.0.1-SNAPSHOT]
[function-controller-6b98fcd676-htzt8] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:256) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:177) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:140) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:393) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:360) ~[spring-context-4.3.12.RELEASE.jar!/:4.3.12.RELEASE]
[function-controller-6b98fcd676-htzt8] at io.sk8s.core.resource.ResourceEventPublisher$ApplicationEventPublishingWatcher.eventReceived(ResourceEventPublisher.java:92) ~[sk8s-core-0.0.1-SNAPSHOT.jar!/:0.0.1-SNAPSHOT]
[function-controller-6b98fcd676-htzt8] at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$2.onMessage(WatchConnectionManager.java:230) ~[kubernetes-client-2.5.9.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:307) ~[okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:222) ~[okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101) ~[okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262) ~[okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201) ~[okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141) [okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.8.1.jar!/:na]
[function-controller-6b98fcd676-htzt8] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111-internal]
[function-controller-6b98fcd676-htzt8] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111-internal]
When a topic already exists (and this can happen because a publisher auto-creates topics) and the definition of a topic requests more partitions than currently exist (typically 1), then new partitions should be added.
This is possible using the SCS Binder but for some reason does not happen. Need to investigate.
should include dynamic loading of JAR(s) containing a Function and optional dependencies, as well as a version that compiles lambda strings dynamically
we should also consider autonomous container images, e.g. with the JAR as the top-most layer (above one of the Spring Cloud Function adapter types: REST, Task, Stream, etc). That could either be prebuilt from a CI pipeline or built on-demand at the time of function registration. We probably want to support a range of options, with documented tradeoffs
Should be able to work with more than Function<String, String>
currently we only expose "name" and "value" for the env section of a function yaml, but we should allow "valueFrom" (including support for secrets) such as:
env:
- name: DB_PASSWORD
  valueFrom:
    secretKeyRef:
      name: db
      key: db-password
Assigned: @ericbottard
The controller itself should run in the cluster as a deployment so it will be managed (and so it won't require the use of kubectl proxy as it currently does when running locally).
this will likely require changing the "topic" field of the current Function resource (e.g. to "input")
a trigger instance would emit messages to a topic
could be scheduled, subscribed to k8s events, etc
this should be a user extension point, but we could provide some implementations also