krallistic / kafka-operator
A Kafka Operator for Kubernetes
License: Apache License 2.0
Hi there,
Is this project still active and maintained?
What about the Confluent Operator that was promised and never delivered? Do you have a link to it?
Remove all print statements and use structured logging instead.
Basically I run the operator, add the cluster CRD (with confluentinc/cp-kafka 3.3.0), and then the operator (0.2.0) creates the StatefulSet ... and crashes right after.
time="2017-10-26T15:22:08Z" level=info msg="New CRD added, creating cluster" KafkaClusterEventType=1 clusterName=kafka event-type=1 method=processEvent
Error get sts
STS dosnt exist, creating
{{1073741824 0} {<nil>} 1Gi BinarySI}
time="2017-10-26T15:22:08Z" level=debug msg="Generated KafkaOptions from Struct to Env Vars" method=GenerateKafkaOptions options="[{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil}]" package=util
[{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil} {NAMESPACE &EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KAFKA_ZOOKEEPER_CONNECT zookeeper.development.svc.cluster.local nil} {KAFKA_HEAP_OPTS -Xmx644M nil} {KAFKA_METRIC_REPORTERS com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter nil} {KAFKA_CRUISE_CONTROL_METRICS_REPORTER_BOOTSTRAP_SERVER kafka-0.kafka.development.svc.cluster.local:9092 nil}]
&StatefulSet{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:kafka,GenerateName:,Namespace:,SelfLink:,UID:,ResourceVersion:,Generation:0,CreationTimestamp:0001-01-01 00:00:00 +0000 UTC,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},Annotations:map[string]string{},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:StatefulSetSpec{Replicas:*3,Selector:nil,Template:k8s_io_kubernetes_pkg_api_v1.PodTemplateSpec{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:,GenerateName:,Namespace:,SelfLink:,UID:,ResourceVersion:,Generation:0,CreationTimestamp:0001-01-01 00:00:00 +0000 UTC,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},Annotations:map[string]string{},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:PodSpec{Volumes:[],Containers:[{kafka confluentinc/cp-kafka:3.3.0 [/bin/bash -c export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$(hostname).kafka.$(NAMESPACE).svc.cluster.local:9092;
set -ex
[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
export KAFKA_BROKER_ID=${BASH_REMATCH[1]}
/etc/confluent/docker/run] [] [{kafka 0 9092 }] [] [{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil} {NAMESPACE EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KAFKA_ZOOKEEPER_CONNECT zookeeper.development.svc.cluster.local nil} {KAFKA_HEAP_OPTS -Xmx644M nil} {KAFKA_METRIC_REPORTERS com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter nil} {KAFKA_CRUISE_CONTROL_METRICS_REPORTER_BOOTSTRAP_SERVER kafka-0.kafka.development.svc.cluster.local:9092 nil}] {map[memory:{{1073741824 0} {<nil>} 1Gi BinarySI}] map[cpu:{{1 -1} {<nil>} DecimalSI} memory:{{644 6} {<nil>} DecimalSI}]} [] nil nil nil nil false false false}],RestartPolicy:,TerminationGracePeriodSeconds:nil,ActiveDeadlineSeconds:nil,DNSPolicy:,NodeSelector:map[string]string{},ServiceAccountName:,DeprecatedServiceAccount:,NodeName:,HostNetwork:false,HostPID:false,HostIPC:false,SecurityContext:nil,ImagePullSecrets:[],Hostname:,Subdomain:,Affinity:&Affinity{NodeAffinity:nil,PodAffinity:nil,PodAntiAffinity:&PodAntiAffinity{RequiredDuringSchedulingIgnoredDuringExecution:[],PreferredDuringSchedulingIgnoredDuringExecution:[{50 {LabelSelector{MatchLabels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},MatchExpressions:[],} [development] kubernetes.io/hostname}}],},},SchedulerName:,InitContainers:[{labeler devth/k8s-labeler [/bin/bash -c set -ex
[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
export KUBE_LABEL_kafka_broker_id=${BASH_REMATCH[1]}
/run.sh] [] [] [] [{KUBE_NAMESPACE EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KUBE_LABEL_hostname &EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.name,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KUBE_LABEL_kafka_broker_id thisshouldbeoverwritten nil}] {map[] map[]} [] nil nil nil nil false false false} {zookeeper-ready busybox [sh -c until nslookup zookeeper.development.svc.cluster.local; do echo waiting for myservice; sleep 2; done;] [] [] [] [] {map[] map[]} [] nil nil nil nil false false false}],AutomountServiceAccountToken:nil,Tolerations:[{node.alpha.kubernetes.io/unreachable Exists NoExecute 0xc4203fe2e8} {node.alpha.kubernetes.io/notReady Exists NoExecute 0xc4203fe2e8}],HostAliases:[],},},VolumeClaimTemplates:[{{ } {kafka-data 0 0001-01-01 00:00:00 +0000 UTC <nil> <nil> map[] map[volume.beta.kubernetes.io/storage-class:standard] [] nil [] } {[ReadWriteOnce] nil {map[] map[storage:{{5368709120 0} {<nil>} 5Gi BinarySI}]} <nil>} { [] map[]}}],ServiceName:kafka,PodManagementPolicy:,UpdateStrategy:StatefulSetUpdateStrategy{Type:,RollingUpdate:nil,},RevisionHistoryLimit:nil,},Status:StatefulSetStatus{ObservedGeneration:nil,Replicas:0,ReadyReplicas:0,CurrentReplicas:0,UpdatedReplicas:0,CurrentRevision:,UpdateRevision:,},}
Error while creating StatefulSet: statefulsets.apps "kafka" already exists
time="2017-10-26T15:22:09Z" level=fatal msg="Cant create statefulset" clusterName=kafka error="statefulsets.apps \"kafka\" already exists" method=CreateKafkaCluster
Image: "devth/k8s-labeler" is referenced in kafka.go which doesn't appear to be parameterized and isn't able to be referenced with with a private repository.
Decide how to handle & trigger rebalancing:
I know you're aware of the project, but has anyone successfully combined your operator (with Cruise Control, yay!) with the Yolean/kubernetes-kafka base? There's a bit of a disconnect, but I would ideally like the benefits of both.
There is some interest in this over in Yolean/kubernetes-kafka#100.
Currently there is quite a bit of ugly bash hacking in the Kafka image startup command to set the advertised listener and broker ID correctly:
Command: []string{"/bin/bash",
"-c",
fmt.Sprintf("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$(hostname).%s.$(NAMESPACE).svc.cluster.local:9092; \n"+
"set -ex\n"+
"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1\n"+
"export KAFKA_BROKER_ID=${BASH_REMATCH[1]}\n"+
"/etc/confluent/docker/run", name),
Could this be solved in a better way?
Hey, I came across this whilst browsing for related things on GitHub, looks super cool - nice work.
I'd like to give it a try, so I'm wondering: A) what's the current state in terms of using it in a non-prod environment initially (I note it's a WIP), and following on from that, B) would you like any assistance with testing etc.?
It should be possible to add an ImagePullSecret and to pull images from different repositories.
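A minimal sketch of where such a secret could be wired in when building the broker pod spec; ImagePullSecretName would be a hypothetical spec field, and the import path may differ depending on the client-go version vendored by the operator:

package util

import v1 "k8s.io/api/core/v1"

// addImagePullSecret attaches the named pull secret to the pod spec so the
// kubelet can pull broker and labeler images from a private registry.
func addImagePullSecret(podSpec *v1.PodSpec, secretName string) {
	if secretName == "" {
		return // nothing configured, keep current behaviour
	}
	podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets,
		v1.LocalObjectReference{Name: secretName})
}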
https://github.com/krallistic/kafka-operator/blob/master/controller/crd.go#L97 specifically checks all namespaces for CRD instances. In my case I'd rather have the operator only look at/touch a single namespace.
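One possible direction, sketched under the assumption that the operator keeps a client-go ListWatch for its informer: scope the watch to a namespace taken from a flag, where an empty value preserves today's cluster-wide behaviour (the function name and the flag are illustrative):

package controller

import (
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/tools/cache"
)

// newKafkaClusterListWatch watches KafkaCluster objects only in the given
// namespace; passing "" (metav1.NamespaceAll) keeps the current behaviour.
func newKafkaClusterListWatch(client cache.Getter, namespace string) *cache.ListWatch {
	return cache.NewListWatchFromClient(client, "kafkaclusters", namespace, fields.Everything())
}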
Currently, some state is stored inside pod annotations. This isn't really good practice; the status field inside the CRD should be used instead.
To make better use of k8s scheduling concepts we should look into building in anti-affinity to the ZooKeeper nodes and potentially to other Kafka clusters.
Also look into whether it makes sense to incorporate Kafka's rack awareness into k8s.
Create a dedicated Service for every broker to allow direct access for advanced use cases (not through the LB/headless Service).
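A rough sketch of what such a per-broker Service could look like, assuming a per-broker label along the lines of the kafka_broker_id the labeler init container sets (the exact label key depends on the labeler's naming; names and ports are illustrative):

apiVersion: v1
kind: Service
metadata:
  name: kafka-broker-0
  labels:
    creator: kafka-operator
    component: kafka
spec:
  type: ClusterIP
  selector:
    creator: kafka-operator
    name: kafka
    kafka_broker_id: "0"
  ports:
  - name: kafka
    port: 9092
    targetPort: 9092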
As a first deeper integration with Kafka, it should be possible to read various information from the Kafka API and expose it on the Prometheus endpoint.
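A minimal sketch of that idea, assuming the Shopify/sarama client and the Prometheus Go client libraries (neither is confirmed as the operator's choice); it only exports partition counts per topic, as a stand-in for richer cluster state:

package main

import (
	"log"
	"net/http"

	"github.com/Shopify/sarama"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var partitionCount = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "kafka_topic_partition_count",
		Help: "Number of partitions per topic, read via the Kafka API.",
	},
	[]string{"topic"},
)

// scrape reads topic metadata from the brokers and updates the gauge.
func scrape(brokers []string) error {
	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		return err
	}
	defer client.Close()
	topics, err := client.Topics()
	if err != nil {
		return err
	}
	for _, t := range topics {
		partitions, err := client.Partitions(t)
		if err != nil {
			return err
		}
		partitionCount.WithLabelValues(t).Set(float64(len(partitions)))
	}
	return nil
}

func main() {
	prometheus.MustRegister(partitionCount)
	if err := scrape([]string{"kafka-0.kafka.default.svc.cluster.local:9092"}); err != nil {
		log.Println("scrape failed:", err)
	}
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}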
When I exec 'kubectl apply -f kafka-cluster.yaml', I get the following error:
error: unable to recognize "kafka-cluster.yaml": no matches for extensions/, Kind=Kafkacluster
some info:
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.4"
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3"
Wrap errors with pkg/errors to add more context/information.
Also take a second look at error handling in general; it's currently pretty messy.
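For reference, a minimal sketch of the wrapping style pkg/errors enables (the call site is illustrative, not existing operator code):

package util

import "github.com/pkg/errors"

// createStatefulSet shows the intended style: callers can log the full chain
// while errors.Cause(err) still yields the root error.
func createStatefulSet(clusterName string, create func() error) error {
	if err := create(); err != nil {
		return errors.Wrapf(err, "creating StatefulSet for cluster %q", clusterName)
	}
	return nil
}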
If you currently create a cluster and then delete it, the StatefulSet gets scaled down & deleted, but the Services won't get deleted.
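A sketch of the missing cleanup, assuming the operator's label conventions and the 2017-era client-go call style (newer client-go additionally takes a context.Context in these calls):

package util

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deleteClusterServices removes the headless and any per-broker Services that
// belong to a cluster, matching them by the labels the operator sets.
func deleteClusterServices(client kubernetes.Interface, namespace, clusterName string) error {
	svcs, err := client.CoreV1().Services(namespace).List(metav1.ListOptions{
		LabelSelector: "creator=kafka-operator,name=" + clusterName,
	})
	if err != nil {
		return err
	}
	for _, svc := range svcs.Items {
		if err := client.CoreV1().Services(namespace).Delete(svc.Name, &metav1.DeleteOptions{}); err != nil {
			return err
		}
	}
	return nil
}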
Are there any plans to expose metrics for the cluster state and the consumer lag as Prometheus metrics by default?
Are you open to PRs for these features?
panic: runtime error: index out of range
goroutine 42 [running]:
github.com/krallistic/kafka-operator/util.(*ClientUtil).BrokerStSImageUpdate(0xc4204ac4f8, 0xc42041401c, 0x2, 0xc420414016, 0x6, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
DetectChangeType: {ERROR {v1 Status { 0 0001-01-01 00:00:00 +0000 UTC <nil> <nil> map[] map[] [] [] } { 0 { } {0 false } false [] map[] 0 0 0}}}
/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/util/util.go:371 +0x1f7
TODO error?
github.com/krallistic/kafka-operator/processor.(*Processor).DetectChangeType(0xc4204ac480, 0xc420414010, 0x5, 0xc42041401c, 0x2, 0xc420414016, 0x6, 0x0, 0x0, 0x0, ...)
/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:67 +0x283
github.com/krallistic/kafka-operator/processor.(*Processor).WatchKafkaEvents.func1(0xc4204ac480)
/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:178 +0x1c0
created by github.com/krallistic/kafka-operator/processor.(*Processor).WatchKafkaEvents
/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:189 +0xe2
exit status 2
Currently a kubectl proxy on 8080 is needed for the TPR watch.
This could be removed if we used the (already existing) masterHost flag and got the token out of the kubeconfig/serviceaccount mount.
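A sketch of that, using client-go's standard helpers: prefer the in-cluster service-account configuration and only fall back to the master-host flag / kubeconfig for local runs (flag names are illustrative):

package main

import (
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// buildConfig prefers the in-cluster service-account token and CA cert; the
// masterHost/kubeconfig values are only used as a fallback outside the cluster.
func buildConfig(masterHost, kubeconfig string) (*rest.Config, error) {
	if cfg, err := rest.InClusterConfig(); err == nil {
		return cfg, nil
	}
	return clientcmd.BuildConfigFromFlags(masterHost, kubeconfig)
}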
Currently the operator uses a mix of different k8s API/client dependencies. This should be consolidated into one consistent usage.
It should be possible to create a KafkaCluster TPR in every namespace.
After we create the TPR, we go straight to the informer without ensuring the TPR fully exists. This can throw some unnecessary errors:
time="2017-05-12T14:08:23+02:00" level=info msg="&{0xc420326180 0x1d46180}" method=Watch package="controller/tpr"
E0512 14:08:23.640792 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:24.643334 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:25.648408 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:26.652970 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:27.659649 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:28.661724 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:29.663870 1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
The tprController should wait until the TPR is fully created.
No more manual HTTP REST calls.
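For the CRD-based successor to TPRs, a sketch of that wait using the apiextensions client (the TPR API would need an equivalent readiness check; newer client-go versions also take a context.Context in these calls):

package controller

import (
	"time"

	apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForCRDEstablished polls until the API server reports the CRD as
// Established, so the informer does not race the CRD registration.
func waitForCRDEstablished(client apiextensionsclient.Interface, name string) error {
	return wait.Poll(500*time.Millisecond, 60*time.Second, func() (bool, error) {
		crd, err := client.ApiextensionsV1beta1().CustomResourceDefinitions().Get(name, metav1.GetOptions{})
		if err != nil {
			return false, nil // not visible yet, keep polling
		}
		for _, cond := range crd.Status.Conditions {
			if cond.Type == apiextensionsv1beta1.Established && cond.Status == apiextensionsv1beta1.ConditionTrue {
				return true, nil
			}
		}
		return false, nil
	})
}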
Since Kafka is highly sensitive to network IO/bandwidth, a high priority would be to ensure it has the required resources.
Currently it is possible to set bandwidth limitations on a pod level with:
kubernetes.io/{ingress,egress}-bandwidth
but these annotations are not resources and are therefore not used by the scheduler to ensure capacity on a node.
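For reference, this is roughly where those annotations go (the values are illustrative, and enforcement depends on the CNI bandwidth plugin being enabled):

apiVersion: v1
kind: Pod
metadata:
  name: kafka-0
  annotations:
    kubernetes.io/ingress-bandwidth: 100M
    kubernetes.io/egress-bandwidth: 100M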
Kubernetes Issues to watch:
Depends on #11
The health check can simply report whether every component has started (maybe use promhttp for that?).
Readiness should be reported once the CRD has been established and the informer is working.
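A minimal sketch under those assumptions: promhttp for metrics plus trivial health/readiness handlers, with the readiness flag to be flipped once the CRD is established and the informer cache has synced:

package main

import (
	"net/http"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// ready is set to 1 once the CRD is established and the informer has synced.
var ready int32

func main() {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK) // the process is up and serving
	})
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if atomic.LoadInt32(&ready) == 1 {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusServiceUnavailable)
	})
	// elsewhere, after CRD establishment and cache sync: atomic.StoreInt32(&ready, 1)
	http.ListenAndServe(":9090", mux)
}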
There is no vendor directory in the project at the latest tagged version (0.2.0). How do I use dep to generate the vendor directory? I see there is only a Gopkg.toml file and no Gopkg.lock file. I want to generate the dependency packages in the vendor directory. If possible, please provide a complete dependency package and dependency description file.
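For what it's worth, with dep the lock file and vendor directory are normally generated locally from Gopkg.toml; assuming a working Go and dep setup, something like the following should produce Gopkg.lock and populate vendor/:

cd $GOPATH/src/github.com/krallistic/kafka-operator
dep ensure -v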
Hi,
As is, the operator can't create CRDs unless it is given some permissions.
Applying the following makes it work (assuming you deploy in the default namespace):
Maybe some extra documentation in the README.md mentioning that these permissions are needed would be nice ;-)
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: operator-kafka
rules:
- apiGroups:
  - krallistic.github.com
  resources:
  - "*"
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - endpoints
  - persistentvolumeclaims
  - events
  - configmaps
  - secrets
  verbs:
  - "*"
- apiGroups:
  - apps
  resources:
  - deployments
  - daemonsets
  - replicasets
  - statefulsets
  verbs:
  - "*"
- apiGroups:
  - apiextensions.k8s.io
  resources:
  - customresourcedefinitions
  verbs:
  - '*'
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: default-account-operator-kafka
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: ClusterRole
  name: operator-kafka
  apiGroup: rbac.authorization.k8s.io
We have a test environment on GKE v1.7.8. Kafka server pods keep failing with exception:
org.apache.kafka.common.KafkaException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter ClassNotFoundException exception occurred
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:288)
at kafka.server.KafkaServer.startup(KafkaServer.scala:207)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
Caused by: java.lang.ClassNotFoundException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:308)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:286)
... 3 more
This also leads to the crash of cruise-control and kafka-offset deployments.
LinkedIn open-sourced cruise-control (https://github.com/linkedin/cruise-control), a tool to rebalance Kafka clusters. Since they have much more experience running Kafka clusters, their algorithms should be used.
As a first integration, the following steps have to be done:
Automatic rebalancing of topics (if a skew occurs) is out of scope for the first integration.
The example does not run when following [How to use it] in the README. I ran into the following problem. Which specific version of the docker image confluentinc/cp-kafka is required?
NAME                                                  READY   STATUS             RESTARTS   AGE
busybox                                               1/1     Running            40         1d
cruise-control-test-cluster-1-d5c7db4d4-7w7zl         0/1     CrashLoopBackOff   142        10h
glusterblock-provisioner-565889669f-f9qkn             1/1     Running            1          6d
glusterfile-provisioner-854d6575f-lg27c               1/1     Running            2          6d
glusterfs-6z6kd                                       1/1     Running            2          11d
glusterfs-simple-provisioner-7766dd59fb-lt66m         1/1     Running            2          7d
glusterfs-t5x9q                                       1/1     Running            2          11d
kafka-offset-checker-test-cluster-1-654658954-ts49g   0/1     CrashLoopBackOff   132        10h
kafka-operator-54c96598c5-52gbp                       1/1     Running            0          10h
test-cluster-1-0                                      0/1     CrashLoopBackOff   3          2m
test-cluster-1-1                                      0/1     CrashLoopBackOff   3          2m
zk-0                                                  1/1     Running            0          10h
...
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter ClassNotFoundException exception occurred
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
at kafka.server.DynamicMetricsReporters.createReporters(DynamicBrokerConfig.scala:652)
at kafka.server.DynamicMetricsReporters.(DynamicBrokerConfig.scala:600)
at kafka.server.DynamicBrokerConfig.addReconfigurables(DynamicBrokerConfig.scala:161)
at kafka.server.KafkaServer.startup(KafkaServer.scala:301)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:117)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:62)
Caused by: java.lang.ClassNotFoundException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:322)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:311)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
... 6 more
...
Support more options being passed down to Kafka itself.
The spec already contains resources like CPU/RAM (and disk). While these are partly used for the Pod, they should also be pushed down to the JVM.
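A sketch of one way to push the memory resource down to the JVM, deriving KAFKA_HEAP_OPTS from the pod memory limit; the 75% ratio and the use of the limit (rather than the request) are illustrative assumptions, not existing operator behaviour:

package util

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// heapOptsFromMemoryLimit derives a KAFKA_HEAP_OPTS value from the pod memory
// limit, leaving headroom for off-heap usage and the page cache.
func heapOptsFromMemoryLimit(memLimit resource.Quantity) string {
	heapMiB := memLimit.Value() * 3 / 4 / (1024 * 1024) // 75% of the limit, in MiB
	return fmt.Sprintf("-Xmx%dM", heapMiB)
}

The resulting string would then replace the currently fixed -Xmx value seen in the environment dump above.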
Use tolerations to make brokers sticky across short node outages.
This doesn't solve data gravity completely, but it is a first step.
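A sketch of what that could look like in the broker pod spec, reusing the taint keys already visible in the StatefulSet dump above; the 300-second window is an illustrative value, and the import path may differ with the vendored client-go:

package util

import v1 "k8s.io/api/core/v1"

// brokerTolerations keeps broker pods bound to their node for a grace period
// after it becomes unreachable/not-ready, instead of being evicted immediately.
func brokerTolerations() []v1.Toleration {
	grace := int64(300) // seconds to tolerate a briefly unavailable node
	return []v1.Toleration{
		{Key: "node.alpha.kubernetes.io/unreachable", Operator: v1.TolerationOpExists,
			Effect: v1.TaintEffectNoExecute, TolerationSeconds: &grace},
		{Key: "node.alpha.kubernetes.io/notReady", Operator: v1.TolerationOpExists,
			Effect: v1.TaintEffectNoExecute, TolerationSeconds: &grace},
	}
}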
The value of the storageClass spec option is completely ignored; instead the operator produces PVCs with the 'standard' storage class.
kind: Kafkacluster
apiVersion: krallistic.github.com/v1
metadata:
  name: kafka
spec:
  brokerCount: 3
  zookeeperConnect: zookeeper.default.svc.cluster.local
  storageClass: slow
I suspect this patch will fix that, but I cannot even get dep ensure to work, and removing the constraints leads to build failures.
diff --git a/util/util.go b/util/util.go
--- a/util/util.go
+++ b/util/util.go
@@ -391,7 +391,7 @@ func (c *ClientUtil) createStsFromSpec(cluster spec.Kafkacluster) *appsv1Beta1.S
replicas := cluster.Spec.BrokerCount
image := cluster.Spec.Image
- storageClass := "standard"
+ storageClass := cluster.Spec.StorageClass
//TODO error handling, default value?
cpus, err := resource.ParseQuantity(cluster.Spec.Resources.CPU)
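If backwards compatibility matters, the patch could keep 'standard' as a fallback for clusters that omit the field; a rough sketch on top of the Spec.StorageClass field referenced above:

// storageClassOrDefault preserves today's behaviour when the spec omits the field.
func storageClassOrDefault(specClass string) string {
	if specClass == "" {
		return "standard"
	}
	return specClass
}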