
prefect-kubernetes's Introduction

Note

Active development of this project has moved within PrefectHQ/prefect. The code can be found here and documentation here. Please open issues and PRs against PrefectHQ/prefect instead of this repository.

prefect-kubernetes


Welcome!

prefect-kubernetes is a collection of Prefect tasks, flows, and blocks enabling orchestration, observation and management of Kubernetes resources.


Resources

For more tips on how to use tasks and flows in a Collection, check out Using Collections!

Installation

Install prefect-kubernetes with pip:

 pip install prefect-kubernetes

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

Then, to register blocks on Prefect Cloud:

prefect block register -m prefect_kubernetes

Note: to use the load method on Blocks, you must already have a block document saved, either through code or through the UI.

Example Usage

Use with_options to customize options on any existing task or flow

from prefect_kubernetes.flows import run_namespaced_job

customized_run_namespaced_job = run_namespaced_job.with_options(
    name="My flow running a Kubernetes Job",
    retries=2,
    retry_delay_seconds=10,
) # this is now a new flow object that can be called


Specify and run a Kubernetes Job from a YAML file

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job # this is a flow
from prefect_kubernetes.jobs import KubernetesJob

k8s_creds = KubernetesCredentials.load("k8s-creds")

job = KubernetesJob.from_yaml_file( # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="path/to/job.yaml",
)

job.save("my-k8s-job", overwrite=True)

if __name__ == "__main__":
    # run the flow
    run_namespaced_job(job)

Generate a resource-specific client from KubernetesClusterConfig

# with minikube or Docker Desktop and a valid ~/.kube/config, this should just work
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect_kubernetes.credentials import KubernetesCredentials

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

k8s_credentials = KubernetesCredentials(cluster_config=k8s_config)

with k8s_credentials.get_client("core") as v1_core_client:
    for namespace in v1_core_client.list_namespace().items:
        print(namespace.metadata.name)

List jobs in a specific namespace

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.jobs import list_namespaced_job

@flow
def kubernetes_orchestrator():
    # returns a V1JobList describing the jobs in the namespace
    v1_job_list = list_namespaced_job(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        namespace="my-namespace",
    )
    return v1_job_list

if __name__ == "__main__":
    kubernetes_orchestrator()

Patch an existing deployment

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from prefect_kubernetes.utilities import convert_manifest_to_model

@flow
def kubernetes_orchestrator():
    # load the desired changes from a manifest into a V1Deployment model
    v1_deployment_updates = convert_manifest_to_model(
        manifest="path/to/manifest.yaml",
        v1_model_name="V1Deployment",
    )

    v1_deployment = patch_namespaced_deployment(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        deployment_name="my-deployment",
        deployment_updates=v1_deployment_updates,
        namespace="my-namespace",
    )
    return v1_deployment

if __name__ == "__main__":
    kubernetes_orchestrator()

Feedback

If you encounter any bugs while using prefect-kubernetes, feel free to open an issue in the prefect-kubernetes repository.

If you have any questions or issues while using prefect-kubernetes, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-kubernetes for updates too!

Contributing

If you'd like to help contribute to fix an issue or add a feature to prefect-kubernetes, please propose changes through a pull request from a fork of the repository.

Here are the steps:

  1. Fork the repository
  2. Clone the forked repository
  3. Install the repository and its dependencies:
     pip install -e ".[dev]"
  4. Make desired changes
  5. Add tests
  6. Insert an entry to CHANGELOG.md
  7. Install pre-commit to perform quality checks prior to commit:
     pre-commit install
  8. git commit, git push, and create a pull request

prefect-kubernetes's People

Contributors

ahuang11, bunchesofdonald, chrisguidry, dependabot[bot], desertaxle, discdiver, gabcoyne, jawnsy, kevingrismore, nicholasfiorentini, tardunge, tsaiian, urimandujano, willraphaelson, zangell44, zanieb, zzstoatzz


prefect-kubernetes's Issues

Kubernetes worker hangs at the end of flow run execution

Expectation / Proposal

The Kubernetes worker hangs at the end of flow run execution when tearing down the Event Replicator. The main thread blocks on this line when joining the replicator thread. Adding a timeout to the .join() call prevents the blocking, but the replicator thread still lingers, and the worker then hangs on shutdown.
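The hang described above is the typical symptom of a background thread blocked in a read that never checks for a stop request. A minimal sketch of the usual cooperative-shutdown pattern follows; PollingReplicator is a hypothetical class, not prefect-kubernetes code, and its loop body is a placeholder for replicating events. The thread checks a threading.Event between bounded waits, runs as a daemon so it cannot keep the process alive, and is joined with a timeout.

```python
import threading


class PollingReplicator:
    """Hypothetical stand-in for an event replicator thread (not the library's class).

    The loop checks a stop event between short, bounded waits,
    so stop() can end it promptly instead of blocking forever.
    """

    def __init__(self, poll_interval: float = 0.05) -> None:
        self._stop_event = threading.Event()
        self._poll_interval = poll_interval
        self.iterations = 0
        # daemon=True: a thread that never exits will not keep the interpreter alive
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        while not self._stop_event.is_set():
            self.iterations += 1  # placeholder for processing one batch of events
            # wait() returns early as soon as the stop event is set
            self._stop_event.wait(self._poll_interval)

    def start(self) -> None:
        self._thread.start()

    def stop(self, timeout: float = 5.0) -> bool:
        """Signal the loop to exit, join with a timeout, and report whether it finished."""
        self._stop_event.set()
        self._thread.join(timeout=timeout)
        return not self._thread.is_alive()
```

For a real Kubernetes watch, the blocking read itself also needs a bound, for example passing timeout_seconds to the list call being watched, or calling stop() on the kubernetes.watch.Watch object (which is checked between events). Whether either fits the replicator's design is an assumption.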

Traceback / Example

Kubernetes job runs OK but agent fails to get status: KeyError: 'controller-uid'

Expectation / Proposal

from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job # this is a flow
from prefect_kubernetes.jobs import KubernetesJob
import asyncio
k8s_creds = KubernetesCredentials.load("my-cerd")

job = KubernetesJob.from_yaml_file( # or create in the UI with a dict manifest
    credentials=k8s_creds,
    manifest_path="helloworld.yaml",
    namespace="my-ns"
)

if __name__ == "__main__":
    asyncio.run(run_namespaced_job(job))

Traceback / Example

03:18:41.197 | ERROR   | Flow run 'amazing-rottweiler' - Finished in state Failed("Flow run encountered an exception. KeyError: 'controller-uid'\n")
Traceback (most recent call last):
  File "/mnt/k8s_run_pod4.py", line 18, in <module>
    asyncio.run(run_namespaced_job(job))
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 258, in create_then_begin_flow_run
    return await state.result(fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 700, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/flows.py", line 41, in run_namespaced_job
    await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 160, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1168, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/futures.py", line 241, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/jobs.py", line 419, in wait_for_completion
    "controller-uid=" f"{v1_job_status.metadata.labels['controller-uid']}"
                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
KeyError: 'controller-uid'
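The KeyError comes from indexing metadata.labels['controller-uid'] directly. Depending on the manifest and the Kubernetes version, that exact key may be absent; recent Kubernetes versions also use the prefixed label batch.kubernetes.io/controller-uid. A minimal, hypothetical helper that builds the pod label selector defensively, checking the Job's selector labels before its metadata labels and returning None instead of raising, might look like this (it is not the project's actual fix):

```python
from typing import Mapping, Optional

# Keys the Job controller may use for its UID label; the prefixed form appears in recent Kubernetes versions.
CONTROLLER_UID_KEYS = ("batch.kubernetes.io/controller-uid", "controller-uid")


def controller_uid_selector(
    match_labels: Optional[Mapping[str, str]],
    labels: Optional[Mapping[str, str]],
) -> Optional[str]:
    """Return a label selector like 'controller-uid=<uid>', or None if no UID label is present.

    match_labels (the Job's spec.selector) is checked first, then the Job's metadata labels.
    """
    for source in (match_labels or {}, labels or {}):
        for key in CONTROLLER_UID_KEYS:
            if key in source:
                return f"{key}={source[key]}"
    return None
```

With a V1Job from the kubernetes client, this would be called as controller_uid_selector(job.spec.selector.match_labels, job.metadata.labels), leaving the caller to decide how to handle None.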

Related issue:

add `job` tasks

Migrate the create, delete, update, patch, read, and replace job tasks from the 1.0 task library.

Worker raises InvalidChunkLength when replicating pod events

The KubernetesEventsReplicator._replicate_pod_events method occasionally raises the following exception:

urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

The exception happens randomly and is difficult to reproduce. A similar issue was raised for streaming Kubernetes logs, and this PR seems to have fixed the problem in that case. Hopefully the same handling strategy can be applied to this part of the code.
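The handling strategy from the linked PR is not reproduced here. As one illustration of the general idea, the sketch below reopens a stream when it breaks with a retryable error and gives up after a few consecutive failures; stream_with_restarts and its parameters are hypothetical names.

```python
from typing import Callable, Iterable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def stream_with_restarts(
    open_stream: Callable[[], Iterable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_restarts: int = 3,
) -> Iterator[T]:
    """Yield items from open_stream(), reopening it when it fails with one of retry_on.

    Re-raises once more than max_restarts consecutive failures occur without a new item.
    """
    failures = 0
    while True:
        try:
            for item in open_stream():
                failures = 0  # progress was made, so reset the failure counter
                yield item
            return  # the stream ended normally
        except retry_on:
            failures += 1
            if failures > max_restarts:
                raise
```

With the kubernetes client, this could wrap something like lambda: watch.Watch().stream(core_v1.list_namespaced_pod, namespace="my-namespace") with retry_on=(urllib3.exceptions.ProtocolError,). Reopening a watch without passing the last seen resource_version can replay or skip events, so a production version would likely need to track it.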

Traceback / Example

Exception in thread Thread-5 (_replicate_pod_events):
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/threading.py", line 975, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
    for event in self._watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))

'CoreV1Api' object has no attribute 'patch_namespaced_deployment'.

patch_namespaced_deployment is not working

Error message: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'.

Expectation / Proposal

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from kubernetes.client.models import V1Deployment

@flow
def kubernetes_orchestrator():
    v1_deployment_metadata = patch_namespaced_deployment(
        kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
        deployment_name="test-deployment",
        deployment_updates=V1Deployment(metadata={"labels": {"foo": "bar"}}),
    )

Traceback / Example

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/engine.py", line 1581, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/deployments.py", line 186, in patch_namespaced_deployment
    core_v1_client.patch_namespaced_deployment,
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'CoreV1Api' object has no attribute 'patch_namespaced_deployment'

Error handling fails with `TypeError: string indices must be integers, not 'str'`

The error handling lines here seem to be wrong:

if exc.body and "message" in exc.body:
    message += ": " + exc.body["message"]

Here's a trace logged for a permissions failure that shows the issue; body arrives as a string instead of a dictionary:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 818, in _create_job
    job = batch_client.create_namespaced_job(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 279, in POST
    return self.request("POST", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6ea3fb18-ec69-4254-8be6-c5dc255d4236', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'f06cc20e-f9d3-4599-bd8c-0a6592b6d582', 'X-Kubernetes-Pf-Prioritylevel-Uid': '939c2e24-97b7-472b-b8fc-dd8f0148bf9c', 'Date': 'Wed, 13 Mar 2024 16:45:14 GMT', 'Content-Length': '318'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:prefect-worker\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}



During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 630, in run
    job = await run_sync_in_worker_thread(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 95, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 827, in _create_job
    message += ": " + exc.body["message"]
                      ~~~~~~~~^^^^^^^^^^^
TypeError: string indices must be integers, not 'str'

See also, the error output in this issue: PrefectHQ/prefect#12987
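Because exc.body is a JSON string here, "message" in exc.body is a substring test that succeeds, and exc.body["message"] then indexes a str, which raises the TypeError. A minimal sketch of a tolerant lookup that accepts a dict, a JSON string, or bytes follows; api_error_message and describe_api_error are hypothetical helpers, not the worker's code.

```python
import json
from typing import Any, Optional


def api_error_message(body: Any) -> Optional[str]:
    """Return the 'message' field of a Kubernetes API error body, or None.

    Accepts a dict, a JSON string, or bytes, since the client may surface raw response text.
    """
    if isinstance(body, (bytes, bytearray)):
        body = body.decode("utf-8", errors="replace")
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            return None  # not JSON; the caller can fall back to the raw text
    if isinstance(body, dict):
        message = body.get("message")
        return message if isinstance(message, str) else None
    return None


def describe_api_error(summary: str, body: Any) -> str:
    """Append the API's message to a summary without assuming body is a dict."""
    detail = api_error_message(body)
    return f"{summary}: {detail}" if detail else summary
```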

add `run_namespaced_job` task

A relatively heavily used task from the 1.0 library, this task should be migrated into the 2.0 collection.

The interface should improve usability over the 1.0 version, allowing easier specification of Kubernetes Jobs.

Add resiliency to requests to k8s API in kubernetes worker

We have seen multiple instances where job submission in the worker fails due to transient errors with the create_namespaced_job call on the k8s client in the _create_job function. This causes the flow run to enter a CRASHED state.

Expectation / Proposal

Transient errors when communicating with the k8s API should be retried to prevent failures.
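A minimal sketch of retrying a single API call with exponential backoff, assuming the transient failures surface as specific exception types (in the traceback below, ConnectionResetError wrapped in urllib3.exceptions.ProtocolError). call_with_retries is a hypothetical helper, the delays are arbitrary, and sleep is injectable so the backoff can be tested without waiting.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying on retry_on exceptions with exponential backoff.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
    and re-raises the last exception once max_attempts is reached.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
            attempt += 1
```

It could wrap the call as call_with_retries(lambda: batch_client.create_namespaced_job(namespace, job_manifest), retry_on=(urllib3.exceptions.ProtocolError,)), where namespace and job_manifest are placeholders. Creating a Job is not idempotent, so a retry after a request that did reach the API server may fail with 409 Conflict (AlreadyExists), which the caller would need to handle.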

Traceback / Example

Failed to submit flow run '6adb9df4-3504-44bc-bef1-ba0cdc3542b4' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connection.py", line 419, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/util/ssl_.py", line 453, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/util/ssl_.py", line 495, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1100, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1371, in do_handshake
    self._sslobj.do_handshake()
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.10/dist-packages/prefect_kubernetes/worker.py", line 567, in run
    job = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/dist-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/dist-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/dist-packages/prefect_kubernetes/worker.py", line 763, in _create_job
    job = batch_client.create_namespaced_job(
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/rest.py", line 279, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.10/dist-packages/kubernetes/client/rest.py", line 172, in request
    r = self.pool_manager.request(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/request.py", line 81, in request
    return self.request_encode_body(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/request.py", line 173, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connection.py", line 419, in connect
    self.sock = ssl_wrap_socket(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/util/ssl_.py", line 453, in ssl_wrap_socket
    ssl_sock = _ssl_wrap_socket_impl(sock, context, tls_in_tls)
  File "/usr/local/lib/python3.10/dist-packages/urllib3/util/ssl_.py", line 495, in _ssl_wrap_socket_impl
    return ssl_context.wrap_socket(sock)
  File "/usr/lib/python3.10/ssl.py", line 513, in wrap_socket
    return self.sslsocket_class._create(
  File "/usr/lib/python3.10/ssl.py", line 1100, in _create
    self.do_handshake()
  File "/usr/lib/python3.10/ssl.py", line 1371, in do_handshake
    self._sslobj.do_handshake()
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

Support Custom Objects API

Expectation / Proposal

Add support for the CustomObjectsApi to manage CRUD operations on Kubernetes custom resources defined by Custom Resource Definitions (CRDs). This gives end users the flexibility to create their own operator-specific resources and watch over them.

Traceback / Example

Support entirety of KubernetesJob Spec out of the box

Hello,

I found myself in a position where I basically had to write a worker, and thus an infrastructure block, from scratch just to support the full range of the KubernetesJob spec. I think this should be the default, and I don't understand why the decision was made to limit this library to a small subset of options such as env, args, labels, name... Much of Kubernetes' flexibility is lost as a result; for example, we cannot map secrets into the pod as environment variables (see #83).

Expectation / Proposal

I propose adding all the available options from the KubernetesJob spec to the KubernetesWorkerVariables, KubernetesWorkerJobConfiguration and KubernetesJob classes. I have already done most of the work for myself and am happy to create a PR for this.

Traceback / Example

For example, my version of KubernetesWorkerVariables, used in my custom worker, simply extends the current class:

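from pydantic import Field
from prefect_kubernetes.worker import KubernetesWorkerVariables

# Port, Volume, Affinity, EnvRef, Resources, Resource, VolumeMount, ImagePullSecret and
# _get_default_labels are the author's own definitions and are not shown here.
class KASWorkerVariables(KubernetesWorkerVariables):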
    ports: list[Port] | None = Field(
        default=None,
        description="Ports exposed on the container",
    )
    labels: dict[str, str] | None = Field(
        default=_get_default_labels(),
        description="Container labels",
    )
    command: list[str] | None = Field(
        default=None,
        description="Container command",
    )
    args: list[str] | None = Field(
        default=None,
        description="Container arguments appended to container command",
    )
    volumes: list[Volume] | None = Field(
        default=None,
        description="List of volumes to expose",
    )
    affinity: Affinity | None = Field(
        default=None,
        description="Pod Affinity",
    )
    env_from: list[EnvRef] | None = Field(
        default=None,
        description="Kubernetes References that are mapped as environment variables in the KubernetesJob",
    )
    resources: Resources | None = Field(
        default=Resources(
            limits=Resource(memory="2Gi"),
            requests=Resource(cpu="1", memory="2Gi"),
        ),
        description="Pod Kubernetes resources",
    )
    annotations: dict[str, str] | None = Field(
        default=None,
        description="Pod annotations",
    )
    volume_mounts: list[VolumeMount] | None = Field(
        default=None,
        description="List of volumes to mount into the pod",
    )
    image_pull_secrets: list[ImagePullSecret] | None = Field(
        default=[ImagePullSecret(name="my-pull-secret")],
        description="List of image pull secrets",
    )

In non-trivial cases, I added Pydantic models that mimic the KubernetesJob spec, such as Resources, Port, Volume, VolumeMount, EnvRef, and Affinity.
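To illustrate models that mirror part of the Job spec, here is a minimal sketch of Resource and Resources rendering a container's resources stanza. It uses stdlib dataclasses instead of the Pydantic models the author describes, to stay dependency-free; the field names and the to_manifest method are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Resource:
    """Quantities for one side of a container's resources, e.g. cpu='1', memory='2Gi'."""

    cpu: Optional[str] = None
    memory: Optional[str] = None

    def to_manifest(self) -> Dict[str, str]:
        # Omit unset quantities so the manifest only contains what was specified
        pairs = (("cpu", self.cpu), ("memory", self.memory))
        return {key: value for key, value in pairs if value is not None}


@dataclass
class Resources:
    """Mirrors the container 'resources' field: separate limits and requests."""

    limits: Resource = field(default_factory=Resource)
    requests: Resource = field(default_factory=Resource)

    def to_manifest(self) -> Dict[str, Dict[str, str]]:
        rendered = {
            "limits": self.limits.to_manifest(),
            "requests": self.requests.to_manifest(),
        }
        # Drop empty sections entirely
        return {key: value for key, value in rendered.items() if value}
```

With the defaults from KASWorkerVariables above, Resources(limits=Resource(memory="2Gi"), requests=Resource(cpu="1", memory="2Gi")).to_manifest() renders {"limits": {"memory": "2Gi"}, "requests": {"cpu": "1", "memory": "2Gi"}}.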

Request: detect when jobs are blocked from starting due to k8s limits

Description

My k8s namespace has resource limits (max total CPU and RAM). If starting a Prefect job pod would exceed those limits, the pod never starts, and the flow run in the UI hangs indefinitely. The agent logs still say "Completed deployment of flow run". When I increased the limits, the job pod started immediately and the flow completed as normal.

Expected Behavior

It would be nice if this scenario could be detected and result in a useful error on the UI or agent level.
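One possible detection approach, not something the collection is confirmed to do: when a ResourceQuota rejects a pod, the Job controller records an Event on the Job with reason FailedCreate, and the message typically mentions "exceeded quota". The sketch assumes events as dicts shaped like the kubernetes client's V1Event.to_dict() output (snake_case keys such as involved_object); quota_blocked_messages is a hypothetical name.

```python
from typing import Any, Dict, List


def quota_blocked_messages(events: List[Dict[str, Any]], job_name: str) -> List[str]:
    """Return messages of events suggesting the Job's pods were rejected by a quota.

    Expects dicts with 'involved_object', 'reason' and 'message' keys (V1Event.to_dict() style).
    """
    messages = []
    for event in events:
        obj = event.get("involved_object") or {}
        if obj.get("kind") != "Job" or obj.get("name") != job_name:
            continue  # event is about some other object
        message = event.get("message") or ""
        # Pod creation failures are reported with reason FailedCreate;
        # quota rejections typically mention "exceeded quota" in the message.
        if event.get("reason") == "FailedCreate" and "exceeded quota" in message:
            messages.append(message)
    return messages
```

Such dicts could come from CoreV1Api.list_namespaced_event(namespace, field_selector=f"involvedObject.name={job_name}"), converting each item with to_dict(). A worker could then surface the first message as a failure instead of waiting indefinitely.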

add `service` tasks

We would like to reach parity with the 1.0 task library as it relates to interacting with Kubernetes services via this collection. The following tasks should be re-implemented as 2.0 tasks in services.py:

  • CreateNamespacedService
  • DeleteNamespacedService
  • ListNamespacedService
  • PatchNamespacedService
  • ReadNamespacedService
  • ReplaceNamespacedService

Kubernetes Worker stops polling work pool

Expectation / Proposal

I sometimes hit an issue where the Kubernetes worker is running (deployed using Helm) and suddenly stops polling the work pool. When I check the pod, there are no logs indicating that anything went wrong; it simply stops/hangs. It is likely related to #96 but not the same.

I'm using AKS with Kubernetes version 1.27.7.

Moved over from PrefectHQ/prefect#11561.

Output of running the command inside the worker pod:

Version:             2.14.3
API version:         0.8.4
Python version:      3.11.6
Git commit:          f1ff9257
Built:               Thu, Nov 2, 2023 4:12 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Traceback / Example

Sometimes it recovers from the hang, and then I see this in the logs (it appears to have been hanging for 10 minutes):

Exception in thread Thread-3 (_replicate_pod_events):
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/events.py", line 93, in _replicate_pod_events
    for event in self._watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
22:39:01.985 | ERROR   | prefect.flow_runs.worker - An error occurred while monitoring flow run 'f95bef47-c688-4fd1-85af-aa9177cae6b7'. The flow run will not be marked as failed, but an issue may have occurred.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 761, in _update_chunk_length
    self.chunk_left = int(line, 16)
                      ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 828, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 765, in _update_chunk_length
    raise InvalidChunkLength(self, line)
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
             ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 568, in run
    status_code = await run_sync_in_worker_thread(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 841, in _watch_job
    for event in watch.stream(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 624, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 816, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
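Both tracebacks end with the Kubernetes watch stream dying on urllib3's ProtocolError. One common mitigation, sketched here under assumptions rather than taken from prefect-kubernetes, is to reopen the watch when the connection drops and resume from the last resourceVersion seen. `open_stream` is a hypothetical callable that would wrap something like `kubernetes.watch.Watch().stream(...)`, and events are assumed to carry a `raw_object` dict, as the Python client's watch events do. With the real client you would pass `retry_on=(urllib3.exceptions.ProtocolError,)`; the default uses the built-in ConnectionError only so the sketch runs without dependencies.

```python
import time
from typing import Any, Callable, Dict, Iterable, Iterator, Optional, Tuple, Type

Event = Dict[str, Any]


def stream_with_reconnect(
    open_stream: Callable[[Optional[str]], Iterable[Event]],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_reconnects: int = 5,
    backoff_seconds: float = 1.0,
) -> Iterator[Event]:
    """Yield watch events, reopening the stream if the connection breaks.

    `open_stream` receives the last resourceVersion seen (None on the first
    call) so a reopened watch can resume instead of replaying from the start.
    """
    resource_version: Optional[str] = None
    reconnects = 0
    while True:
        try:
            for event in open_stream(resource_version):
                resource_version = event["raw_object"]["metadata"]["resourceVersion"]
                yield event
            return  # the server closed the stream normally
        except retry_on:
            reconnects += 1
            if reconnects > max_reconnects:
                raise  # give up and let the caller report the error
            time.sleep(backoff_seconds * reconnects)  # linear backoff between attempts
```

A real implementation would also have to handle the API server answering 410 Gone when the saved resourceVersion has expired, typically by relisting before watching again.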

Jobs launched with run_namespaced_job fail in case of evictions

Expectation / Proposal

When a pod of a job launched with run_namespaced_job is evicted, the whole flow fails with the RuntimeError below, even if the replacement pod that Kubernetes creates after the eviction completes successfully. This happens because the query fetches all pods tied to the Job's controller UID, including the evicted one.

I think wait_for_completion should ignore evicted pods in this loop.
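A sketch of the filter suggested above, assuming pods are available as dicts shaped like the Pod JSON (with the Python client's V1Pod objects the same fields are pod.status.phase and pod.status.reason). Evicted pods end in phase Failed with reason Evicted, so they can be skipped when deciding whether the Job itself failed. The function names are hypothetical; this is not the current wait_for_completion code.

```python
from typing import Any, Dict, List


def counts_as_failure(pod: Dict[str, Any]) -> bool:
    """True if the pod failed for a reason other than eviction."""
    status = pod.get("status", {})
    # Evicted pods end in phase "Failed" with reason "Evicted"; the Job
    # controller replaces them, so they should not fail the run on their own.
    return status.get("phase") == "Failed" and status.get("reason") != "Evicted"


def job_has_real_failure(pods: List[Dict[str, Any]]) -> bool:
    return any(counts_as_failure(pod) for pod in pods)


pods = [
    {"metadata": {"name": "job-abc"}, "status": {"phase": "Failed", "reason": "Evicted"}},
    {"metadata": {"name": "job-def"}, "status": {"phase": "Succeeded"}},
]
print(job_has_real_failure(pods))  # False: only the evicted pod failed
```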

Traceback / Example

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1925, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect_kubernetes/jobs.py", line 449, in wait_for_completion
    raise RuntimeError(
RuntimeError: Job '***' failed, check the Kubernetes pod logs for more information.
wait_for_completion-0
prefect.task_runs

Finished in state Failed("Task run encountered an exception RuntimeError: Job '***' failed, check the Kubernetes pod logs for more information.")
wait_for_completion-0
prefect.task_runs

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 844, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect_kubernetes/flows.py", line 41, in run_namespaced_job
    await task(kubernetes_job_run.wait_for_completion.aio)(kubernetes_job_run)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/api.py", line 182, in wait_for_call_in_loop_thread
    return call.result()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 284, in result
    return self.future.result(timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1420, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.9/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1925, in orchestrate_task_run
    result = await call.aresult()
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 293, in aresult
    return await asyncio.wrap_future(self.future)
  File "/usr/local/lib/python3.9/site-packages/prefect/_internal/concurrency/calls.py", line 355, in _run_async
    result = await coro
  File "/usr/local/lib/python3.9/site-packages/prefect_kubernetes/jobs.py", line 449, in wait_for_completion
    raise RuntimeError(
RuntimeError: Job '***' failed, check the Kubernetes pod logs for more information.
prefect.flow_runs

Finished in state Failed("Flow run encountered an exception. RuntimeError: Job '***' failed, check the Kubernetes pod logs for more information.")

report ErrImagePull in Prefect UI to improve observability

Description

When using KubernetesFlowRunner, if the pod fails to start because of an invalid image, the UI shows "Pending" with no further indication of the failure.


Reproduction / Example

from prefect import flow, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner


@flow
def test_flow() -> None:
    # show in prefect ui
    logger = get_run_logger()
    logger.info("Hello Prefect UI from Kubernetes!")


DeploymentSpec(
    name="test-deployment",
    flow=test_flow,
    flow_runner=KubernetesFlowRunner(image="localhost:5550/flow-this-does-not-exist", stream_output=True)
)

As expected, the pod fails to start. This can be seen via kubectl:

$ kubectl describe pod berserk-lionfish2b48r--1-9tqtq
Name:         berserk-lionfish2b48r--1-9tqtq
Namespace:    default
Priority:     0
Node:         k3d-orion-server-0/172.20.0.3
Start Time:   Mon, 18 Apr 2022 15:35:53 +1000
Labels:       controller-uid=359d3bbd-e5da-472b-bc48-5afeb42d6790
              job-name=berserk-lionfish2b48r
Annotations:  <none>
Status:       Pending
IP:           10.42.0.24
IPs:
  IP:           10.42.0.24
Controlled By:  Job/berserk-lionfish2b48r
Containers:
  job:
    Container ID:
    Image:         localhost:5550/flow-this-does-not-exist:latest
    Image ID:
    Port:          <none>
    Host Port:     <none>
    Command:
      python
      -m
      prefect.engine
      37dd67bb-0bb0-4c77-91f4-e31082ec415b
    State:          Waiting
      Reason:       ImagePullBackOff
    Ready:          False
    Restart Count:  0
    Environment:
      PREFECT_API_URL:  http://orion:4200/api
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-4mgps (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  kube-api-access-4mgps:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason     Age                    From               Message
  ----     ------     ----                   ----               -------
  Normal   Scheduled  5m46s                  default-scheduler  Successfully assigned default/berserk-lionfish2b48r--1-9tqtq to k3d-orion-server-0
  Normal   Pulling    4m15s (x4 over 5m45s)  kubelet            Pulling image "localhost:5550/flow-this-does-not-exist:latest"
  Warning  Failed     4m15s (x4 over 5m45s)  kubelet            Failed to pull image "localhost:5550/flow-this-does-not-exist:latest": rpc error: code = Unknown desc = failed to pull and unpack image "localhost:5550/flow-this-does-not-exist:latest": failed to resolve reference "localhost:5550/flow-this-does-not-exist:latest": failed to do request: Head "http://localhost:5550/v2/flow/manifests/latest": dial tcp 127.0.0.1:5550: connect: connection refused
  Warning  Failed     4m15s (x4 over 5m45s)  kubelet            Error: ErrImagePull
  Warning  Failed     4m3s (x6 over 5m45s)   kubelet            Error: ImagePullBackOff
  Normal   BackOff    37s (x21 over 5m45s)   kubelet            Back-off pulling image "localhost:5550/flow-this-does-not-exist:latest"

Agent logs:

05:35:53.375 | INFO    | prefect.agent - Submitting flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.423 | INFO    | prefect.flow_runner.kubernetes - RUNNING
05:35:53.427 | INFO    | prefect.flow_runner.kubernetes - Flow run 'berserk-lionfish' has job settings = {'metadata': {'generateName': 'berserk-lionfish', 'namespace': 'default', 'labels': {'io.prefect.flow-run-id': '37dd67bb-0bb0-4c77-91f4-e31082ec415b', 'io.prefect.flow-run-name': 'berserk-lionfish', 'app': 'orion'}}, 'spec': {'template': {'spec': {'restartPolicy': 'Never', 'containers': [{'name': 'job', 'image': 'localhost:5550/flow:latest', 'command': ['python', '-m', 'prefect.engine', '37dd67bb-0bb0-4c77-91f4-e31082ec415b'], 'env': [{'name': 'PREFECT_API_URL', 'value': 'http://orion:4200/api'}]}]}}, 'backoff_limit': 4}}
05:35:53.470 | INFO    | prefect.agent - Completed submission of flow run '37dd67bb-0bb0-4c77-91f4-e31082ec415b'
05:35:53.481 | INFO    | prefect.flow_runner.kubernetes - Flow run job 'berserk-lionfish2b48r' has status {'active': None,
 'completed_indexes': None,
 'completion_time': None,
 'conditions': None,
 'failed': None,
 'ready': None,
 'start_time': None,
 'succeeded': None,
 'uncounted_terminated_pods': None}
05:35:53.481 | INFO    | prefect.flow_runner.kubernetes - Starting watch for pod to start. Job: berserk-lionfish2b48r
05:35:58.459 | ERROR   | prefect.flow_runner.kubernetes - Pod never started. Job: berserk-lionfish2b48r

Reporting the ErrImagePull in the Prefect UI, and setting the status to "Failed", would aid debugging and improve observability.
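A sketch of how the failure could be detected from the pod's container statuses, assuming the pod is available as a dict shaped like the Pod JSON. ErrImagePull, ImagePullBackOff and InvalidImageName are waiting reasons the kubelet reports for image problems. Logging the result and marking the run Failed would be up to the agent or worker and is not shown; the function name is hypothetical.

```python
from typing import Any, Dict, List

# Waiting reasons the kubelet reports when it cannot use a container image.
IMAGE_PULL_ERRORS = {"ErrImagePull", "ImagePullBackOff", "InvalidImageName"}


def image_pull_failures(pod: Dict[str, Any]) -> List[str]:
    """Describe every container in `pod` that is stuck waiting on its image."""
    failures = []
    for status in pod.get("status", {}).get("containerStatuses") or []:
        waiting = (status.get("state") or {}).get("waiting") or {}
        if waiting.get("reason") in IMAGE_PULL_ERRORS:
            failures.append(
                f"Container {status['name']!r} cannot pull image "
                f"{status['image']!r}: {waiting['reason']}"
            )
    return failures


# Shaped after the kubectl describe pod output in this report.
pod = {
    "status": {
        "phase": "Pending",
        "containerStatuses": [
            {
                "name": "job",
                "image": "localhost:5550/flow-this-does-not-exist:latest",
                "state": {"waiting": {"reason": "ImagePullBackOff"}},
            }
        ],
    }
}
print(image_pull_failures(pod))
```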

Cruft update

I think this collection might be out of date with its cruft template.

add `deployment` tasks

We would like to reach parity with the 1.0 task library as it relates to interacting with kubernetes deployments via this collection. The following tasks should be re-implemented as 2.0 tasks in deployments.py:

  • CreateNamespacedDeployment
  • DeleteNamespacedDeployment
  • ListNamespacedDeployment
  • PatchNamespacedDeployment
  • ReadNamespacedDeployment
  • ReplaceNamespacedDeployment
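The 1.0 task names map mechanically onto method names of kubernetes.client.AppsV1Api (create_namespaced_deployment, delete_namespaced_deployment, and so on), which gives a quick parity checklist. This sketch derives the client method name from each task name; it is a porting convenience, not part of either library.

```python
import re

# Prefect 1.0 task names from the list in this issue.
TASKS_1_0 = [
    "CreateNamespacedDeployment",
    "DeleteNamespacedDeployment",
    "ListNamespacedDeployment",
    "PatchNamespacedDeployment",
    "ReadNamespacedDeployment",
    "ReplaceNamespacedDeployment",
]


def client_method_name(task_name: str) -> str:
    """Convert a CamelCase 1.0 task name to the snake_case client method it wraps."""
    # Insert "_" before every capital letter except the first, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", task_name).lower()


for name in TASKS_1_0:
    print(f"{name} -> AppsV1Api.{client_method_name(name)}")
```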

Kubernetes Worker does not release sockets after Job completes

Expectation / Proposal

After a Job responsible for a flow run completes, TCP connections on the worker pod should close and release sockets. Instead, one TCP connection per flow run persists in state CLOSE_WAIT. Eventually, the worker pod will run out of sockets and flow runs will begin to fail during calls to create_namespaced_job.

Observed in prefect-kubernetes==0.3.1.

Traceback / Example

Failed to submit flow run 'c34aed0d-0396-424c-bc88-b8497b79ba63' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 174, in _new_conn
    conn = connection.create_connection(
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 95, in create_connection
    raise err
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/connection.py", line 85, in create_connection
    sock.connect(sa)
OSError: [Errno 99] Cannot assign requested address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 404, in _make_request
    self._validate_conn(conn)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 1058, in _validate_conn
    conn.connect()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 363, in connect
    self.sock = conn = self._new_conn()
  File "/usr/local/lib/python3.11/site-packages/urllib3/connection.py", line 186, in _new_conn
    raise NewConnectionError(
urllib3.exceptions.NewConnectionError: <urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/prefect/workers/base.py", line 896, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 567, in run
    job = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.11/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.11/site-packages/prefect_kubernetes/worker.py", line 763, in _create_job
    job = batch_client.create_namespaced_job(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 210, in create_namespaced_job
    return self.create_namespaced_job_with_http_info(namespace, body, **kwargs)  # noqa: E501
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api/batch_v1_api.py", line 309, in create_namespaced_job_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 279, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 172, in request
    r = self.pool_manager.request(
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 81, in request
    return self.request_encode_body(
  File "/usr/local/lib/python3.11/site-packages/urllib3/request.py", line 173, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/usr/local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 827, in urlopen
    return self.urlopen(
  File "/usr/local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 799, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.11/site-packages/urllib3/util/retry.py", line 592, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='172.20.0.1', port=443): Max retries exceeded with url: /apis/batch/v1/namespaces/prefect2-flows/jobs (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f8e2acb28d0>: Failed to establish a new connection: [Errno 99] Cannot assign requested address'))

This can be reproduced by starting a Kubernetes worker using the Helm chart with all default configs and then running some flows. After the flows complete, running cat /proc/net/tcp | wc -l on the worker pod eventually shows an increase of exactly the number of flow runs. Running cat /proc/net/tcp shows these connections in state 08:

23: 08013C0A:EBD2 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 137269 1 0000000000000000 20 4 12 10 -1                   
24: 08013C0A:9874 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 156620 1 0000000000000000 20 4 12 10 -1                   
25: 08013C0A:98D6 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 171986 1 0000000000000000 20 4 12 10 -1                   
26: 08013C0A:98EA 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 172205 1 0000000000000000 20 4 12 10 -1

from tcp_states.h:

enum {
    TCP_ESTABLISHED = 1,
    TCP_SYN_SENT,
    TCP_SYN_RECV,
    TCP_FIN_WAIT1,
    TCP_FIN_WAIT2,
    TCP_TIME_WAIT,
    TCP_CLOSE,
    TCP_CLOSE_WAIT,
    TCP_LAST_ACK,
    TCP_LISTEN,
    TCP_CLOSING,    /* Now a valid state */
    TCP_NEW_SYN_RECV,

    TCP_MAX_STATES  /* Leave at the end! */
};

State 08 is TCP_CLOSE_WAIT: counting from TCP_ESTABLISHED = 1, it is the eighth entry in the enum.

The CLOSE_WAIT state indicates that the remote end of the connection has finished transmitting data and that the remote application has issued a close(2) or shutdown(2) call. The local TCP stack is now waiting for the local application that owns the socket to close(2) the local socket as well.
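The same check can be scripted. A minimal sketch that scans /proc/net/tcp text for sockets in state 08 and decodes each remote port from hex; the sample rows are two of the rows above, preceded by the file's header row. The function name is hypothetical.

```python
from typing import List

TCP_CLOSE_WAIT = "08"  # tcp_states.h: TCP_ESTABLISHED = 1, ..., TCP_CLOSE_WAIT = 8


def close_wait_remote_ports(proc_net_tcp: str) -> List[int]:
    """Return the remote port of every socket in CLOSE_WAIT in /proc/net/tcp text."""
    ports = []
    for line in proc_net_tcp.splitlines():
        fields = line.split()
        # Data rows start with a slot number such as "23:"; this skips the header.
        if len(fields) < 4 or not fields[0].endswith(":"):
            continue
        if fields[3] == TCP_CLOSE_WAIT:
            # rem_address is "IP:PORT" in hex; the port part decodes directly.
            ports.append(int(fields[2].split(":")[1], 16))
    return ports


SAMPLE = """\
  sl  local_address rem_address   st tx_queue rx_queue tr tm->when retrnsmt   uid  timeout inode
  23: 08013C0A:EBD2 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 137269 1 0000000000000000 20 4 12 10 -1
  24: 08013C0A:9874 0100400A:01BB 08 00000000:00000019 00:00000000 00000000  1001        0 156620 1 0000000000000000 20 4 12 10 -1
"""
print(close_wait_remote_ports(SAMPLE))  # [443, 443]
```

Both sample sockets point at remote port 443, the same port as the API server connection in the traceback (port=443). On the worker pod, the function can be fed the contents of /proc/net/tcp and the count compared before and after a batch of flow runs.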

Here are some issues reporting the same behavior for async and multi-threaded applications that use the Python Kubernetes client:

Replicate kube events to Prefect that occur before the pod starts

Expectation / Proposal

As agents did in Prefect 1, Prefect 2 Kubernetes workers should make Kubernetes events that occur before the pod starts visible in Prefect.

At the moment all we get via Prefect is:

Worker 'KubernetesWorker edf7261c-e388-46f8-a0c5-9eb7de8c7c0f' submitting flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:25 PM prefect.flow_runs.worker
Creating Kubernetes job... 02:20:26 PM prefect.flow_runs.worker
Completed submission of flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' 02:20:26 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod has status 'Pending'. 02:20:27 PM prefect.flow_runs.worker
Job 'beige-stingray-tzkrv': Pod never started. 02:21:26 PM prefect.flow_runs.worker
Reported flow run 'ee662a6b-5303-430f-9ab7-acd5468f5d22' as crashed: Flow run infrastructure exited with non-zero status code -1. 02:21:27 PM prefect.flow_runs.worker

It would be useful to get the kube events so that we can diagnose this from Prefect without having to use kubectl etc.

prefect-kubernetes 0.2.8

Traceback / Example

Example events that are available via kubectl but not prefect:

17m         Normal    SuccessfulCreate    job/beige-stingray-tzkrv              Created pod: beige-stingray-tzkrv-vglpm
16m         Normal    TriggeredScaleUp    pod/beige-stingray-tzkrv-vglpm        pod triggered scale-up: [{gpu-accelerated-us-east-1a 1->2 (max: 5)}]
16m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 7 Insufficient memory.
16m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 1 node(s) were unschedulable, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {node.kubernetes.io/not-ready: }, that the pod didn't tolerate, 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 49 Insufficient nvidia.com/gpu, 6 Insufficient memory.
15m         Warning   FailedScheduling    pod/beige-stingray-tzkrv-vglpm        0/51 nodes are available: 1 node(s) had taint {sandbox: true}, that the pod didn't tolerate, 45 Insufficient cpu, 50 Insufficient nvidia.com/gpu, 6 Insufficient memory.
14m         Normal    Scheduled           pod/beige-stingray-tzkrv-vglpm        Successfully assigned awesome-app/beige-stingray-tzkrv-vglpm to ip-10-144-171-199.ec2.internal
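A sketch of how a worker could turn core/v1 Event objects like the ones above into log lines, assuming the events have already been listed (for example with CoreV1Api.list_namespaced_event) and are available as dicts shaped like the API's JSON, with camelCase keys. The function name is hypothetical, and the timestamps and shortened messages in the sample are made up for illustration.

```python
from typing import Any, Dict, Iterable, List, Set


def events_to_log_lines(events: Iterable[Dict[str, Any]], object_names: Set[str]) -> List[str]:
    """Format events for the given Job/pod names as log lines, oldest first."""
    relevant = [e for e in events if e["involvedObject"]["name"] in object_names]
    # ISO-8601 timestamps in the same format sort correctly as strings.
    relevant.sort(key=lambda e: e.get("lastTimestamp") or "")
    return [
        f"{e['type']} {e['reason']} "
        f"{e['involvedObject']['kind'].lower()}/{e['involvedObject']['name']}: {e['message']}"
        for e in relevant
    ]


# Hypothetical sample, deliberately out of order, plus one unrelated event.
sample = [
    {"type": "Warning", "reason": "FailedScheduling", "lastTimestamp": "2023-01-01T14:21:00Z",
     "involvedObject": {"kind": "Pod", "name": "beige-stingray-tzkrv-vglpm"},
     "message": "0/51 nodes are available: 45 Insufficient cpu."},
    {"type": "Normal", "reason": "SuccessfulCreate", "lastTimestamp": "2023-01-01T14:20:26Z",
     "involvedObject": {"kind": "Job", "name": "beige-stingray-tzkrv"},
     "message": "Created pod: beige-stingray-tzkrv-vglpm"},
    {"type": "Normal", "reason": "Pulled", "lastTimestamp": "2023-01-01T14:20:30Z",
     "involvedObject": {"kind": "Pod", "name": "some-other-pod"}, "message": "unrelated"},
]
names = {"beige-stingray-tzkrv", "beige-stingray-tzkrv-vglpm"}
for line in events_to_log_lines(sample, names):
    print(line)
```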

add `pod` tasks

Migrate the remaining primary tasks for interacting with pod resources, ensuring we have complete test coverage.

Multiple run_namespaced_job calls run together: if one job errors, the other jobs also error.

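Code like this:

from prefect_kubernetes.flows import run_namespaced_job

for job in jobs:  # jobs: a list of KubernetesJob blocks defined elsewhere
    log = run_namespaced_job(job)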

If one job fails, self._completed in wait_for_completion() (jobs.py) stays False and never becomes True, so the while loop keeps running. The run then errors and the same job is retried. Because the failed job was never deleted, Kubernetes rejects the retry with an error like this: HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"bidding-hunter-cjwiif\" already exists","reason":"AlreadyExists","details":{"name":"bidding-hunter-cjwiif","group":"batch","kind":"jobs"},"code":409}

Traceback / Example

As a temporary fix, I modified jobs.py to let the program exit, so that it can run the other jobs.
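A sketch of the intent behind the workaround, assuming hypothetical run_job and delete_job callables (stand-ins for starting a Kubernetes Job and deleting it, not functions from prefect-kubernetes): each job runs in its own try block so one failure does not stop the loop, and the Job is deleted afterwards so a retry under the same name does not hit 409 AlreadyExists.

```python
from typing import Callable, Dict, Iterable, Tuple


def run_jobs_independently(
    job_names: Iterable[str],
    run_job: Callable[[str], str],
    delete_job: Callable[[str], None],
) -> Dict[str, Tuple[str, str]]:
    """Run each job on its own so one failure does not stop the rest."""
    results: Dict[str, Tuple[str, str]] = {}
    for name in job_names:
        try:
            results[name] = ("completed", run_job(name))
        except Exception as exc:  # record the failure and move on to the next job
            results[name] = ("failed", str(exc))
        finally:
            # Remove the Job either way, so a later retry under the same
            # name does not get 409 AlreadyExists from the API server.
            delete_job(name)
    return results
```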
