knative-kafka's Introduction

Knative + Kafka

Prerequisites

This walkthrough assumes you have used try.openshift.com to create an OCP 4.2 cluster. Here is a quick video that illustrates that process.

CLI tools used:

  • kubectl

  • oc

  • jq

  • kafkacat

  • siege

  • watch
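
Before starting, it can help to confirm that every tool in the list above is on your PATH. The following is a minimal sketch; the helper name check_tools is hypothetical and not part of the original walkthrough.

```shell
# check_tools: print each tool that is missing from PATH and
# return non-zero if at least one is missing. (Hypothetical helper.)
check_tools() {
  missing=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool"
      missing=1
    fi
  done
  return "$missing"
}

# Example call (not executed here):
#   check_tools kubectl oc jq kafkacat siege watch
```

The tools stern and kail, which appear later as optional log viewers, can be added to the same call.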

Installation

In the OCP 4.x Administration Console, find OperatorHub and install the following operators:

  1. Knative Serving

  2. Knative Eventing

  3. Knative Kafka

  4. Strimzi

OperatorHub inside of OpenShift Console
Installed Operators

You can check on your installed operators and their versions:

kubectl get csv
NAME                               DISPLAY                         VERSION   REPLACES                           PHASE
knative-eventing-operator.v0.7.1   Knative Eventing Operator       0.7.1     knative-eventing-operator.v0.6.0   Succeeded
knative-kafka-operator.v0.7.1      Knative Apache Kafka Operator   0.7.1     knative-kafka-operator.v0.6.0      Succeeded
knative-serving-operator.v0.7.1    Knative Serving Operator        0.7.1     knative-serving-operator.v0.6.0    Succeeded
strimzi-cluster-operator.v0.13.0   Strimzi Apache Kafka Operator   0.13.0    strimzi-cluster-operator.v0.12.2   Succeeded
Note: I have also used the following versions. OpenShift Serverless pulls in the Elasticsearch, Jaeger, and Kiali operators.
kubectl get csv
NAME                                        DISPLAY                          VERSION              REPLACES                            PHASE
elasticsearch-operator.4.3.1-202002032140   Elasticsearch Operator           4.3.1-202002032140                                       Succeeded
jaeger-operator.v1.13.1                     Jaeger Operator                  1.13.1                                                   Succeeded
kiali-operator.v1.0.9                       Kiali Operator                   1.0.9                kiali-operator.v1.0.8               Succeeded
knative-eventing-operator.v0.12.0           Knative Eventing Operator        0.12.0               knative-eventing-operator.v0.11.0   Succeeded
knative-kafka-operator.v0.12.1              Knative Apache Kafka Operator    0.12.1               knative-kafka-operator.v0.11.2      Succeeded
serverless-operator.v1.4.1                  OpenShift Serverless Operator    1.4.1                serverless-operator.v1.4.0          Succeeded
servicemeshoperator.v1.0.7                  Red Hat OpenShift Service Mesh   1.0.7                servicemeshoperator.v1.0.6          Succeeded
strimzi-cluster-operator.v0.15.0            Strimzi                          0.15.0               strimzi-cluster-operator.v0.14.0    Succeeded
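
Rather than reading the PHASE column by eye, the same check can be scripted. This is a sketch only; the function name failed_csvs is hypothetical, and it assumes the current namespace is one where the ClusterServiceVersions are visible.

```shell
# failed_csvs: list ClusterServiceVersions whose phase is not "Succeeded".
# Prints nothing when every operator installed cleanly.
failed_csvs() {
  kubectl get csv \
    -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.status.phase}{"\n"}{end}' |
    awk -F '\t' '$1 != "" && $2 != "Succeeded" { print $1 " is " $2 }'
}

# Example call (not executed here):
#   failed_csvs
```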

Namespace/Project Setup

kubectl create namespace demo

# make it "sticky"
kubectl config set-context --current --namespace=demo

# check that it is set (shows the namespace stored in the current context)
kubectl config view --minify | grep namespace:

# or use "oc" to see what the "sticky" namespace is
oc project

Create kafka cluster

cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    listeners:
      plain: {}
      tls: {}
      external:
        type: loadbalancer
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: true
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}
EOF
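
The brokers and ZooKeeper nodes take a few minutes to start. One way to block until the cluster is usable is sketched below. It assumes the installed Strimzi version reports a Ready condition on the Kafka resource; the function name wait_for_kafka and the 300s default are illustrative choices, not part of the original walkthrough.

```shell
# wait_for_kafka: block until the Kafka custom resource reports Ready,
# or fail when the timeout expires.
# Assumption: the Strimzi operator sets a "Ready" status condition.
wait_for_kafka() {
  cluster="${1:-my-cluster}"
  timeout="${2:-300s}"
  kubectl wait "kafka/$cluster" --for=condition=Ready --timeout="$timeout"
}

# Example call (not executed here):
#   wait_for_kafka my-cluster 600s
```

If the resource does not expose that condition, watching the pods with "kubectl get pods -l strimzi.io/cluster=my-cluster" gives the same information by hand.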

Configure the Knative Eventing Kafka

Note: this only needs to be done once

cat <<EOF | kubectl apply -n knative-eventing -f -
apiVersion: eventing.knative.dev/v1alpha1
kind: KnativeEventingKafka
metadata:
  name: knative-eventing-kafka
  namespace: knative-eventing
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap.demo:9092'
  setAsDefaultChannelProvisioner: false
EOF

Verify the KnativeEventingKafka took effect

kubectl get crds | grep kafkasource
kafkasources.sources.eventing.knative.dev                   2019-09-21T14:23:14Z

and

kubectl get pods -n knative-eventing

NAME                                            READY   STATUS              RESTARTS   AGE
eventing-controller-758d785bf7-wzq7v            1/1     Running             0          18m
eventing-webhook-7ff46cd45f-5tz9z               1/1     Running             0          18m
imc-controller-75d7f598df-b48bc                 1/1     Running             0          17m
imc-dispatcher-77f565585c-xb5c8                 1/1     Running             0          17m
in-memory-channel-controller-6b4967d97b-mlrdk   1/1     Running             0          18m
in-memory-channel-dispatcher-8bbcd4f9-t2gbj     1/1     Running             0          17m
kafka-ch-controller-5f55f4c58-9dm5j             0/1     ContainerCreating   0          11s
kafka-ch-dispatcher-5655cc4c9f-xbhv7            0/1     ContainerCreating   0          10s
kafka-channel-controller-578d46d7bd-fz6nf       0/1     ContainerCreating   0          19s
kafka-channel-dispatcher-b49d4bc54-dhbp9        0/1     ContainerCreating   0          17s
kafka-controller-manager-0                      0/1     ContainerCreating   0          20s
kafka-webhook-7c96f59b7f-9hzd5                  0/1     ContainerCreating   0          10s
sources-controller-788874d5fc-vl5mb             1/1     Running             0          18m
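
In the listing above, the Kafka pods are still in ContainerCreating. A small polling loop can wait until every pod reports all of its containers ready. This is a sketch; the function names are hypothetical, and it would loop forever if the namespace contains Completed job pods, which always show 0/1.

```shell
# not_ready_pods: read `kubectl get pods --no-headers` output on stdin and
# print the name of every pod whose READY column is not n/n.
not_ready_pods() {
  awk '{ split($2, r, "/"); if (r[1] != r[2]) print $1 }'
}

# wait_for_pods: poll a namespace every 5 seconds until no pod is not ready.
wait_for_pods() {
  ns="$1"
  while pending="$(kubectl get pods -n "$ns" --no-headers | not_ready_pods)" &&
        [ -n "$pending" ]; do
    echo "waiting for: $(echo "$pending" | tr '\n' ' ')"
    sleep 5
  done
}

# Example call (not executed here):
#   wait_for_pods knative-eventing
```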

Create kafka topic

cat <<EOF | kubectl apply -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 100
  replicas: 1
EOF

Test to see if the topic was created correctly

# open a shell in the first broker pod
oc exec -it -n demo -c kafka my-cluster-kafka-0 -- /bin/bash

# then, inside the pod
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic

Or, as a single command:

kubectl exec -it -c kafka my-cluster-kafka-0 -- bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:my-topic	PartitionCount:100	ReplicationFactor:1	Configs:message.format.version=2.3-IV1
	Topic: my-topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: my-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 3	Leader: 2	Replicas: 2	Isr: 2
	Topic: my-topic	Partition: 4	Leader: 0	Replicas: 0	Isr: 0
	Topic: my-topic	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 6	Leader: 2	Replicas: 2	Isr: 2
.
.
.
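
With 100 partitions the describe output is long. To see how the partition leaders are spread across the three brokers, the output can be summarized with awk. This is a sketch; the function name partitions_per_leader is hypothetical, and it relies only on the "Leader: N" tokens shown in the output above.

```shell
# partitions_per_leader: read `kafka-topics.sh --describe` output on stdin
# and print how many partitions each broker currently leads.
partitions_per_leader() {
  awk '{ for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++ }
       END { for (b in count) print "broker " b ": " count[b] }' | sort
}

# Example call (not executed here; -it is dropped because output is piped):
#   kubectl exec -c kafka my-cluster-kafka-0 -- \
#     bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic |
#     partitions_per_leader
```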

Test connectivity to the Kafka topic my-topic

# AWS (the load balancer is exposed by hostname)
export BOOTSTRAP_IP=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.status.loadBalancer.ingress[].hostname')

# Azure (the load balancer is exposed by IP address)
export BOOTSTRAP_IP=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.status.loadBalancer.ingress[].ip')

export BOOTSTRAP_PORT=$(kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.spec.ports[].port')

export BOOTSTRAP_URL=$BOOTSTRAP_IP:$BOOTSTRAP_PORT
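
The AWS and Azure variants differ only in whether the load balancer reports a hostname or an IP address. A single helper can try the hostname first and fall back to the IP. This is a sketch using kubectl's built-in jsonpath output instead of jq; the function name bootstrap_url is hypothetical, and it assumes the first ingress entry and the first service port are the ones you want.

```shell
# bootstrap_url: print host:port for the external bootstrap service,
# using the load balancer hostname when present and the IP otherwise.
bootstrap_url() {
  svc="${1:-my-cluster-kafka-external-bootstrap}"
  host="$(kubectl get service "$svc" \
    -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')"
  if [ -z "$host" ]; then
    host="$(kubectl get service "$svc" \
      -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
  fi
  port="$(kubectl get service "$svc" -o jsonpath='{.spec.ports[0].port}')"
  if [ -z "$host" ] || [ -z "$port" ]; then
    echo "external bootstrap address not assigned yet" >&2
    return 1
  fi
  echo "$host:$port"
}

# Example call (not executed here):
#   export BOOTSTRAP_URL="$(bootstrap_url)"
```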

Then use kafkacat to produce/consume messages

kafkacat -P -b $BOOTSTRAP_URL -t my-topic
one
two
three

ctrl-d to end (this closes the input; ctrl-z would only suspend kafkacat)

kafkacat -C -b $BOOTSTRAP_URL -t my-topic
one
% Reached end of topic my-topic [35] at offset 1
two
% Reached end of topic my-topic [81] at offset 1
three
% Reached end of topic my-topic [32] at offset 1

ctrl-c to end
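
The same round trip can be done without typing into an interactive session, which is handy in scripts. This is a sketch; the function names are hypothetical. It uses kafkacat's -o beginning (start at the first offset), -e (exit at the end of the topic) and -q (suppress informational lines) options.

```shell
# produce_messages: send each remaining argument as one message to a topic.
produce_messages() {
  broker="$1"; topic="$2"; shift 2
  printf '%s\n' "$@" | kafkacat -P -b "$broker" -t "$topic"
}

# consume_all: read the topic from the beginning and exit at the end
# instead of waiting for new messages.
consume_all() {
  kafkacat -C -b "$1" -t "$2" -o beginning -e -q
}

# Example calls (not executed here):
#   produce_messages "$BOOTSTRAP_URL" my-topic one two three
#   consume_all "$BOOTSTRAP_URL" my-topic
```

Because my-topic has 100 partitions, messages are not guaranteed to come back in the order they were sent.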

Deploy a Knative Service

This is your "sink" that receives events

cat <<EOF | kubectl apply -f -
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: myknativesink
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "1"
    spec:
      containers:
      - image: docker.io/burrsutter/myknativesink:1.0.1
        resources:
          requests:
            memory: "50Mi"
            cpu: "100m"
          limits:
            memory: "50Mi"
            cpu: "100m"
        livenessProbe:
          httpGet:
            path: /healthz
        readinessProbe:
          httpGet:
            path: /healthz
EOF
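
After applying the manifest, you can confirm that the Knative Service became ready before wiring a source to it. This is a sketch; the function names are hypothetical, and it assumes the installed Knative Serving version populates the Ready condition and status.url on the Service.

```shell
# ksvc_ready: succeed only when the Knative Service reports Ready=True.
ksvc_ready() {
  status="$(kubectl get ksvc "$1" \
    -o jsonpath='{.status.conditions[?(@.type=="Ready")].status}')"
  [ "$status" = "True" ]
}

# ksvc_url: print the URL that Knative assigned to the service.
# Assumption: status.url is populated by this Knative version.
ksvc_url() {
  kubectl get ksvc "$1" -o jsonpath='{.status.url}'
}

# Example call (not executed here):
#   ksvc_ready myknativesink && ksvc_url myknativesink
```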

If your pod is stuck in Pending, check your events

kubectl get events --sort-by=.metadata.creationTimestamp

You likely need to add another worker node (OpenShift Console > Compute > MachineSets)

Machinesets
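
The same steps can be done from the command line instead of the console. This is a sketch; the function names are hypothetical, and the MachineSet name and replica count are values you would read from your own cluster.

```shell
# pending_pods: list pods in the current namespace stuck in Pending.
pending_pods() {
  kubectl get pods --field-selector=status.phase=Pending --no-headers
}

# list_machinesets: show the MachineSets and their current replica counts.
list_machinesets() {
  oc get machinesets -n openshift-machine-api
}

# scale_machineset: set the replica count of one MachineSet.
scale_machineset() {
  oc scale machineset "$1" --replicas="$2" -n openshift-machine-api
}

# Example calls (not executed here; <machineset-name> is a placeholder):
#   pending_pods
#   list_machinesets
#   scale_machineset <machineset-name> 2
```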

Create the KafkaSource that connects my-topic to the Knative Service (ksvc)

cat <<EOF | kubectl apply -f -
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: 52.185.212.83:9094 # (1)
  topics: my-topic
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: myknativesink
EOF
(1) The value in "bootstrapServers: 52.185.212.83:9094" is the external bootstrap address and port of your own cluster. It comes from

# AWS
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.status.loadBalancer.ingress[0].hostname'
# OR
# Azure
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.status.loadBalancer.ingress[0].ip'

# and
kubectl get services my-cluster-kafka-external-bootstrap -ojson | jq -r '.spec.ports[].port'
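
To avoid pasting the address into the manifest by hand, the KafkaSource can be generated from a variable. This is a sketch; the function name kafkasource_manifest is hypothetical, and the manifest fields are copied from the KafkaSource definition above.

```shell
# kafkasource_manifest: print the KafkaSource manifest with the given
# host:port substituted into bootstrapServers.
kafkasource_manifest() {
  if [ -z "${1:-}" ]; then
    echo "usage: kafkasource_manifest <host:port>" >&2
    return 1
  fi
  cat <<EOF
apiVersion: sources.eventing.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: mykafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers: $1
  topics: my-topic
  sink:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: myknativesink
EOF
}

# Example call (not executed here):
#   kafkasource_manifest "$BOOTSTRAP_URL" | kubectl apply -f -
```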

You can monitor the logs of mykafka-source to see if it has connectivity issues

stern mykafka-source

Test

Now push some messages in. They must be in JSON format.

kafkacat -P -b $BOOTSTRAP_URL -t my-topic
{"hello":"world"}

and you should see some logging output

kubectl logs -l serving.knative.dev/configuration=myknativesink -c user-container
# or
kail -l serving.knative.dev/configuration=myknativesink -c user-container
# or
stern myknativesink
myknativesink-h6l7x-deployment-54d58c84c5-q9sm5 user-container EVENT: {"hello":"world"}
Waiting
Sink pod is up
one more message

Scaling beyond 1 Pod

Kafka-Producer is a simple little application that drives in 1, 10 or 100 messages as fast as it can.

Deploy kafka-producer

cd kafka-producer
# update the Deployment.yml with the correct IP/Port $BOOTSTRAP_URL
kubectl apply -f Deployment.yml
kubectl apply -f Service.yml
oc expose service kafka-producer

Then drive some load

PRODUCER_URL="$(kubectl get route kafka-producer -ojson | jq -r '.status.ingress[].host')"
curl "$PRODUCER_URL/1"
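
To keep the sink scaled out for longer than a single burst, the request can be repeated in a loop. This is a sketch; the function name drive_load is hypothetical, and the /10 and /100 paths are assumed from the description of kafka-producer above (only /1 is shown in the original).

```shell
# drive_load: call the producer endpoint a number of times.
#   $1 = producer URL, $2 = batch size path (1, 10 or 100), $3 = rounds
drive_load() {
  url="$1"; size="$2"; rounds="${3:-1}"
  i=0
  while [ "$i" -lt "$rounds" ]; do
    curl -s "$url/$size" >/dev/null || return 1
    i=$((i + 1))
  done
  echo "sent $rounds request(s) to $url/$size"
}

# Example call (not executed here):
#   drive_load "$PRODUCER_URL" 100 5
```

In a second terminal, "watch kubectl get pods" shows the myknativesink pods scaling up and back down.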

Watch the Developer Topology view

Developer View
Terminal View

Clean up

kubectl delete route kafka-producer
kubectl delete service kafka-producer
kubectl delete deployment kafka-producer
kubectl delete kafkasource mykafka-source
kubectl delete ksvc myknativesink
kubectl delete KafkaTopic my-topic
kubectl delete kafka my-cluster
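
If some of these resources were never created, the plain delete commands stop with an error. A rerunnable variant is sketched below. The function name cleanup_demo is hypothetical; --ignore-not-found makes kubectl skip resources that are already gone.

```shell
# cleanup_demo: delete the demo resources in the same order as above,
# skipping any that no longer exist.
cleanup_demo() {
  while read -r kind name; do
    # </dev/null keeps kubectl from consuming the list on stdin
    kubectl delete "$kind" "$name" --ignore-not-found </dev/null
  done <<EOF
route kafka-producer
service kafka-producer
deployment kafka-producer
kafkasource mykafka-source
ksvc myknativesink
kafkatopic my-topic
kafka my-cluster
EOF
}

# Example call (not executed here):
#   cleanup_demo
```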

Contributors

burrsutter
