This repository contains a sample streaming function that can be used with the riff streaming POC. It is simple enough to be usable as both a request/reply sample and a stateful streaming sample.
These samples assume you have the latest patched riff system,
riff CLI reset branch,
liiklus running and exposed as a k8s service,
as well as a configured kafka provider
and have modified the function builder
to use ericbottard/builder:streaming-grpc
.
riff function create time-averager \
--git-repo https://github.com/projectriff-samples/time-averager.git \
--handler com.acme.TimeAverager \
--image $KO_DOCKER_REPO/time-averager
Create a handler for the function:
riff handler create time-averager --function-ref time-averager
Invoke it a couple of times. The average of a single number is the number itself:
riff handler invoke time-averager --text -- -d 48 -H 'Accept: application/json' -w '\n'
48.0
riff handler invoke time-averager --text -- -d 50 -H 'Accept: application/json' -w '\n'
50.0
Create two streams
riff stream create --provider kafka-provider numbers
riff stream create --provider kafka-provider avgs
Create a streaming processor for the function:
riff processor create time-averager --function-ref time-averager --input numbers --output avgs
Using the riff liiklus-client, in two separate terminals:
Setup a consumer:
java -jar target/liiklus-client-1.0.0-SNAPSHOT.jar --consumer my.liiklus.host:6565 avgs
Setup a producer:
java -jar target/liiklus-client-1.0.0-SNAPSHOT.jar --producer my.liiklus.host:6565 numbers
Then, in the producer window, and in rather quick succession, enter numbers and hit [ENTER].