kafka-operator's People

Contributors

krallistic


kafka-operator's Issues

Project still active?

Hi there,

Is this project still active and maintained?
What about the Confluent Operator that was promised and never delivered? Do you guys have any link to it?

operator crashes when the statefulset exists

Basically, I run the operator, add the cluster CRD (with confluentinc/cp-kafka 3.3.0), and then the operator (0.2.0) creates the StatefulSet ... and crashes right after it.

time="2017-10-26T15:22:08Z" level=info msg="New CRD added, creating cluster" KafkaClusterEventType=1 clusterName=kafka event-type=1 method=processEvent 
Error get sts
STS dosnt exist, creating
{{1073741824 0} {<nil>} 1Gi BinarySI}
time="2017-10-26T15:22:08Z" level=debug msg="Generated KafkaOptions from Struct to Env Vars" method=GenerateKafkaOptions options="[{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil}]" package=util 
[{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil} {NAMESPACE  &EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KAFKA_ZOOKEEPER_CONNECT zookeeper.development.svc.cluster.local nil} {KAFKA_HEAP_OPTS -Xmx644M nil} {KAFKA_METRIC_REPORTERS com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter nil} {KAFKA_CRUISE_CONTROL_METRICS_REPORTER_BOOTSTRAP_SERVER kafka-0.kafka.development.svc.cluster.local:9092 nil}]
&StatefulSet{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:kafka,GenerateName:,Namespace:,SelfLink:,UID:,ResourceVersion:,Generation:0,CreationTimestamp:0001-01-01 00:00:00 +0000 UTC,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},Annotations:map[string]string{},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:StatefulSetSpec{Replicas:*3,Selector:nil,Template:k8s_io_kubernetes_pkg_api_v1.PodTemplateSpec{ObjectMeta:k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{Name:,GenerateName:,Namespace:,SelfLink:,UID:,ResourceVersion:,Generation:0,CreationTimestamp:0001-01-01 00:00:00 +0000 UTC,DeletionTimestamp:<nil>,DeletionGracePeriodSeconds:nil,Labels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},Annotations:map[string]string{},OwnerReferences:[],Finalizers:[],ClusterName:,Initializers:nil,},Spec:PodSpec{Volumes:[],Containers:[{kafka confluentinc/cp-kafka:3.3.0 [/bin/bash -c export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$(hostname).kafka.$(NAMESPACE).svc.cluster.local:9092; 
set -ex
[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
export KAFKA_BROKER_ID=${BASH_REMATCH[1]}
/etc/confluent/docker/run] []  [{kafka 0 9092  }] [] [{COMPRESSION_TYPE gzip nil} {LOG_RETENTION_BYTES -1 nil} {LOG_RETENTION_HOURS -1 nil} {LOG_ROLL_HOURS -1 nil} {NAMESPACE  EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KAFKA_ZOOKEEPER_CONNECT zookeeper.development.svc.cluster.local nil} {KAFKA_HEAP_OPTS -Xmx644M nil} {KAFKA_METRIC_REPORTERS com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter nil} {KAFKA_CRUISE_CONTROL_METRICS_REPORTER_BOOTSTRAP_SERVER kafka-0.kafka.development.svc.cluster.local:9092 nil}] {map[memory:{{1073741824 0} {<nil>} 1Gi BinarySI}] map[cpu:{{1 -1} {<nil>}  DecimalSI} memory:{{644 6} {<nil>}  DecimalSI}]} [] nil nil nil    nil false false false}],RestartPolicy:,TerminationGracePeriodSeconds:nil,ActiveDeadlineSeconds:nil,DNSPolicy:,NodeSelector:map[string]string{},ServiceAccountName:,DeprecatedServiceAccount:,NodeName:,HostNetwork:false,HostPID:false,HostIPC:false,SecurityContext:nil,ImagePullSecrets:[],Hostname:,Subdomain:,Affinity:&Affinity{NodeAffinity:nil,PodAffinity:nil,PodAntiAffinity:&PodAntiAffinity{RequiredDuringSchedulingIgnoredDuringExecution:[],PreferredDuringSchedulingIgnoredDuringExecution:[{50 {LabelSelector{MatchLabels:map[string]string{component: kafka,creator: kafka-operator,name: kafka,role: data,},MatchExpressions:[],} [development] kubernetes.io/hostname}}],},},SchedulerName:,InitContainers:[{labeler devth/k8s-labeler [/bin/bash -c set -ex
[[ `hostname` =~ -([0-9]+)$ ]] || exit 1
export KUBE_LABEL_kafka_broker_id=${BASH_REMATCH[1]}
/run.sh] []  [] [] [{KUBE_NAMESPACE  EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.namespace,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KUBE_LABEL_hostname  &EnvVarSource{FieldRef:&ObjectFieldSelector{APIVersion:,FieldPath:metadata.name,},ResourceFieldRef:nil,ConfigMapKeyRef:nil,SecretKeyRef:nil,}} {KUBE_LABEL_kafka_broker_id thisshouldbeoverwritten nil}] {map[] map[]} [] nil nil nil    nil false false false} {zookeeper-ready busybox [sh -c until nslookup zookeeper.development.svc.cluster.local; do echo waiting for myservice; sleep 2; done;] []  [] [] [] {map[] map[]} [] nil nil nil    nil false false false}],AutomountServiceAccountToken:nil,Tolerations:[{node.alpha.kubernetes.io/unreachable Exists  NoExecute 0xc4203fe2e8} {node.alpha.kubernetes.io/notReady Exists  NoExecute 0xc4203fe2e8}],HostAliases:[],},},VolumeClaimTemplates:[{{ } {kafka-data      0 0001-01-01 00:00:00 +0000 UTC <nil> <nil> map[] map[volume.beta.kubernetes.io/storage-class:standard] [] nil [] } {[ReadWriteOnce] nil {map[] map[storage:{{5368709120 0} {<nil>} 5Gi BinarySI}]}  <nil>} { [] map[]}}],ServiceName:kafka,PodManagementPolicy:,UpdateStrategy:StatefulSetUpdateStrategy{Type:,RollingUpdate:nil,},RevisionHistoryLimit:nil,},Status:StatefulSetStatus{ObservedGeneration:nil,Replicas:0,ReadyReplicas:0,CurrentReplicas:0,UpdatedReplicas:0,CurrentRevision:,UpdateRevision:,},}
Error while creating StatefulSet:  statefulsets.apps "kafka" already exists
time="2017-10-26T15:22:09Z" level=fatal msg="Cant create statefulset" clusterName=kafka error="statefulsets.apps \"kafka\" already exists" method=CreateKafkaCluster 

Rebalancing

Decide how to handle & trigger rebalancing:

  1. Do automatic rebalancing
  2. Manual rebalancing triggered via a REST interface
  3. Manual rebalancing triggered via the TPR resource

Get rid of bash hacking in kafka image

Currently there is quite a lot of ugly bash hacking in the Kafka image to set the advertised listener and broker ID correctly.

Command: []string{"/bin/bash",
"-c",
fmt.Sprintf("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$(hostname).%s.$(NAMESPACE).svc.cluster.local:9092; \n"+
"set -ex\n"+
"[[ `hostname` =~ -([0-9]+)$ ]] || exit 1\n"+
"export KAFKA_BROKER_ID=${BASH_REMATCH[1]}\n"+
"/etc/confluent/docker/run", name),

Could this be solved in a cleaner way?
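
For example, a minimal sketch of doing the same ordinal extraction in Go (e.g. as a tiny init binary) instead of a bash regex; Kubernetes sets the pod hostname to the StatefulSet pod name (e.g. kafka-2), so the suffix after the last "-" is the broker ordinal:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

func main() {
	// The hostname of a StatefulSet pod ends in its ordinal, e.g. "kafka-2".
	host, _ := os.Hostname()
	idx := strings.LastIndex(host, "-")
	if idx < 0 {
		fmt.Fprintln(os.Stderr, "hostname has no ordinal suffix:", host)
		os.Exit(1)
	}
	id, err := strconv.Atoi(host[idx+1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot parse broker id:", err)
		os.Exit(1)
	}
	fmt.Printf("KAFKA_BROKER_ID=%d\n", id)
}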

Testing

Hey, I came across this whilst browsing for related things on GitHub, looks super cool - nice work.

I'd like to give it a try, so I'm wondering: A) what's the current state in terms of using it in a non-prod environment initially (I note it's a WIP), and, following on from that, B) would you like any assistance with testing, etc.?

Advanced Placement (Rack/Zone Aware / ZK Anti-Affinity)

To make better use of k8s scheduling concepts, we should look at building in anti-affinity to the ZooKeeper nodes and potentially other Kafka clusters.

Also take a look at whether it makes sense to incorporate Kafka's rack-awareness into k8s.
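
As a rough illustration (not the operator's actual code), a soft anti-affinity term that prefers scheduling brokers away from nodes running ZooKeeper could look like this; the {app: zookeeper} label selector and the import paths are assumptions:

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// zookeeperAntiAffinity prefers placing broker pods on nodes that do not
// already run ZooKeeper pods.
func zookeeperAntiAffinity() *v1.Affinity {
	return &v1.Affinity{
		PodAntiAffinity: &v1.PodAntiAffinity{
			PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{{
				Weight: 50,
				PodAffinityTerm: v1.PodAffinityTerm{
					LabelSelector: &metav1.LabelSelector{
						MatchLabels: map[string]string{"app": "zookeeper"},
					},
					TopologyKey: "kubernetes.io/hostname",
				},
			}},
		},
	}
}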

Direct Broker Access

Create a single Service for every broker to allow direct access for advanced use cases (i.e. not through the LB/headless Service).
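
A minimal sketch of such a per-broker Service, assuming the brokers run in a StatefulSet and the Kubernetes version in use sets the statefulset.kubernetes.io/pod-name label on pods; the naming scheme is illustrative only:

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// brokerService targets exactly one broker pod by its stable StatefulSet pod
// name (kafka-0, kafka-1, ...), enabling direct access to that broker.
func brokerService(clusterName string, brokerID int32) *v1.Service {
	podName := fmt.Sprintf("%s-%d", clusterName, brokerID)
	return &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: podName + "-direct"},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{"statefulset.kubernetes.io/pod-name": podName},
			Ports:    []v1.ServicePort{{Name: "kafka", Port: 9092}},
		},
	}
}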

Expose Kafka Stats on Prometheus

As a first deeper integration with Kafka, it should be possible to read various information from the Kafka API and expose it on the Prometheus endpoint.
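
A minimal client_golang sketch of what exposing such a metric could look like; the metric name, label, and port are made-up examples, not the operator's actual endpoint:

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// underReplicated is an example cluster-level gauge the operator could fill
// from the Kafka API.
var underReplicated = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "kafka_operator_under_replicated_partitions",
		Help: "Number of under-replicated partitions per cluster.",
	},
	[]string{"cluster"},
)

func serveMetrics() error {
	prometheus.MustRegister(underReplicated)
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(":9090", nil)
}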

Following the README does not work

When executing 'kubectl apply -f kafka-cluster.yaml', I get the following error:
error: unable to recognize "kafka-cluster.yaml": no matches for extensions/, Kind=Kafkacluster

Some version info:
Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.4"
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3"

Cleanup Services after delete

If you currently create a cluster and then delete it, the StatefulSet gets scaled down & deleted, but the Services won't get deleted.
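
One hedged way to fix this would be to attach an OwnerReference pointing at the Kafkacluster object to every generated Service, so Kubernetes garbage collection removes them when the cluster object is deleted; the helper below is a sketch, and GC behaviour for custom resources depends on the Kubernetes version:

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// ownedByCluster marks a generated object as owned by the Kafkacluster
// resource, so deleting the cluster cascades to the object.
func ownedByCluster(obj *metav1.ObjectMeta, clusterName string, clusterUID types.UID) {
	obj.OwnerReferences = []metav1.OwnerReference{{
		APIVersion: "krallistic.github.com/v1",
		Kind:       "Kafkacluster",
		Name:       clusterName,
		UID:        clusterUID,
	}}
}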

Default Prometheus integration

Are there any plans to expose metrics for the cluster state and the consumer lag as Prometheus metrics by default?

Are you open to PRs for these features?

Debug & Fix random crashes

panic: runtime error: index out of range

goroutine 42 [running]:
github.com/krallistic/kafka-operator/util.(*ClientUtil).BrokerStSImageUpdate(0xc4204ac4f8, 0xc42041401c, 0x2, 0xc420414016, 0x6, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
DetectChangeType:  {ERROR {v1 Status {      0 0001-01-01 00:00:00 +0000 UTC <nil> <nil> map[] map[] [] [] } { 0 {  } {0 false } false []  map[]  0 0 0}}}
	/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/util/util.go:371 +0x1f7
TODO error?
github.com/krallistic/kafka-operator/processor.(*Processor).DetectChangeType(0xc4204ac480, 0xc420414010, 0x5, 0xc42041401c, 0x2, 0xc420414016, 0x6, 0x0, 0x0, 0x0, ...)
	/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:67 +0x283
github.com/krallistic/kafka-operator/processor.(*Processor).WatchKafkaEvents.func1(0xc4204ac480)
	/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:178 +0x1c0
created by github.com/krallistic/kafka-operator/processor.(*Processor).WatchKafkaEvents
	/Users/jakobkaralus/workspace/go/src/github.com/krallistic/kafka-operator/processor/processor.go:189 +0xe2
exit status 2
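
A hedged guess at the failure mode, plus a guard: the watch delivered an ERROR/empty object (see the DetectChangeType line above), and indexing Containers[0] on its pod template panics. Checking the slice length first would turn the crash into a no-op; the type and function names below are illustrative, not the actual util.go code:

import appsv1beta1 "k8s.io/api/apps/v1beta1"

// brokerImageUpToDate compares the desired broker image with the one in the
// StatefulSet, treating an empty pod template as "nothing to update" instead
// of panicking with an index-out-of-range error.
func brokerImageUpToDate(sts *appsv1beta1.StatefulSet, wantImage string) bool {
	containers := sts.Spec.Template.Spec.Containers
	if len(containers) == 0 {
		return true
	}
	return containers[0].Image == wantImage
}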

Remove Need for kube proxy 8080

Currently a kube proxy on 8080 is needed for the TPR watch.
This could be removed if we used the (already existing) masterHost flag and got the token out of the kubeconfig/serviceaccount mount.
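
A minimal sketch of building the client config without a local proxy, using client-go's standard helpers; the flag names are illustrative:

import (
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// buildConfig uses the in-cluster service-account token and CA when no flags
// are given, and otherwise falls back to the masterHost flag / kubeconfig.
func buildConfig(masterHost, kubeconfig string) (*rest.Config, error) {
	if masterHost == "" && kubeconfig == "" {
		return rest.InClusterConfig()
	}
	return clientcmd.BuildConfigFromFlags(masterHost, kubeconfig)
}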

Cleanup dependencies

Currently the operator uses a mix of different dependencies for the k8s API/client. This should be consolidated into one consistent usage.

Wait till TPR is created

After we create the TPR, we go straight to the Informer without ensuring the TPR fully exists. This can throw some unnecessary errors:

time="2017-05-12T14:08:23+02:00" level=info msg="&{0xc420326180 0x1d46180}" method=Watch package="controller/tpr" 
E0512 14:08:23.640792    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:24.643334    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:25.648408    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:26.652970    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:27.659649    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:28.661724    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource
E0512 14:08:29.663870    1167 reflector.go:201] github.com/krallistic/kafka-operator/controller/tpr.go:142: Failed to list *spec.KafkaCluster: the server could not find the requested resource

The tprController should wait till the TPR is fully created.
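
A hedged sketch of such a wait using a generic poll loop; the list function is a stand-in for whatever the tprController uses to list KafkaCluster resources:

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForResource polls until listing the custom resource succeeds, i.e. the
// TPR is actually served by the API server, before starting the informer.
func waitForResource(list func() error) error {
	return wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
		if err := list(); err != nil {
			// Not registered yet ("the server could not find the requested resource").
			return false, nil
		}
		return true, nil
	})
}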

Network Shaping/Scheduling

Since Kafka is highly sensitive to network I/O and bandwidth, a high priority would be to ensure it has the required resources.

Currently it is possible to set bandwidth limitations at the pod level with:

kubernetes.io/{ingress,egress}-bandwidth

but these annotations are not resources and therefore not used by the scheduler to ensure capacity on a node.
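
For reference, a small sketch of what setting these annotations on the broker pod template could look like; the 100M values are arbitrary examples and enforcement depends on the network plugin:

import v1 "k8s.io/api/core/v1"

// withBandwidthLimits shapes traffic per pod; because these are annotations
// rather than resources, the scheduler does not reserve node bandwidth.
func withBandwidthLimits(podTemplate *v1.PodTemplateSpec) {
	if podTemplate.Annotations == nil {
		podTemplate.Annotations = map[string]string{}
	}
	podTemplate.Annotations["kubernetes.io/ingress-bandwidth"] = "100M"
	podTemplate.Annotations["kubernetes.io/egress-bandwidth"] = "100M"
}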

Kubernetes Issues to watch:

Fail to create CRD on start

Hi,

As is, the operator can't create CRDs without being given some extra permissions.

Applying this makes it work (assuming you deploy in the default namespace):

Maybe some extra documentation in the README.md to mention that these permissions are needed would be nice ;-)

kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: operator-kafka
rules:
- apiGroups:
  - krallistic.github.com
  resources:
  - "*"
  verbs:
  - "*"
- apiGroups:
  - ""
  resources:
  - pods
  - services
  - endpoints
  - persistentvolumeclaims
  - events
  - configmaps
  - secrets
  verbs:
  - "*"
- apiGroups:
  - apps
  resources:
  - deployments
  - daemonsets
  - replicasets
  - statefulsets
  verbs:
  - "*"
- apiGroups:
  - apiextensions.k8s.io
  resources:
  - customresourcedefinitions
  verbs:
  - '*'

---

kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: default-account-operator-kafka
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
roleRef:
  kind: ClusterRole
  name: operator-kafka
  apiGroup: rbac.authorization.k8s.io

ClassNotFoundException for Kafka StatefulSet

We have a test environment on GKE v1.7.8. Kafka server pods keep failing with the following exception:

org.apache.kafka.common.KafkaException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter ClassNotFoundException exception occurred
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:288)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:207)
	at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:112)
	at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:58)
Caused by: java.lang.ClassNotFoundException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:308)
	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:286)
	... 3 more

This also causes the cruise-control and kafka-offset deployments to crash.

Integrate Cruise Control

LinkedIn open-sourced cruise-control (https://github.com/linkedin/cruise-control), a tool to rebalance Kafka clusters. Since they have much more experience running Kafka clusters, their algorithms should be used.

As a first integration, the following steps have to be done:

  • CC Dockerfile & Deployment
  • Kafka Dockerfile with the CruiseControlMetricsReporter
  • Upsizing with CC rebalancing
  • Downsizing with CC rebalancing

Automatic rebalancing of topics (if a skew occurs) is out of scope for the first integration.

The example in the README currently does not work

The example does not run past [How to use it] in the README. I ran into the following problem. Which specific version of the Docker image confluentinc/cp-kafka should be used?

kubectl get pods

NAME READY STATUS RESTARTS AGE
busybox 1/1 Running 40 1d
cruise-control-test-cluster-1-d5c7db4d4-7w7zl 0/1 CrashLoopBackOff 142 10h
glusterblock-provisioner-565889669f-f9qkn 1/1 Running 1 6d
glusterfile-provisioner-854d6575f-lg27c 1/1 Running 2 6d
glusterfs-6z6kd 1/1 Running 2 11d
glusterfs-simple-provisioner-7766dd59fb-lt66m 1/1 Running 2 7d
glusterfs-t5x9q 1/1 Running 2 11d
kafka-offset-checker-test-cluster-1-654658954-ts49g 0/1 CrashLoopBackOff 132 10h
kafka-operator-54c96598c5-52gbp 1/1 Running 0 10h
test-cluster-1-0 0/1 CrashLoopBackOff 3 2m
test-cluster-1-1 0/1 CrashLoopBackOff 3 2m
zk-0 1/1 Running 0 10h

kubectl logs test-cluster-1-0

...
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter ClassNotFoundException exception occurred
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
at kafka.server.DynamicMetricsReporters.createReporters(DynamicBrokerConfig.scala:652)
at kafka.server.DynamicMetricsReporters.(DynamicBrokerConfig.scala:600)
at kafka.server.DynamicBrokerConfig.addReconfigurables(DynamicBrokerConfig.scala:161)
at kafka.server.KafkaServer.startup(KafkaServer.scala:301)
at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:117)
at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:62)
Caused by: java.lang.ClassNotFoundException: com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:322)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:311)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
... 6 more
...

Resource Handling

The Spec already contains resources like CPU/RAM (and disk). While these are somewhat used for the Pod, they should also be pushed down to the JVM.
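
A rough sketch of deriving KAFKA_HEAP_OPTS from the configured memory value, so the JVM heap follows the pod resources; the 60% heap fraction and the string-typed memory field are assumptions for illustration:

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// heapOptsFromMemory sizes the JVM heap as a fraction of the configured
// memory so the broker stays under its pod limit.
func heapOptsFromMemory(memory string) (v1.EnvVar, error) {
	q, err := resource.ParseQuantity(memory) // e.g. "1Gi"
	if err != nil {
		return v1.EnvVar{}, err
	}
	heapMiB := q.Value() * 60 / 100 / (1024 * 1024)
	return v1.EnvVar{Name: "KAFKA_HEAP_OPTS", Value: fmt.Sprintf("-Xmx%dM", heapMiB)}, nil
}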

storageClass spec value is ignored

The value of the storageClass spec option is completely ignored; instead the operator produces PVCs with a 'standard' storage class.

kind: Kafkacluster
apiVersion: krallistic.github.com/v1
metadata:
  name: kafka
spec:
  brokerCount: 3
  zookeeperConnect: zookeeper.default.svc.cluster.local
  storageClass: slow

I suspect this patch will fix that, but I cannot even get dep ensure to work, and removing the constraints leads to build failures.

diff --git a/util/util.go b/util/util.go
--- a/util/util.go
+++ b/util/util.go
@@ -391,7 +391,7 @@ func (c *ClientUtil) createStsFromSpec(cluster spec.Kafkacluster) *appsv1Beta1.S
 	replicas := cluster.Spec.BrokerCount
 	image := cluster.Spec.Image
 
-	storageClass := "standard"
+	storageClass := cluster.Spec.StorageClass
 
 	//TODO error handling, default value?
 	cpus, err := resource.ParseQuantity(cluster.Spec.Resources.CPU)
