prefecthq / prefect

Prefect is a workflow orchestration tool empowering developers to build, observe, and react to data pipelines

Home Page: https://prefect.io

License: Apache License 2.0

Python 98.75% Dockerfile 0.04% Shell 0.08% JavaScript 0.01% HTML 0.01% Vue 0.80% TypeScript 0.25% CSS 0.01% Mako 0.01% Brainfuck 0.01% Jinja 0.06%
python workflow data-engineering data-science workflow-engine prefect infrastructure ml-ops data-ops automation

prefect's Introduction

Prefect

Prefect is an orchestration and observability platform for building, observing, and triaging workflows. It's the simplest way to transform Python code into an interactive workflow application.

Prefect allows you to expose your workflows through an API so teams dependent on you can programmatically access your pipelines, business logic, and more. Prefect also allows you to standardize workflow development and deployment across your organization.

With Prefect, you can build resilient, dynamic workflows that react to the world around them and recover from unexpected changes. With just a few decorators, Prefect supercharges your code with features like automatic retries, distributed execution, scheduling, caching, and much more.

Every activity is tracked and can be monitored with a self-hosted Prefect server instance or managed Prefect Cloud dashboard.

Getting started

Prefect requires Python 3.8 or later. To install Prefect, run the following command:

pip install prefect

Then create and run a Python file that uses Prefect flow and task decorators to orchestrate and observe your workflow - in this case, a simple script that fetches the number of GitHub stars from a repository:

from prefect import flow, task
from typing import List
import httpx


@task(log_prints=True)
def get_stars(repo: str):
    url = f"https://api.github.com/repos/{repo}"
    count = httpx.get(url).json()["stargazers_count"]
    print(f"{repo} has {count} stars!")


@flow(name="GitHub Stars")
def github_stars(repos: List[str]):
    for repo in repos:
        get_stars(repo)


# run the flow!
if __name__ == "__main__":
    github_stars(["PrefectHQ/Prefect"])

Fire up the Prefect UI to see what happened:

prefect server start

Prefect UI dashboard

To run your workflow on a schedule, turn it into a deployment and schedule it to run every minute by changing the last line of your script to the following:

    github_stars.serve(name="first-deployment", cron="* * * * *")

You now have a server running locally that is looking for scheduled deployments! Additionally you can run your workflow manually from the UI or CLI - and if you're using Prefect Cloud, you can even run deployments in response to events.
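
For example, with the serve process running, you can trigger an ad-hoc run from another terminal (the flow and deployment names below match the script above; exact CLI syntax may vary between Prefect versions):

prefect deployment run 'GitHub Stars/first-deployment'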

Prefect Cloud

Stop worrying about your workflows. Prefect Cloud allows you to centrally deploy, monitor, and manage the data workflows you support. With managed orchestration, automations, and webhooks, all backed by enterprise-class security, build production-ready code quickly and reliably.

Read more about Prefect Cloud here or sign up to try it for yourself.

Prefect Automations

prefect-client

If your use case is geared towards communicating with Prefect Cloud or a remote Prefect server, check out our prefect-client. It was designed to be a lighter-weight option for accessing client-side functionality in the Prefect SDK and is ideal for use in ephemeral execution environments.
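
As a rough sketch, prefect-client installs under the same prefect namespace, so client-side code looks just like the earlier example; check the prefect-client documentation for the exact subset of functionality it supports:

pip install -U prefect-client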

Next steps

There's lots more you can do to orchestrate and observe your workflows with Prefect! Start with our friendly tutorial or explore the core concepts of Prefect workflows.

Join the community

Prefect is made possible by the fastest-growing community of thousands of friendly data engineers. Join us in building a new kind of workflow system. The Prefect Slack community is a fantastic place to learn more about Prefect, ask questions, or get help with workflow design. All community forums, including code contributions, issue discussions, and Slack messages, are subject to our Code of Conduct.

Contribute

See our documentation on contributing to Prefect.

Thanks for being part of the mission to build a new kind of workflow system and, of course, happy engineering!

prefect's People

Contributors

abrookins, ahuang11, anna-geller, anticorrelator, billpalombi, bunchesofdonald, chrisguidry, cicdw, dependabot[bot], desertaxle, discdiver, github-actions[bot], jakekaplan, jawnsy, jlowin, marichka-offen, peytonrunyan, pleek91, rpeden, serinamarie, stackoverfloweth, thatgalnatalie, tpdorsey, urimandujano, willraphaelson, zangell44, zanieb, zhen0, znicholasbrown, zzstoatzz

prefect's Issues

New State type: TRIGGER_FAILED

When a task fails, and the following task also fails because it has an all_successful trigger, it would be helpful to know that the second task didn't fail because of an error or execution problem, but that it failed because the upstream task failed.

We've discussed creating an UPSTREAM_FAILED state to represent this behavior, but it could create odd situations: supposing a task has an all_failed trigger and the upstream task succeeds. Should the resulting task really say UPSTREAM_FAILED?

I think we need a TRIGGER_FAILED which might act functionally like a SKIP but not be treated as a SUCCESS.
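
A minimal sketch of where such a state could sit in the hierarchy (class names are illustrative, not the implementation):

class Failed:
    """Placeholder for the existing FAILED state."""

class TriggerFailed(Failed):
    """The task never ran because its trigger failed (an upstream condition
    was not met), as opposed to failing during its own execution."""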

Add PickleEnvironment

For local/offline/simple testing, a PickleEnvironment would make flow serialization much simpler.

Functionally, it would simply serialize a flow using cloudpickle, and deserialize it on demand.
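
A minimal sketch of that behavior using cloudpickle (the class and method names are illustrative, not an existing Prefect API):

import cloudpickle


class PickleEnvironment:
    """Serialize a flow with cloudpickle and rebuild it on demand."""

    def serialize(self, flow) -> bytes:
        return cloudpickle.dumps(flow)

    def deserialize(self, blob: bytes):
        return cloudpickle.loads(blob)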

Parameter setting fails non-deterministically

Via @cicdw, the following code fails non-deterministically:

from prefect import *
@task()
def add(x, y):
    return x + y
with Flow('test') as f:
    y = Parameter('y')
    res = add(1, y)
f.run()
f.run().data
s = f.run()
s.all_states
s.all_states()
s.state
s.data
s = f.run(parameters=dict(y=56))
s
s.data
s.data[res.task]
s.data[res.task].data

Context updates for parameters appear to be failing in flow_runner.py, such that the Parameter task is unable to retrieve parameters from context and consequently fails.

Allow Flow States to be determined by tasks other than the terminal tasks

Currently, the State of a flow run is determined exclusively by that Flow's terminal tasks using this waterfall:

  • If any terminal task fails, the flow fails
  • If all terminal tasks succeed, the flow succeeds
  • If all terminal tasks finish, the flow succeeds (this is a separate waterfall step in case we decide that SKIPs - a valid "finished" state - should be handled differently)
  • Otherwise the flow remains pending

However, we can imagine situations where this logic breaks down:

  • A flow consists of a task that does something (X) and a terminal clean up task that only runs if X fails.
  • If X succeeds, the clean up task will be skipped and the flow will be marked as successful (this is probably the correct behavior)
  • If X fails, the clean up task will run and be successful -- the flow will be marked as successful. This might be the wrong behavior; here are two arguments why:
    • The flow failed its intended purpose (which was to run X), and should be considered an overall failure.
    • The workflow ran properly -- catching an error and cleaning up -- and should be considered an overall success.

I propose allowing users to specify the tasks that should determine flow state. By default, it's the terminal tasks, but in the above example a user might have said that the X task should be used to determine flow state. No matter what tasks are chosen, the same waterfall described at the top of this issue would apply to those tasks. This would be a property of the Flow object, not a runtime configuration, and would be provided when the flow itself was created:

my_flow = prefect.Flow(name='demo flow', run_state_tasks = [X_task])

I think run_state_tasks is a bad name for this option -- open to suggestions.
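
A minimal sketch of applying that waterfall to whichever tasks are chosen (helper and method names are hypothetical):

def determine_flow_state(reference_tasks, task_states):
    # apply the terminal-task waterfall to an arbitrary set of reference tasks
    states = [task_states[t] for t in reference_tasks]
    if any(s.is_failed() for s in states):
        return "FAILED"
    if all(s.is_successful() for s in states):
        return "SUCCESS"
    if all(s.is_finished() for s in states):
        return "SUCCESS"  # all finished, but some may be SKIPPED
    return "PENDING"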

Prefect packaging clean-up

Based on recent issues related to our config.toml file, we should:

  • review our package directory structure (possibly moving everything into a src/ directory)
  • review our MANIFEST.in file
  • review how reliable our current testing setup is; specifically, we had tests passing on the core side that were failing on the server side, because config.toml was present in the git-cloned repo but not present in the freshly installed package. This might be a good argument for switching to tox

Relevant literature:

NOTE: any decisions here should be reflected on the https://github.com/PrefectHQ/server side, too (cc: @joshmeek)

Add system flows

  • Scheduler - schedules Flows
  • FlowLoader - Loads Flows from .py files
  • Zombie killer - Kills runs that have no heartbeat

More strictly enforce what information is returned from a flow run

Currently, flow_runner.run() accepts a return_tasks argument. If provided, those tasks' States are returned as the data of the flow run's State. If the value is None (the default), then the terminal tasks' States are returned.

For consistency and to reduce noise, let's change the default behavior to return exactly the tasks provided as return_tasks. If return_tasks is None, then the flow State's data should just be an empty dict.
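
A small sketch of the proposed default (helper name is hypothetical):

def resolve_return_data(return_tasks, task_states):
    # no fallback to terminal tasks: None simply means "return nothing"
    if return_tasks is None:
        return {}
    return {t: task_states[t] for t in return_tasks}


assert resolve_return_data(None, {"t1": "Success"}) == {}
assert resolve_return_data(["t1"], {"t1": "Success"}) == {"t1": "Success"}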

Add "raise_on_fail" context manager

Task runners and flow runners trap all errors, which can make debugging difficult.

We could create a context manager that would raise errors rather than trapping them. Given a Flow flow that contains a task that raises some error:

flow.run() # returns State(FAILED)

with raise_on_fail():
    flow.run() # actually raises an error at the appropriate place
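
One way such a context manager could be sketched (the module-level flag is illustrative; a real implementation would hook into the task and flow runners):

from contextlib import contextmanager

RUNNER_SETTINGS = {"raise_on_fail": False}  # hypothetical runner-level flag


@contextmanager
def raise_on_fail():
    # while active, runners would re-raise task errors instead of trapping
    # them into a FAILED state
    previous = RUNNER_SETTINGS["raise_on_fail"]
    RUNNER_SETTINGS["raise_on_fail"] = True
    try:
        yield
    finally:
        RUNNER_SETTINGS["raise_on_fail"] = previous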

Remove flow runner test utility from our unit tests

This test utility has some magic in it that can prevent us from properly testing certain features (e.g., return tasks). It is still a useful utility and will likely become a user-facing feature, so we should keep it and unit test it directly - we just shouldn't rely on it inside our other unit tests.

Use special context keys for automatic context values

(inspired by work in #18)

Currently, when a flow adds itself to a context, it does so under the flow key (in other words, it becomes available at prefect.context.flow). This has the potential to override a possibly common user-supplied context value. Instead, any automatically-set context value (including flows, as described here, and all settings used by FlowRunner and TaskRunners, like parameters) should be put under a key, perhaps __prefect__. This way they won't shadow user variables.
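
A minimal sketch of the namespaced context shape (values are placeholders):

# user-supplied values stay at the top level and are never shadowed;
# automatically-set values live under a single reserved key
context = {
    "flow": "a value the user set themselves",
    "__prefect__": {
        "flow": "<Flow: my-flow>",
        "parameters": {"y": 56},
    },
}

assert context["flow"] == "a value the user set themselves"
assert "parameters" in context["__prefect__"]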

Implement `TaskResult.run()` method (efficiently)

Currently, to retrieve the return value from a TaskResult, you must:

from prefect import task

@task
def fun(x, y):
    return x + y

tt = fun(1, 2)
tt.flow.run()[tt.task] # returns 3

which feels like it violates the Law of Demeter and should be simpler.

In its place, we should have something like

tt.run() # returns 3

which should just execute the minimal path necessary to retrieve the result.

cc: @jlowin

Remove TaskResult class

TaskResults are pointers to (Task, Flow) tuples; in other words, they refer to the result of a specific task in a specific flow.

If we make the current Flow explicit, then we can remove this class entirely because there will never be ambiguity about the Flow in question. For example:

with Flow() as flow:
    x = Task('x')
    y = Task('y')
    z = AddTask()(x=x, y=y)

assert x in flow.tasks
assert y in flow.tasks
assert z in flow.tasks

We can also create a global default Flow which would allow users to continue to use the current syntax. This might be helpful for quick tests. In order to submit to Prefect server, however, users would have to create a new flow object (with a proper name).

x = Task('x')
y = Task('y')
z = AddTask()(x=x, y=y)

assert x in get_global_flow().tasks
assert y in get_global_flow().tasks
assert z in get_global_flow().tasks

new_flow = get_global_flow().copy_as(name='my flow')

Use dask.bag to easily access remote/local storage

dask.bag.read_text can load files from local storage, HDFS, S3, and GCS. Since it is already a required dependency (because of distributed) it's a good choice for remote Flow storage and configuration
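
For instance (the bucket name is purely illustrative, and remote filesystems need the matching backend such as s3fs installed):

import dask.bag as db

# the same API reads newline-delimited text from local disk or object storage
local_lines = db.read_text("flows/*.py")
remote_lines = db.read_text("s3://my-bucket/flows/*.py")

print(local_lines.count().compute())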

`start_tasks` kwarg doesn't work

The start_tasks kwarg appears to not be working:

from prefect import *

GLOBAL_DICT = {'key': 0}

class PrintTask(Task):
    def run(self):
        GLOBAL_DICT['key'] += 1
        return 1

class ReturnTask(Task):
    was_called = False
    def run(self, x):
        if self.was_called is False:
            self.was_called = True
            raise ValueError("error")
        return x

with Flow() as f:
    t1 = PrintTask()
    t2 = ReturnTask(max_retries=1)
    result = t2(t1())

state = f.run(return_tasks=[t2])
state = f.run(return_tasks=[t2], start_tasks=[t2])

## GLOBAL_DICT['key'] == 2 (PrintTask ran again even though start_tasks=[t2] was passed; expected 1)

Flow methods should raise a more informative error if a task is not in the flow

flow.edges_to(task) and flow.edges_from(task) (and methods that depend on them, like upstream_tasks()) raise a KeyError if the task in question is not in the Flow. The error says that the task can't be found in an edge dictionary -- perhaps we should have a more informative error saying that the task is not in the current Flow.
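
A minimal sketch of the suggested check (the method name follows the issue; the attribute names are illustrative):

def edges_to(self, task):
    # fail with a clear message before touching the internal edge dictionary
    if task not in self.tasks:
        raise ValueError(
            f"Task {task!r} is not contained in this Flow; did you forget to add it?"
        )
    return self._edges_by_downstream[task]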

Support *args for task run() methods

Currently, *args is explicitly not supported because edges that pass data are identified by string keys that become the **kwargs for the task's run() method. One way to support *args would be to automatically create edges with integer keys. When executing a flow, we would put edges with string keys into **kwargs and edges with integer keys into *args.
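
A small sketch of splitting integer-keyed and string-keyed edges back into positional and keyword arguments (the edge representation is illustrative):

def build_call_args(edge_values):
    # integer keys become *args (in key order); string keys become **kwargs
    positional = sorted(
        (k, v) for k, v in edge_values.items() if isinstance(k, int)
    )
    args = [v for _, v in positional]
    kwargs = {k: v for k, v in edge_values.items() if isinstance(k, str)}
    return args, kwargs


args, kwargs = build_call_args({0: "a", 1: "b", "x": 1})
assert args == ["a", "b"] and kwargs == {"x": 1}
# the runner would then call task.run(*args, **kwargs)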

Supply context via local variables / context manager

Rather than pass context variables explicitly as kwargs, supply them as variables that are set via context manager

Task Definition (user code)

from datetime import datetime

from prefect.context import as_of_date


class MyTask:
    def run(self):
        """Return True if this task's as-of-date is today."""
        if as_of_date.date() == datetime.now().date():
            return True
        else:
            return False

TaskRunner (Prefect code)

# context manager sets the values of variables like as_of_date
with prefect.context.set_context(context):
    my_task.run()
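
A minimal sketch of how set_context could be backed by contextvars (set_context and as_of_date are the hypothetical names from this proposal, not an existing API):

import contextvars
from contextlib import contextmanager
from datetime import datetime

as_of_date = contextvars.ContextVar("as_of_date")


@contextmanager
def set_context(values):
    # set the supplied context variable for the duration of the block
    token = as_of_date.set(values["as_of_date"])
    try:
        yield
    finally:
        as_of_date.reset(token)


with set_context({"as_of_date": datetime(2018, 12, 31)}):
    print(as_of_date.get().date())  # 2018-12-31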

Explore transforming State into a class instead of a String + Data attribute

Instead of the current State class that has a string state and Any data attribute, as well as a bunch of is_finished() methods that check the state string against a list of valid strings, consider building a hierarchy of State classes:

# Base class

class State:
    # class attribute for example; set in __init__
    data = None

# -------------------------------------------
# Finished classes - Success, Failed, Skipped
class Finished(State):
    pass

class Success(Finished):
    pass

class Failed(Finished):
    # class attribute for example; set in __init__
    message = None

class Skipped(Finished):
    pass

# -------------------------------------------
# Pending classes - Retry and Scheduled (and Pending itself)
class Pending(State):
    pass

class Retry(Pending):
    # class attribute for example; set in __init__
    retry_time = None

class Scheduled(Pending):
    # class attribute for example; set in __init__
    scheduled_time = None

Then checking and working with states is easier than the current system of checking the string attribute and hoping the data attribute matches an expected schema:

s = Success(100)
f = Failed('Division by zero error')

assert isinstance(s, Finished)
assert isinstance(f, Finished)

r = Retry(datetime(2018, 12, 31))
assert isinstance(r, Pending)
assert isinstance(r.retry_time, datetime)

Flow should always return data dictionary, even when everything fails

Flow().run() should always return a State object with a complete data dictionary attribute, even when the Flow fails early on. In this case, every Task should be represented with a FAILED State.

Example of empty dictionary:

from prefect import Parameter, Flow, task

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    y = Parameter('y')
    res = add(1, y)

f.run().data # returns None, should return {<Task: add>: State(FAILED)}

Example of incomplete dictionary:

from prefect import Flow, task

@task
def err(x, y):
    raise ValueError("no sir")

@task
def add(x, y):
    return x + y

with Flow("test") as f:
    e = err(1, 2)
    a = add(e, 5)

f.run().data # returns {<Task: add>: State(FAILED)}, should return {<Task: err>: State(FAILED), <Task: add>: State(FAILED)}

Update trigger functions to use `TriggerFailed`

Once #44 / #59 are merged, trigger functions should raise TriggerFailed instead of Failed.

Moreover, triggers should probably never just return False -- originally that was done to allow triggers to indicate that they didn't have enough information to determine if they would run or not (for example, an all_success trigger that was passed 2 Success states and 1 Pending) but now that all states must be finished before being passed to a trigger, triggers should always be able to either pass or raise a signal.
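
A minimal sketch of an updated trigger under that rule (names and the state interface are illustrative):

class TriggerFailed(Exception):
    """Raised by a trigger whose condition is not met."""


def all_successful(upstream_states):
    # upstream states are guaranteed to be finished by the time the trigger
    # runs, so it can always either pass or raise -- never return False
    if not all(state.is_successful() for state in upstream_states):
        raise TriggerFailed("not all upstream tasks succeeded")
    return True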

Whenever a flow context is opened, the operations should be atomic

Example:
When a flow context is opened and one of the tasks added inside it fails (e.g., because the task has already been assigned in the flow), the flow should be rolled back rather than left partially built up to the point of failure.

node_1 = a()
node_2 = b(node_1)
node_3 = c(node_2)
node_4 = node_1(node_3)
node_5 = d(node_4)

The above code currently produces the flow a -> b -> c and instead should not produce a valid flow.
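
A minimal sketch of rolling the flow back when building it raises (the tasks and edges attributes are illustrative):

import copy
from contextlib import contextmanager


@contextmanager
def atomic_flow(flow):
    # snapshot the flow's structure and restore it if an error occurs inside
    snapshot_tasks = copy.copy(flow.tasks)
    snapshot_edges = copy.copy(flow.edges)
    try:
        yield flow
    except Exception:
        flow.tasks = snapshot_tasks
        flow.edges = snapshot_edges
        raise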

Flows don't properly restore previous context

Flows can be used as context managers:

with prefect.Flow('my flow') as f:
    ...

When this happens, the flow adds itself to prefect.context under the flow key. It stores a reference to the flow key's previous value so that it can restore that value when the context manager exits.

However, if the previous value was None (or if there was no flow key at all), then the flow restores flow=None to the previous context. Later, this creates weird behavior when other Prefect objects check to see if a flow is in context.

This issue was previously masked by the bug described in #16
