This guide assumes you have used try.openshift.com to create an OCP 4.2 cluster. Here is a quick video that illustrates that process.
CLI tools used:

- kubectl
- oc
- jq
- kafkacat
- siege
- watch
Using the OCP 4.x Administration Console, find OperatorHub and install:

- Knative Serving
- Knative Eventing
- Knative Kafka
- Strimzi
You can check on your installed operators and their versions:
kubectl get csv

NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
knative-eventing-operator.v0.7.1   Knative Eventing Operator       0.7.1     knative-eventing-operator.v0.6.0   Succeeded
knative-kafka-operator.v0.7.1      Knative Apache Kafka Operator   0.7.1     knative-kafka-operator.v0.6.0      Succeeded
knative-serving-operator.v0.7.1    Knative Serving Operator        0.7.1     knative-serving-operator.v0.6.0    Succeeded
strimzi-cluster-operator.v0.13.0   Strimzi Apache Kafka Operator   0.13.0    strimzi-cluster-operator.v0.12.2   Succeeded
Note: I have also used the following versions - OpenShift Serverless pulls in ElasticSearch, Jaeger, and Kiali.

kubectl get csv

NAME                                        DISPLAY                          VERSION              REPLACES                            PHASE
elasticsearch-operator.4.3.1-202002032140   Elasticsearch Operator           4.3.1-202002032140                                       Succeeded
jaeger-operator.v1.13.1                     Jaeger Operator                  1.13.1                                                   Succeeded
kiali-operator.v1.0.9                       Kiali Operator                   1.0.9                kiali-operator.v1.0.8               Succeeded
knative-eventing-operator.v0.12.0           Knative Eventing Operator        0.12.0               knative-eventing-operator.v0.11.0   Succeeded
knative-kafka-operator.v0.12.1              Knative Apache Kafka Operator    0.12.1               knative-kafka-operator.v0.11.2      Succeeded
serverless-operator.v1.4.1                  OpenShift Serverless Operator    1.4.1                serverless-operator.v1.4.0          Succeeded
servicemeshoperator.v1.0.7                  Red Hat OpenShift Service Mesh   1.0.7                servicemeshoperator.v1.0.6          Succeeded
strimzi-cluster-operator.v0.15.0            Strimzi                          0.15.0               strimzi-cluster-operator.v0.14.0    Succeeded
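Operator installation can take a few minutes. A simple way to poll until every CSV reports Succeeded is to wrap the same command in watch (from the tool list above):

watch kubectl get csv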
kubectl create namespace demo
# make it "sticky"
kubectl config set-context --current --namespace=demo
# check that it is set
kubectl config current-context
# or use "oc" to see what the "sticky" namespace is
oc project
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
      external:
        type: loadbalancer
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: true
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
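The brokers take a few minutes to start. A convenient way to block until the cluster is up, assuming the Ready condition that Strimzi sets on the Kafka resource:

kubectl wait kafka/my-cluster --for=condition=Ready --timeout=300s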
Note: this only needs to be done one time
cat <<EOF | kubectl apply -n knative-eventing -f -
apiVersion: eventing.knative.dev/v1alpha1
kind: KnativeEventingKafka
metadata:
  name: knative-eventing-kafka
  namespace: knative-eventing
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap.demo:9092'
  setAsDefaultChannelProvisioner: false
EOF
Verify that the KnativeEventingKafka took effect
kubectl get crds | grep kafkasource
kafkasources.sources.eventing.knative.dev 2019-09-21T14:23:14Z
and
kubectl get pods -n knative-eventing
NAME READY STATUS RESTARTS AGE
eventing-controller-758d785bf7-wzq7v 1/1 Running 0 18m
eventing-webhook-7ff46cd45f-5tz9z 1/1 Running 0 18m
imc-controller-75d7f598df-b48bc 1/1 Running 0 17m
imc-dispatcher-77f565585c-xb5c8 1/1 Running 0 17m
in-memory-channel-controller-6b4967d97b-mlrdk 1/1 Running 0 18m
in-memory-channel-dispatcher-8bbcd4f9-t2gbj 1/1 Running 0 17m
kafka-ch-controller-5f55f4c58-9dm5j 0/1 ContainerCreating 0 11s
kafka-ch-dispatcher-5655cc4c9f-xbhv7 0/1 ContainerCreating 0 10s
kafka-channel-controller-578d46d7bd-fz6nf 0/1 ContainerCreating 0 19s
kafka-channel-dispatcher-b49d4bc54-dhbp9 0/1 ContainerCreating 0 17s
kafka-controller-manager-0 0/1 ContainerCreating 0 20s
kafka-webhook-7c96f59b7f-9hzd5 0/1 ContainerCreating 0 10s
sources-controller-788874d5fc-vl5mb 1/1 Running 0 18m
cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 100
  replicas: 1
EOF
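Before exec'ing into a broker pod, a quicker sanity check is to list the custom resource itself:

kubectl get kafkatopic my-topic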
Test to see if the topic was created correctly
oc exec -it -n demo -c kafka my-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
OR
kubectl exec -it -c kafka my-cluster-kafka-0 -- bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:my-topic PartitionCount:100 ReplicationFactor:1 Configs:message.format.version=2.3-IV1
Topic: my-topic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: my-topic Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: my-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 6 Leader: 2 Replicas: 2 Isr: 2
.
.
.
# AWS
export BOOTSTRAP_IP=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .status.loadBalancer.ingress[].hostname)
# Azure
export BOOTSTRAP_IP=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .status.loadBalancer.ingress[].ip)
export BOOTSTRAP_PORT=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .spec.ports[].port)
export BOOTSTRAP_URL=$BOOTSTRAP_IP:$BOOTSTRAP_PORT
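Sanity-check the result before moving on:

echo $BOOTSTRAP_URL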
Then use kafkacat to produce and consume messages
kafkacat -P -b $BOOTSTRAP_URL -t my-topic
one
two
three
ctrl-d to end
kafkacat -C -b $BOOTSTRAP_URL -t my-topic
one
% Reached end of topic my-topic [35] at offset 1
two
% Reached end of topic my-topic [81] at offset 1
three
% Reached end of topic my-topic [32] at offset 1
ctrl-c to end
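kafkacat can also dump broker and topic metadata, which is handy for confirming the external listener is wired up correctly:

kafkacat -L -b $BOOTSTRAP_URL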
This is your "sink" that receives events
cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: docker.io/burrsutter/myknativesink:1.0.1
        resources:
          requests:
            memory: "50Mi"
            cpu: "100m"
          limits:
            memory: "50Mi"
            cpu: "100m"
        livenessProbe:
          httpGet:
            path: /healthz
        readinessProbe:
          httpGet:
            path: /healthz
EOF
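The autoscaling.knative.dev/target: "1" annotation asks the Knative autoscaler to aim for roughly one in-flight request per pod, which makes the scale-out easy to observe later. Confirm the service is ready and note its URL:

kubectl get ksvc myknativesink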
If your pod is stuck in Pending, check your events
kubectl get events --sort-by=.metadata.creationTimestamp
You likely need to add another worker node (OpenShift Console - Compute - MachineSets)
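The console route is easiest, but the CLI equivalent is scaling the relevant MachineSet; the name below is a placeholder, as MachineSet names are cluster-specific:

oc get machinesets -n openshift-machine-api
oc scale machineset <your-machineset> -n openshift-machine-api --replicas=3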
cat <<EOF | kubectl apply -f -
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: 52.185.212.83:9094 # (1)
  topics: my-topic
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: myknativesink
EOF
(1) "bootstrapServers: 52.185.212.83:9094" comes from:

# AWS
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .status.loadBalancer.ingress[0].hostname
# OR
# Azure
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .status.loadBalancer.ingress[0].ip
# and
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r .spec.ports[].port
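Once applied, confirm the source is ready:

kubectl get kafkasource mykafka-source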
You can monitor the logs of the mykafka-source pod to see if it has connectivity issues
stern mykafka-source
Now push in some messages; they must be JSON formatted
echo '{"hello":"world"}' | kafkacat -P -b $BOOTSTRAP_URL -t my-topic
and you should see some logging output
kubectl logs -l serving.knative.dev/configuration=myknativesink -c user-container
# or
kail -l serving.knative.dev/configuration=myknativesink -c user-container
# or
stern myknativesink
myknativesink-h6l7x-deployment-54d58c84c5-q9sm5 user-container EVENT: {"hello":"world"}
kafka-producer is a simple application that pushes 1, 10, or 100 messages into the topic as fast as it can.
Deploy kafka-producer
cd kafka-producer
# update Deployment.yml with the correct IP/Port from $BOOTSTRAP_URL
kubectl apply -f Deployment.yml
kubectl apply -f Service.yml
oc expose service kafka-producer
Then drive some load
PRODUCER_URL="$(kubectl get route kafka-producer -ojson | jq -r .status.ingress[].host)"
curl $PRODUCER_URL/1
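To push harder than a single curl, siege (from the tool list above) can drive concurrent load; the repetition and concurrency values here are just an example:

siege -r 10 -c 10 "http://$PRODUCER_URL/100"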
Watch the Developer Topology view
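or watch the sink scale out from the CLI:

watch kubectl get pods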