

Conductor OSS Python SDK

Python SDK for working with https://github.com/conductor-oss/conductor.

Conductor is the leading open-source orchestration platform allowing developers to build highly scalable distributed applications.

Check out the official documentation for Conductor.

⭐ Conductor OSS

Show support for Conductor OSS. Please help spread awareness by starring the Conductor repo.



Install Conductor Python SDK

Before installing the Conductor Python SDK, it is good practice to set up a dedicated virtual environment:

virtualenv conductor
source conductor/bin/activate

Get Conductor Python SDK

The SDK requires Python 3.9+. To install the SDK, use the following command:

python3 -m pip install conductor-python
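
To quickly verify the installation, you can try importing the SDK (an optional sanity check; the import path is the same one used throughout the examples below):

# verify_install.py - optional check that the package imports correctly
from conductor.client.configuration.configuration import Configuration

config = Configuration()  # defaults to http://localhost:8080/api when no URL is configured
print('conductor-python SDK imported successfully')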

Hello World Application Using Conductor

In this section, we will create a simple "Hello World" application that executes a "greetings" workflow managed by Conductor.

Step 1: Create Workflow

Creating Workflows by Code

Create greetings_workflow.py with the following:

from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from greetings_worker import greet

def greetings_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    name = 'greetings'
    workflow = ConductorWorkflow(name=name, executor=workflow_executor)
    workflow.version = 1
    workflow >> greet(task_ref_name='greet_ref', name=workflow.input('name'))
    return workflow
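
The >> operator appends a task to the workflow, so additional steps can be chained by repeating it. A minimal sketch, assuming a second hypothetical @worker_task-decorated function named bye exists alongside greet:

# Hypothetical sketch: add a second task that runs after greet_ref.
# 'bye' is an illustrative worker, not part of this Hello World example.
workflow >> greet(task_ref_name='greet_ref', name=workflow.input('name'))
workflow >> bye(task_ref_name='bye_ref', name=workflow.input('name'))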

(Alternatively) Creating Workflows in JSON

Create greetings_workflow.json with the following:

{
  "name": "greetings",
  "description": "Sample greetings workflow",
  "version": 1,
  "tasks": [
    {
      "name": "greet",
      "taskReferenceName": "greet_ref",
      "type": "SIMPLE",
      "inputParameters": {
        "name": "${workflow.input.name}"
      }
    }
  ],
  "timeoutPolicy": "TIME_OUT_WF",
  "timeoutSeconds": 60
}

Workflows must be registered with the Conductor server. Use the API to register the greetings workflow from the JSON file above:

curl -X POST -H "Content-Type:application/json" \
http://localhost:8080/api/metadata/workflow -d @greetings_workflow.json
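
If you prefer to register the JSON definition from Python instead of curl, here is a minimal sketch using the SDK's metadata API (assuming greetings_workflow.json is in the working directory and the server runs at the default URL):

import json

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi

# Load the workflow definition created above and register it with the server.
with open('greetings_workflow.json') as f:
    workflow_definition = json.load(f)

api_client = ApiClient(Configuration())
metadata_client = MetadataResourceApi(api_client)
metadata_client.create(body=workflow_definition)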

Note

To use the Conductor API, the Conductor server must be up and running (see Running Workflows on Conductor Standalone (Installed Locally)).

Step 2: Write Task Worker

In Python, a worker is a function decorated with the worker_task decorator. Create a greetings_worker.py file as shown below:

Note

A single workflow can have task workers written in different languages and deployed anywhere, making your workflow polyglot and distributed!

from conductor.client.worker.worker_task import worker_task


@worker_task(task_definition_name='greet')
def greet(name: str) -> str:
    return f'Hello {name}'
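
Because greet returns a plain string, the SDK places it under the result key of the task output, which is why the application in the next step reads workflow_run.output["result"]. A worker can also return a dict, in which case its keys become the task output. A minimal sketch with a hypothetical worker:

from conductor.client.worker.worker_task import worker_task

# Hypothetical sketch: a worker returning a dict; the keys become the task output.
@worker_task(task_definition_name='get_user_info')
def get_user_info(user_id: str) -> dict:
    # A real worker would look this up in a database or another service.
    return {'user_id': user_id, 'name': 'Example User'}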

Now, we are ready to write our main application, which will execute our workflow.

Step 3: Write Hello World Application

Let's add helloworld.py with a main function:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from greetings_workflow import greetings_workflow


def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = greetings_workflow(workflow_executor=workflow_executor)
    workflow.register(True)
    return workflow


def main():
    # The app is connected to http://localhost:8080/api by default
    api_config = Configuration()

    workflow_executor = WorkflowExecutor(configuration=api_config)

    # Registering the workflow (Required only when the app is executed the first time)
    workflow = register_workflow(workflow_executor)

    # Starting the worker polling mechanism
    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()

    workflow_run = workflow_executor.execute(name=workflow.name, version=workflow.version,
                                             workflow_input={'name': 'Orkes'})

    print(f'\nworkflow result: {workflow_run.output["result"]}\n')
    print(f'see the workflow execution here: {api_config.ui_host}/execution/{workflow_run.workflow_id}\n')
    task_handler.stop_processes()


if __name__ == '__main__':
    main()

Running Workflows on Conductor Standalone (Installed Locally)

Setup Environment Variable

Set the following environment variable to point the SDK to the Conductor Server API endpoint:

export CONDUCTOR_SERVER_URL=http://localhost:8080/api
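
When constructed without arguments, Configuration resolves the server URL from this variable (falling back to http://localhost:8080/api), so nothing needs to be hard-coded. A quick check, assuming the SDK exposes the resolved endpoint via its host attribute as in recent versions:

from conductor.client.configuration.configuration import Configuration

# Prints the endpoint the SDK will call, e.g. the value of CONDUCTOR_SERVER_URL.
print(Configuration().host)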

Start Conductor Server

To start the Conductor server in standalone mode from a Docker image, run the command below:

docker run --init -p 8080:8080 -p 5000:5000 conductoross/conductor-standalone:3.15.0

To ensure the server has started successfully, open the Conductor UI at http://localhost:5000.

Execute Hello World Application

To run the application, type the following command:

python helloworld.py

The workflow is now executed, and its status can be viewed in the Conductor UI (http://localhost:5000).

Navigate to the Executions tab to view the workflow execution.


Running Workflows on Orkes Conductor

For running the workflow on Orkes Conductor:

  • Update the Conductor server URL to point to your cluster:
export CONDUCTOR_SERVER_URL=https://[cluster-name].orkesconductor.io/api
  • To run the workflow on the Orkes Conductor Playground, set the server URL and authentication variables as follows (the same settings can also be supplied in code, as sketched below):
export CONDUCTOR_SERVER_URL=https://play.orkes.io/api
export CONDUCTOR_AUTH_KEY=your_key
export CONDUCTOR_AUTH_SECRET=your_key_secret
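
A minimal sketch of the equivalent programmatic configuration using the SDK's AuthenticationSettings (the key and secret are placeholders for credentials generated in your Orkes cluster):

from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings

# Sketch: point the client at an Orkes cluster and authenticate with an application key.
api_config = Configuration(
    base_url='https://play.orkes.io',  # base_url without the /api suffix, as in the SDK's examples
    authentication_settings=AuthenticationSettings(
        key_id='your_key',
        key_secret='your_key_secret'
    )
)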

Run the application and view the execution status from Conductor's UI Console.

Note

That's it - you just created and executed your first distributed Python app!

Learn More about Conductor Python SDK

There are three main ways you can use Conductor when building durable, resilient, distributed applications.

  1. Write service workers that implement business logic to accomplish a specific goal, such as initiating a payment transfer or fetching user information from a database.
  2. Create Conductor workflows that implement application state; a typical workflow implements the saga pattern.
  3. Use the Conductor SDK and APIs to manage workflows from your application (a minimal sketch follows below).
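
For the third approach, here is a minimal sketch that starts the registered greetings workflow by name using the SDK's workflow API (the client construction mirrors the examples above):

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi

# Start the 'greetings' workflow registered earlier and print its id.
api_client = ApiClient(Configuration())
workflow_client = WorkflowResourceApi(api_client)

workflow_id = workflow_client.start_workflow(
    body={'name': 'Orkes'},  # workflow input
    name='greetings'
)
print(f'started workflow: {workflow_id}')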


conductor-python's Issues

Refactor unit tests to be compliant with recent client changes

Aim to test all possible ways of application startup, with and without some parameters.

Tool used to test by hand:

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.http.api_client import ApiClient
from conductor.client.http.api.metadata_resource_api import MetadataResourceApi
from conductor.client.http.api.task_resource_api import TaskResourceApi
from conductor.client.http.api.workflow_resource_api import WorkflowResourceApi
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_interface import WorkerInterface
from typing import List
import logging

logger = logging.getLogger(
    Configuration.get_logging_formatted_name(
        __name__
    )
)


class SimplePythonWorker(WorkerInterface):
    def execute(self, task: Task) -> TaskResult:
        task_result = self.get_task_result_from_task(task)
        task_result.add_output_data('key1', 'value')
        task_result.add_output_data('key2', 42)
        task_result.add_output_data('key3', False)
        task_result.status = TaskResultStatus.COMPLETED
        return task_result


def get_python_task_definition_example() -> List[dict]:
    return [
        {
            "createTime": 1650595379661,
            "createdBy": "",
            "name": "python_task_example_from_code",
            "description": "Python task example from code",
            "retryCount": 3,
            "timeoutSeconds": 300,
            "inputKeys": [],
            "outputKeys": [],
            "timeoutPolicy": "TIME_OUT_WF",
            "retryLogic": "FIXED",
            "retryDelaySeconds": 10,
            "responseTimeoutSeconds": 180,
            "inputTemplate": {},
            "rateLimitPerFrequency": 0,
            "rateLimitFrequencyInSeconds": 1,
            "ownerEmail": "[email protected]",
            "backoffScaleFactor": 1
        },
    ]


def get_python_workflow_definition_example() -> dict:
    return {
        "updateTime": 1650595431465,
        "name": "workflow_with_python_task_example_from_code",
        "description": "Workflow with python task example from code",
        "version": 1,
        "tasks": [
            {
                "name": "python_task_example_from_code",
                "taskReferenceName": "python_task_example_from_code_ref_0",
                "inputParameters": {

                },
                "type": "SIMPLE",
                "decisionCases": {

                },
                "defaultCase": [

                ],
                "forkTasks":[

                ],
                "startDelay":0,
                "joinOn":[

                ],
                "optional":False,
                "defaultExclusiveJoinTask":[

                ],
                "asyncComplete":False,
                "loopOver":[

                ]
            }
        ],
        "inputParameters": [

        ],
        "outputParameters": {
            "workerOutput": "${python_task_example_from_code_ref_0.output}"
        },
        "schemaVersion": 2,
        "restartable": True,
        "workflowStatusListenerEnabled": False,
        "ownerEmail": "[email protected]",
        "timeoutPolicy": "ALERT_ONLY",
        "timeoutSeconds": 0,
        "variables": {

        },
        "inputTemplate": {

        }
    }


def define_task_and_workflow(api_client: ApiClient) -> None:
    metadata_client = MetadataResourceApi(api_client)
    try:
        metadata_client.register_task_def1(
            body=get_python_task_definition_example()
        )
        metadata_client.create(
            body=get_python_workflow_definition_example()
        )
    except Exception as e:
        logger.debug(f'Failed to define task/workflow, reason: {e}')


def start_workflow(api_client: ApiClient, workflow_name: str) -> str:
    workflow_client = WorkflowResourceApi(api_client)
    workflowId = workflow_client.start_workflow(
        body={},
        name=workflow_name
    )
    return workflowId


def start_workflows(api_client: ApiClient, workflow_name: str, qty: int) -> List[str]:
    workflowIdList = []
    for _ in range(qty):
        try:
            workflowId = start_workflow(api_client, workflow_name)
            workflowIdList.append(workflowId)
            logger.debug(
                f'Started workflow: {workflow_name}, with id: {workflowId}'
            )
        except Exception as e:
            logger.debug(
                f'Failed to start workflow: {workflow_name}, reason: {e}'
            )
    return workflowIdList


def main():
    configuration = Configuration(
        base_url='https://play.orkes.io',
        debug=True,
        authentication_settings=AuthenticationSettings(
            key_id='',
            key_secret=''
        )
    )
    configuration.apply_logging_config()

    api_client = ApiClient(configuration)

    workflow_id = start_workflow(
        api_client,
        'workflow_with_python_task_example_from_code'
    )
    logger.debug(f'workflow_id: {workflow_id}')

    task_api = TaskResourceApi(api_client)
    response = task_api.update_task_by_ref_name(
        output={'hello': 'world'},
        workflow_id=workflow_id,
        task_ref_name='python_task_example_from_code_ref_0',
        status=TaskResultStatus.COMPLETED.value,
    )
    logger.debug(f'task update response: {response}')

    workers = [
        SimplePythonWorker('python_task_example_from_code'),
    ]
    workflow_ids = start_workflows(
        api_client,
        'workflow_with_python_task_example_from_code',
        10
    )
    metrics_settings = MetricsSettings()
    with TaskHandler(workers, configuration, metrics_settings) as task_handler:
        task_handler.start_processes()
        task_handler.join_processes()


if __name__ == '__main__':
    main()

Create integration tests for general use case

Aim to test all possible ways of application startup, with and without some parameters.

Tool used to test by hand: the same script as in the previous issue (Refactor unit tests to be compliant with recent client changes).

Evaluate client performance

Create a benchmark comparing performance as the number of workers changes. Increasing this number should improve performance until an inflection point is reached, after which throughput plateaus (or degrades).


Setup GitHub Action to publish pypi releases

  1. Set up a GitHub Action that listens to release events (e.g. https://github.com/Netflix/conductor/blob/main/.github/workflows/publish.yml).
  2. Update setup.cfg to use an environment variable for the version, extracting the tag that is checked out (the tag is created as part of the GitHub release process).
  3. Convert the tag value (e.g. v1.0.1) into a version (e.g. 1.0.1) and set it as an environment variable.
  4. Use environment variables to pass the PyPI username/password.
  5. Configure the GitHub project with the PyPI username/password as secrets that can be used in step 4 above.

Improve TaskHandler polling strategy

  • Add a new parameter for batchSize.
  • Each worker will have a standalone subprocess that polls indefinitely.
    • For each polled task, start a new subprocess to execute and update it (a rough sketch follows below).
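
A rough, hypothetical sketch of the proposed design (placeholders such as batch_poll and update_task stand in for SDK internals; this is not the SDK's actual implementation):

import multiprocessing
import time

def execute_and_update(worker, task):
    # Short-lived subprocess: execute one polled task and report its result.
    task_result = worker.execute(task)
    worker.task_client.update_task(task_result)  # placeholder for the real update call

def poll_loop(worker, batch_size, poll_interval_seconds):
    # Standalone subprocess per worker, polling indefinitely in batches.
    while True:
        tasks = worker.task_client.batch_poll(  # placeholder for the real batch poll call
            worker.task_definition_name, count=batch_size
        )
        for task in tasks:
            multiprocessing.Process(target=execute_and_update, args=(worker, task)).start()
        time.sleep(poll_interval_seconds)

def start_workers(workers, batch_size=5, poll_interval_seconds=0.1):
    for worker in workers:
        multiprocessing.Process(
            target=poll_loop, args=(worker, batch_size, poll_interval_seconds)
        ).start()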

Invalid value for `name`

Failed to poll task for: xxx,
conductor/client/http/models/task_def.py", line 265, in name
raise ValueError("Invalid value for name, must not be None") # noqa: E501
ValueError: Invalid value for name, must not be None
