
flink-on-k8s-operator's Introduction

Build Status GoDoc License Go Report Card

Kubernetes Operator for Apache Flink

Kubernetes operator that acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. This is an open-source fork of GoogleCloudPlatform/flink-on-k8s-operator with several new features and bug fixes.

Project Status

Beta

The operator is under active development, backward compatibility of the APIs is not guaranteed for beta releases.

Prerequisites

Overview

The Kubernetes Operator for Apache Flink extends the vocabulary of Kubernetes (e.g., Pod, Service) with the custom resource definition FlinkCluster and runs a controller Pod that keeps watching these custom resources. Once a FlinkCluster custom resource is created and detected by the controller, the controller creates the underlying Kubernetes resources (e.g., the JobManager Pod) based on the spec of the custom resource. With the operator installed in a cluster, users can then talk to the cluster through the Kubernetes API and Flink custom resources to manage their Flink clusters and jobs.
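
For illustration, a minimal FlinkCluster manifest for a session cluster might look like the following sketch. It is modeled on the upstream v1beta1 samples; the image tag, resource values, and replica count are placeholders, not recommendations.

apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: flinksessioncluster-sample
spec:
  image:
    name: flink:1.14.2          # placeholder Flink image
  jobManager:
    accessScope: Cluster        # expose the JobManager service inside the cluster only
    resources:
      limits:
        cpu: "200m"
        memory: "1Gi"
  taskManager:
    replicas: 2
    resources:
      limits:
        cpu: "200m"
        memory: "1Gi"

Applying a manifest like this lets the controller create the corresponding JobManager and TaskManager resources.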

Features

  • Support for both Flink job clusters and session clusters, depending on whether a job spec is provided (a sketch of a job spec follows this list)
  • Custom Flink images
  • Flink and Hadoop configs and container environment variables
  • Init containers and sidecar containers
  • Remote job jar
  • Configurable namespace to run the operator in
  • Configurable namespace to watch custom resources in
  • Configurable access scope for JobManager service
  • Taking savepoints periodically
  • Taking savepoints on demand
  • Restarting failed job from the latest savepoint automatically
  • Cancelling job with savepoint
  • Cleanup policy on job success and failure
  • Updating cluster or job
  • Batch scheduling for JobManager and TaskManager Pods
  • GCP integration (service account, GCS connector, networking)
  • Support for Beam Python jobs
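
A hedged sketch of how several of the job-related features above surface in the FlinkCluster spec (field names follow the upstream v1beta1 CRD; the jar, parallelism, intervals, and bucket path are placeholders):

spec:
  image:
    name: flink:1.14.2
  job:
    jarFile: ./examples/streaming/WordCount.jar      # local path in the image; remote job jars are also supported
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    parallelism: 2
    autoSavepointSeconds: 300                        # take savepoints periodically
    savepointsDir: gs://my-bucket/savepoints         # placeholder savepoint location
    restartPolicy: FromSavepointOnFailure            # restart a failed job from the latest savepoint
    cleanupPolicy:
      afterJobSucceeds: DeleteCluster
      afterJobFails: KeepCluster
  taskManager:
    replicas: 2

Providing the job section makes the operator run a job cluster; omitting it yields a session cluster.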

Installation

The operator is still under active development and there is no Helm chart available yet. You can follow either:

  • the User Guide to deploy a released operator image (ghcr.io/spotify/flink-operator) to your Kubernetes cluster, or
  • the Developer Guide to build an operator image first and then deploy it to the cluster.

Documentation

Quickstart guides

API

How to

Tech talks

  • CNCF Webinar: Apache Flink on Kubernetes Operator (video, slides)

Community

Contributing

Please check out CONTRIBUTING.md and the Developer Guide.

flink-on-k8s-operator's People

Contributors

ayush-singhal28, bnu0, deliangfan, dependabot[bot], elanv, functicons, gezimsejdiu, grypse, haoxins, hgonggg, hongyegong, jaredstehler, jeffwan, jschulte01, jto, kinderyj, kocomic, laughingman7743, live-wire, metowolf, mrart, pjthepooh, prasanna12510, regadas, renecouto, shashken, sudokamikaze, thebalu, yaron-idan, yolgun


flink-on-k8s-operator's Issues

Cluster state stuck on creating state

This is observed when using job clusters. If the job submitter fails due to permissions, a wrong jar path, or even init container issues, the cluster state is not updated.

Questions about webhook

I have noticed that disabling the webhook, specifically the MutatingWebhook, causes the job submitter to hang after submitting the job successfully, along with other unexpected behaviors. The reason I am asking is that installing a webhook can be potentially dangerous: it affects every single API call that Kubernetes makes, and webhooks have to be rigorously monitored. I want to better understand which features use the webhook, what could be causing the issue when there is no MutatingWebhook in place, and the motivation for this design. Any insights will be appreciated.

cc: @regadas @elanv

Failed Calling Webhook

I'm trying to deploy the Operator on top of a GKE cluster, but I'm running into issues when I'm trying to deploy the sample. I have a fairly standard cluster atm, without too many features enabled. I've deployed both cert manager and the operator and both are up and running without a problem.

Flink Operator System Namespace

k get po,svc -n flink-operator-system 
NAME                                                     READY   STATUS    RESTARTS   AGE
pod/flink-operator-controller-manager-5b4f96ddc5-dhlv5   2/2     Running   0          4h29m

NAME                                                        TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
service/flink-operator-controller-manager-metrics-service   ClusterIP   10.150.117.28    <none>        8443/TCP   4h29m
service/flink-operator-webhook-service                      ClusterIP   10.150.234.114   <none>        443/TCP    4h29m

Cert Manager Namespace

k get po,svc -n cert-manager         
NAME                                          READY   STATUS    RESTARTS   AGE
pod/cert-manager-848f547974-fbtfd             1/1     Running   0          4h42m
pod/cert-manager-cainjector-54f4cc6b5-49p58   1/1     Running   0          4h42m
pod/cert-manager-webhook-58fb868868-4w4pr     1/1     Running   0          4h42m

NAME                           TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
service/cert-manager           ClusterIP   10.150.202.12   <none>        9402/TCP   4h42m
service/cert-manager-webhook   ClusterIP   10.150.136.39   <none>        443/TCP    4h42m

However, when I try to deploy the sample session cluster, I get the following error message:

Error from server (InternalError): error when creating "./samples/flinkoperator_v1beta1_flinksessioncluster.yaml": Internal error occurred: failed calling webhook "mflinkcluster.flinkoperator.k8s.io": Post "https://flink-operator-webhook-service.flink-operator-system.svc:443/mutate-flinkoperator-k8s-io-v1beta1-flinkcluster?timeout=10s": dial tcp 10.100.0.10:9443: i/o timeout

Does anyone have any pointers? I've checked the services and they point to the correct endpoints. The selectors look fine, but I'm a bit stuck on how I can troubleshoot this efficiently.

This is my cluster config:

locals {
  gke_operator_sa_roles = [
    "roles/logging.logWriter",
    "roles/monitoring.metricWriter",
    "roles/monitoring.viewer",
  ]
}

resource "google_service_account" "cluster_identity" {
  project    = module.default.project_id
  account_id = "cluster-id"
}

resource "google_project_iam_member" "cluster_identity_permissions" {
  for_each = toset(local.gke_operator_sa_roles)
  project  = module.default.project_id
  member   = "serviceAccount:${google_service_account.cluster_identity.email}"
  role     = each.value
}

resource "google_container_cluster" "default" {
  project                  = module.default.project_id
  name                     = var.cluster_name
  remove_default_node_pool = true
  initial_node_count       = 1
  location                 = var.zone
  network                  = google_compute_network.default.self_link
  subnetwork               = google_compute_subnetwork.default.self_link
  min_master_version       = var.cluster_version

  release_channel {
    channel = var.channel
  }

  ip_allocation_policy {
    services_secondary_range_name = var.svc_range_name
    cluster_secondary_range_name  = var.pod_range_name
  }

  private_cluster_config {
    enable_private_endpoint = false
    enable_private_nodes    = true
    master_ipv4_cidr_block  = var.master_ipv4_cidr_block
  }

  node_config {
    service_account = google_service_account.cluster_identity.email
    oauth_scopes = [
      "storage-ro",
      "logging-write",
      "monitoring"
    ]
  }

  timeouts {
    create = "45m"
    update = "45m"
    delete = "45m"
  }

  depends_on = [
    google_project_iam_member.cluster_identity_permissions
  ]
}

resource "google_container_node_pool" "default" {
  provider   = google-beta
  project    = module.default.project_id
  name       = "${google_container_cluster.default.name}-nodes"
  cluster    = google_container_cluster.default.name
  location   = var.zone
  node_count = 1

  node_config {
    image_type   = "cos_containerd"
    machine_type = "n2-standard-4"

    service_account = google_service_account.cluster_identity.email
    oauth_scopes = [
      "storage-ro",
      "logging-write",
      "monitoring"
    ]

    disk_size_gb = 20
    disk_type    = "pd-ssd"
  }

  timeouts {
    create = "45m"
    update = "45m"
    delete = "45m"
  }

  depends_on = [
    google_project_iam_member.cluster_identity_permissions
  ]
}

Nil pointer exception during update when jobsubmitter pod is lost

When updating a chart, the job submitter pod is terminated and the flink operator then encounters a nil pointer exception and crashes. This doesn't happen in v0.1.15 (though that version has another issue during updates), and it seems to have been introduced in v0.1.16. From the trace it looks related to https://github.com/spotify/flink-on-k8s-operator/pull/120/files#diff-1d53f2b19e0186f6437a431a19916402d6c59a531c7a959981e7e7305e342677R304 @regadas

{"level":"info","ts":1634156619.167147,"logger":"controllers.FlinkCluster","msg":"Failed to extract job submit result","cluster":"myCluster","error":"failed to get logs for pod : resource name may not be empty"}
{"level":"info","ts":1634156619.1740642,"logger":"controllers.FlinkCluster","msg":"Observed Flink job status list","cluster":"myCluster","jobs":[{"jid":"8f14e333b9720528b574012c77a8175d","state":"CANCELED","name":"myApp","start-time":1634156476498,"end-time":1634156541413,"duration":64915}]}

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0x142da9b]

goroutine 403 [running]:
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStateObserver).observeFlinkJobStatus(0xc000d358e8, 0xc000d35c98, 0xc000d35610)
	/workspace/controllers/flinkcluster_observer.go:304 +0x69b
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStateObserver).observeJob(0xc000d358e8, 0xc000d35c98)
	/workspace/controllers/flinkcluster_observer.go:265 +0x3eb
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStateObserver).observe(0xc000d358e8, 0xc000d35c98)
	/workspace/controllers/flinkcluster_observer.go:208 +0xdc5
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterHandler).reconcile(0xc000d35c28, {0x17b2a1d, 0x25dbd20}, {{{0xc0002780d8, 0x203000}, {0xc000278120, 0xc0006aa540}}})
	/workspace/controllers/flinkcluster_controller.go:142 +0x285
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterReconciler).Reconcile(0xc0008f6a80, {0x19c2478, 0xc00031b0e0}, {{{0xc0002780d8, 0x11}, {0xc000278120, 0x11}}})
	/workspace/controllers/flinkcluster_controller.go:84 +0x309
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler(0xc00090caa0, {0x19c23d0, 0xc000574000}, {0x162e8c0, 0xc000aa6400})
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:298 +0x303
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem(0xc00090caa0, {0x19c23d0, 0xc000574000})
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:253 +0x205
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2()
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:214 +0x85
created by sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:210 +0x354

Job tracking routine changes for the blocking mode

I recently found that the job tracking routine has changed. It may be very rare, but there may be exceptional situations in which multiple or duplicate jobs are submitted to the cluster due to user mistakes or errors in an associated system. Because it is difficult to predict or cope with all the possible side effects of that, I previously implemented identifying and tracking the job by the ID obtained from the submitted job, in order to prevent such a situation.

As for blocking mode, the job tracking routine seems to have been changed to its current form because the job ID cannot be obtained immediately, but I am a little concerned about accepting these issues in order to support blocking mode.

flinkJobStatus.flinkJob = &flinkJobList.Jobs[0]
for _, job := range flinkJobList.Jobs[1:] {
	flinkJobStatus.flinkJobsUnexpected = append(flinkJobStatus.flinkJobsUnexpected, job.Id)
}

In addition to selecting and tracking an arbitrary job when there are multiple jobs, as above, unexpected jobs are currently cancelled collectively, as follows. What do you think about letting users check and handle unexpected jobs themselves, as in PR #107?

// Cancel unexpected jobs
if len(observed.flinkJobStatus.flinkJobsUnexpected) > 0 {
	log.Info("Cancelling unexpected running job(s)")
	err = reconciler.cancelUnexpectedJobs(false /* takeSavepoint */)
	return requeueResult, err
}

<PR #107>
https://github.com/elanv/flink-on-k8s-operator/blob/2ad3293cf85ea92a9b6d020b707a878a0ec59ed2/controllers/flinkcluster_reconciler.go#L509-L520

Missing securityContext Properties on CRD

I have some problems migrating from the GoogleCloudPlatform/flink-on-k8s-operator. It looks like certain properties just vanished from the CRD. Please take a look at the following list.

taskManager / jobManager / image / job

  1. securityContext.privileged
  2. securityContext.allowPrivilegeEscalation
  3. securityContext.readOnlyRootFilesystem
  4. securityContext.capabilities

I checked the CRD to see whether the properties might have been relocated, but I wasn't able to find them in a different place. Is there a reason for removing them? Please let me know if more information is needed to assist the investigation.
Thank you very much!

Make deploy fails for Kubernetes v1.21.2

I'm trying to build and deploy the operator using Docker with Kubernetes on macOS, but unfortunately deploying it fails.

Environment

Docker: v20.10.7
Kubernetes: v1.21.2
MacOS: 11.5.1

I use the following commands.

make operator-image push-operator-image IMG=localhost:5000/flink-operator:latest
make deploy IMG=localhost:5000/flink-operator FLINK_OPERATOR_NAMESPACE=flink-operator WATCH_NAMESPACE=default

The make deploy command fails with the following error.

go: creating new go.mod: module tmp
go get: added sigs.k8s.io/controller-tools v0.6.1
/Users/niklaswilcke/go/bin/controller-gen "crd:maxDescLen=0,trivialVersions=true,generateEmbeddedObjectMeta=true" rbac:roleName=manager-role webhook paths="./api/v1beta1/..." output:crd:artifacts:config=config/crd/bases
kubectl kustomize config/crd > config/crd/bases/patched_crd.yaml
mv config/crd/bases/patched_crd.yaml config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml
go mod tidy
kubectl apply -f config/crd/bases
The CustomResourceDefinition "flinkclusters.flinkoperator.k8s.io" is invalid: spec.preserveUnknownFields: Invalid value: true: must be false in order to use defaults in the schema
make: *** [install] Error 1

I think this problem is related to the dependency sigs.k8s.io/controller-tools v0.6.1, which was downgraded to v0.2.4 in the past to fix this issue and has recently been upgraded back to v0.6.1. Can you give some information on why it was upgraded and why this error could be triggered?

Thank you!

Support zap logging config flags

  • --zap-devel: development mode defaults (encoder=consoleEncoder, logLevel=Debug, stackTraceLevel=Warn); production mode defaults (encoder=jsonEncoder, logLevel=Info, stackTraceLevel=Error)
  • --zap-encoder: Zap log encoding ('json' or 'console')
  • --zap-log-level: Zap level to configure the verbosity of logging. Can be one of 'debug', 'info', 'error', or any integer value > 0, which corresponds to custom debug levels of increasing verbosity
  • --zap-stacktrace-level: Zap level at and above which stacktraces are captured (one of 'info' or 'error')

Consult the controller-runtime godocs for more detailed flag information.
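
If these flags are adopted, they would typically be wired up as arguments of the manager container in the operator Deployment. A hypothetical snippet (the container name and flag values are illustrative, not the operator's current configuration):

      containers:
        - name: manager            # hypothetical container name
          args:
            - --zap-encoder=json
            - --zap-log-level=info
            - --zap-stacktrace-level=error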

Job submitter stays in running status after successful submission

With v0.2.0, the job submitter stays in running status instead of completed, as in previous versions. This doesn't seem to cause any issue, except that I hit a problem where a new pod could be spun up to submit a new job while the current job is recovering/restarting from an exception. Then more than one job ends up in the JobManager and only one of them can be in running state, while the others keep restarting. The operator only recognizes the original job.

The job submitter hangs at this log message and doesn't signal to the operator to indicate job completion.

Job has been submitted with JobID <job_id>

autoSavepointSeconds Property doesn't have any effect

No matter what I set in job.autoSavepointSeconds, the savepoint interval is roughly 15 minutes. Am I doing something wrong? This problem already exists in the original GoogleCloudPlatform operator and I hoped it might be fixed here.

Is this issue maybe related to #84?

Any help appreciated. Thank you very much!

[DISCUSS] Refactor to operator sdk

From my understanding this operator was created using kubebuilder. While that is sufficient for an operator, it could be useful to switch to the Operator SDK, which supports operator lifecycle management. I have checked the Go project structure of the Operator SDK, and it's fairly similar to kubebuilder's.

https://github.com/operator-framework/operator-lifecycle-manager
https://github.com/operator-framework/operator-sdk

While it is primarily a Red Hat project, it is also a CNCF incubating project.

Thoughts?

CRD does not include an api-approved.kubernetes.io annotation.

Per kubernetes/enhancements#1111, CRDs in the *.k8s.io domain should be approved by the k8s community and have the api-approved.kubernetes.io annotation added to them with a reference to the PR which approved the CRD, and CRDs in this domain which lack this annotation can not be installed into k8s clusters using CRD.v1.

Fortunately, it also provides a mechanism for projects which have unintentionally run into this policy to avoid the enforcement mechanism by adding the annotation with an "unapproved" value. I would suggest that an api-approved.kubernetes.io: "unapproved" annotation be added to the CRD pending any longer-term fix (e.g. moving out of the k8s.io domain or getting the CRD approved).

Nil pointer exception thrown around savepoint status

Not sure how to reproduce this, but I have encountered this issue several times in a running job. It appeared to happen around the time the job was taking savepoints and got stuck in the InProgress state. The operator then ends up in a crash loop with a nil pointer exception at https://github.com/spotify/flink-on-k8s-operator/blob/master/controllers/flink/client.go#L140.

Cluster description:

  Savepoint:
    Job ID:          4c5f06bfd903005a7dfae5fd6269e5bb
    Request Time:    2021-10-22T17:21:04Z
    State:           InProgress
    Trigger ID:      8ce9487f90662eba510dc0734f430047
    Trigger Reason:  scheduled
    Trigger Time:    2021-10-22T17:21:04Z
  State:             Running
Events:              <none>

Operator stack trace:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0x143cf4c]

goroutine 459 [running]:
github.com/spotify/flink-on-k8s-operator/controllers/flink.(*SavepointStatus).IsSuccessful(...)
	/workspace/controllers/flink/client.go:140
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStatusUpdater).deriveSavepointStatus(0x19aaf50, 0xc000b679d0, 0xc00037e0e0, 0xc0005a1860, 0xc0020160f0)
	/workspace/controllers/flinkcluster_updater.go:844 +0xec
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStatusUpdater).deriveClusterStatus(_, _, _)
	/workspace/controllers/flinkcluster_updater.go:471 +0x1776
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStatusUpdater).updateStatusIfChanged(0xc000b678f8)
	/workspace/controllers/flinkcluster_updater.go:65 +0xb7
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterHandler).reconcile(0xc000b67c08, {0x17b1071, 0x9}, {{{0xc00049ef90, 0x4}, {0xc00049ef78, 0xc00060dc20}}})
	/workspace/controllers/flinkcluster_controller.go:164 +0x3a5
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterReconciler).Reconcile(0xc000472f80, {0x19bdbf8, 0xc00020d320}, {{{0xc00049ef90, 0x11}, {0xc00049ef78, 0x11}}})
	/workspace/controllers/flinkcluster_controller.go:84 +0x30d
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler(0xc0002dcf00, {0x19bdb50, 0xc00049a000}, {0x162cc20, 0xc00061e000})
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:298 +0x303
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem(0xc0002dcf00, {0x19bdb50, 0xc00049a000})
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:253 +0x205
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2()
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:214 +0x85
created by sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:210 +0x354

Dependency cert-manager missing from v0.1.14 on

I recently tried out the new release v0.1.14 and ran into the problem that make deploy would fail with the following error message.

$ make deploy IMG=ghcr.io/spotify/flink-operator:v0.1.14
[...]
service/flink-operator-webhook-service created
deployment.apps/flink-operator-controller-manager created
Warning: admissionregistration.k8s.io/v1beta1 MutatingWebhookConfiguration is deprecated in v1.16+, unavailable in v1.22+; use admissionregistration.k8s.io/v1 MutatingWebhookConfiguration
mutatingwebhookconfiguration.admissionregistration.k8s.io/flink-operator-mutating-webhook-configuration created
Warning: admissionregistration.k8s.io/v1beta1 ValidatingWebhookConfiguration is deprecated in v1.16+, unavailable in v1.22+; use admissionregistration.k8s.io/v1 ValidatingWebhookConfiguration
validatingwebhookconfiguration.admissionregistration.k8s.io/flink-operator-validating-webhook-configuration created
Error from server (NotFound): error when creating "STDIN": the server could not find the requested resource (post certificates.cert-manager.io)
Error from server (NotFound): error when creating "STDIN": the server could not find the requested resource (post issuers.cert-manager.io)
make: *** [deploy] Error 1

The problem is that the CRDs for Certificate and Issuer are missing. I found out that they are provided by the cert-manager project.

Installing cert-manager fixes the problem, but I wanted to report this, because I think it would be nice to have this documented or fixed in the build logic.

kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.5.3/cert-manager.yaml

If you guide me I could try to provide a pull request.

Internal Error: Failing to call webhook

This error happens after #7

Internal error occurred: failed calling webhook "mflinkcluster.flinkoperator.k8s.io": Post https://flink-operator-webhook-service.flink-operator-system.svc:443/mutate-flinkoperator-k8s-io-v1beta1-flinkcluster?timeout=30s: ssh: rejected: connect failed (Connection refused)

Apparently, there are more things that were missed during the update.

Fail to resubmit job when job parameters change

Hi, I am getting errors in the operator pod when trying to upgrade a job with different parameters

Observed job submitter	{"cluster": <cluster name>, "state": "nil"}
Observed job submitter pod list	{"cluster":  <cluster name>, "state": {"metadata":{},"items":[]}}
Failed to extract job submit result	{"cluster": "<cluster name>", "error": "job pod found, but no termination log found even though submission completed"}
github.com/spotify/flink-on-k8s-operator/controllers.(*ClusterStateObserver).observe
	/workspace/controllers/flinkcluster_observer.go:206
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterHandler).reconcile
	/workspace/controllers/flinkcluster_controller.go:137
github.com/spotify/flink-on-k8s-operator/controllers.(*FlinkClusterReconciler).Reconcile
	/workspace/controllers/flinkcluster_controller.go:81
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:298
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:253
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2
	/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:214

I have observed this whenever I try upgrading a job cluster with flinkProperties or job volume mount changes.

The previous submitter pods are terminated and no new submitter pods are spun up. The previously running job is also cancelled, with no new job replacing it.

Would appreciate some pointers on resolving this issue. Thanks!

Deployment fails on kubernetes v1.18.x

When deploying, it fails with something like:

Error: failed to create resource: CustomResourceDefinition.apiextensions.k8s.io "flinkclusters.flinkoperator.k8s.io" is invalid: [
spec.validation.openAPIV3Schema.properties[spec].properties[job].properties[initContainers].items.properties[ports].items.properties[protocol].default: Required value: this property is in x-kubernetes-list-map-keys, so it must have a default or be a required property,
spec.validation.openAPIV3Schema.properties[spec].properties[taskManager].properties[sidecars].items.properties[ports].items.properties[protocol].default: Required value: this property is in x-kubernetes-list-map-keys, so it must have a default or be a required property,
spec.validation.openAPIV3Schema.properties[spec].properties[taskManager].properties[initContainers].items.properties[ports].items.properties[protocol].default: Required value: this property is in x-kubernetes-list-map-keys, so it must have a default or be a required property,
spec.validation.openAPIV3Schema.properties[spec].properties[jobManager].properties[initContainers].items.properties[ports].items.properties[protocol].default: Required value: this property is in x-kubernetes-list-map-keys, so it must have a default or be a required property, spec.validation.openAPIV3Schema.properties[spec].properties[jobManager].properties[sidecars].items.properties[ports].items.properties[protocol].default: Required value: this property is in x-kubernetes-list-map-keys, so it must have a default or be a required property]

Exposing livenessProbe and readinessProbe Configuration in Task Manager and Job Manager Pod Spec

Right now the following is the default provided:

  livenessProbe:
    failureThreshold: 5
    initialDelaySeconds: 5
    periodSeconds: 60
    successThreshold: 1
    tcpSocket:
      port: 6123
    timeoutSeconds: 10

Occasionally we encounter an unusual situation where the task manager does not reach RPC port startup and is terminated by k8s. But when the port starts up while the pod is in terminating state, the job gets submitted and eventually killed, creating a restart loop (which after some time resolves when the task manager magically starts within 25 seconds).

I can work on this and create a pull request here if you think this is useful.
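
A hypothetical shape for such an override in the FlinkCluster spec, assuming a livenessProbe field were exposed under taskManager (this field is the proposal here, not an existing API):

  taskManager:
    livenessProbe:
      tcpSocket:
        port: 6123              # TaskManager RPC port
      initialDelaySeconds: 30   # give the RPC port more time to come up
      periodSeconds: 60
      failureThreshold: 5
      timeoutSeconds: 10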

OutOfMemory exception for the JobManager

Is there a way to tweak memory usage for the JobManager? I've deployed the Flink operator on a GKE cluster and I was able to successfully launch the WordCount job. However, when I launch a custom job, it almost immediately fails due to java.lang.OutOfMemoryError: Java heap space.

        at org.apache.flink.client.program.OptimizerPlanEnvironment.generateException(OptimizerPlanEnvironment.java:150)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getPipeline(OptimizerPlanEnvironment.java:90)
        at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:108)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:58)
        at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:138)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space

I'm deploying a custom container which runs TensorFlow models. I already tried setting the limits of the jobManager to "8Gi", but even then it still failed. I had a look at the CRD for the JobManager, but there doesn't seem to be a parameter to tweak memory usage. I also had a look at the parameters to run the job, but I can't find anything there either.
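
For what it's worth, here is a sketch of how JobManager memory can be raised through the existing spec by combining container resources with Flink memory properties (the values and the exact Flink key depend on your Flink version; this is illustrative, not a verified fix for the error above):

spec:
  flinkProperties:
    jobmanager.memory.process.size: "6g"   # Flink 1.10+; older versions use jobmanager.heap.size
  jobManager:
    resources:
      requests:
        memory: "8Gi"
      limits:
        memory: "8Gi"

Container limits alone do not change the JVM heap; the Flink memory configuration has to be raised as well.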

Should we encourage the community to contribute to your fork?

Hi @regadas ,

This is not an issue, though I was not sure how else to start the conversation.

As you know, the original flink-on-k8s-operator is no longer maintained; I and several others have submitted PRs to it but couldn't get them merged.

I'm happy to discover that your fork seems active. Do you plan to continue working on this? Would it be useful for me to re-submit my PRs here? Is it OK if I encourage other contributors to do the same?

Thanks a lot,

S

Savepoint Cleanup during Runtime

Hi,
when I start a job with savepoints enabled, savepoints constantly pile up. As far as I can see, there are only cleanup policies available for when the job gets terminated. I'm wondering why there is no retention count or anything similar to get rid of stale savepoints during the job's runtime. Is this cleanup procedure supposed to be taken care of by the user, maybe via a sidecar container? Maybe I'm getting something wrong. I would really appreciate some insights.
Thank you very much!

Multiple consecutive savepoints triggered

I have this weird issue around savepoints. The job runs fine, but sometimes it gets into a state where multiple savepoints are triggered, about one minute apart (the job has autoSavepointSeconds: 3600, which usually works fine); see the trigger times below.
[screenshot: savepoint trigger times]
After some time the job eventually fails, and a new job is submitted but can't restore, so it goes into a submit-and-fail loop.

I have to re-deploy the resource to get the app restored from the last successful savepoint. However, at the beginning of the new job multiple savepoints are triggered again, though this time they are mostly successful. It stops eventually and becomes normal again. It looks like it tries to complete all the savepoints that were triggered earlier, but I'm not sure.
[screenshot: savepoint trigger times for the re-deployed job]

I wonder if anyone has encountered this or has any idea. (I'm using the original Flink operator from Google, but am asking here because this repo is more active.)
