Giter VIP home page Giter VIP logo

apache / airflow Goto Github PK

View Code? Open in Web Editor NEW
34.7K 754.0 13.6K 277.47 MB

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows

Home Page: https://airflow.apache.org/

License: Apache License 2.0

Python 95.98% JavaScript 0.30% CSS 0.06% HTML 0.43% Shell 0.51% Mako 0.01% Jupyter Notebook 0.02% Dockerfile 0.26% HCL 0.01% TypeScript 2.27% Jinja 0.14% R 0.01% Go 0.03%
airflow apache apache-airflow python scheduler workflow automation dag data-engineering data-integration data-orchestrator data-pipelines data-science elt etl machine-learning mlops orchestration workflow-engine workflow-orchestration

airflow's Issues

Can you have a DAG dependent on another DAG?

I see airflow.operators.ExternalTaskSensor supports dependency on an ask in another DAG. But is it possible to set dependency on another DAG itself? In other words, can external_task_id parameter be optional?

Add an FTP hook

Do you have plans to support FTP hooks?

I've noticed there's an FTP connection type in the front-end, but I've not found a component to utilise the connection. Is this a case of me extending BaseHook to wrap some functionality to pull and push files via FTP or is there something existing that I've missed?

Service for scheduler

Hi

I'm just trying to figure how the tasks are run in production. Am I right in thinking that the scheduler must be running, and if i am using celery than a worker must be running as well?

I just plan on using mysql at the moment. Do you recommend creating a init script to start the scheduler?

Cheers

Scheduling frequency

Is there a way to schedule jobs/dags on specific time of the day? Also, on a related note, can DAGs be scheduled on some other frequency - such as hourly, weekly etc?

Apologies if I missed it in the documentation.

Issue 23 Explanation

I am re-reading the last sentence of your answer of this issue #23.

"If you're planning on doing some large distributed computation that uses lots of memory, Airflow might not be the right platform. Instead, you may want to use a distributed system, or write some sort of service, and invoke it from Airflow."

I am not sure to really understand. Could you give me more information about it?

How to execute arbitrary SQL in postgres?

I'd like to execute a series of SQL statements on Redshift.

I'd planned to use the SqlSensor to do this but failed to read the part of the documentation about executing a statement until a criteria is met. I just need to execute regardless of the result set.

Is there a way of doing this without using the PythonOperator and opening my own postgres connection? It looks as if I want a MySqlOperator for postgres.

Memory usage

I think is quite linked with one of my previous questions, but just to be sure. Will you look at the memory usage of each tasks and output an awesome graph or things like that?

That could be very cool, I did not see any scheduler with a good ui doing that.

Integrity error: Data Profiling -> Charts

So I've added an external connection successfully. A simple ad-hoc query returns data as expected.

However when I try to add a chart I get the following error:

Integrity error. (sqlite3.IntegrityError) NOT NULL constraint failed: chart.conn_id

It seems as if the sqlite job database isn't able to register the conn_id of my new external source?

I am able to see the source that I added "redshift" in the charting screen.

screenshot 2015-06-13 22 55 51

Operating system:

Distributor ID: Ubuntu
Description:    Ubuntu 14.04.1 LTS
Release:        14.04
Codename:       trusty

TimeSensor does not trigger

We are trying to kick off our pipeline by a TimeSensor, but it does not trigger. There is no error in the logs visible - the graph looks like this:
Screenshot

And the code like this:

from airflow import DAG
from airflow.operators import DummyOperator, MySqlOperator, BashOperator
from datetime import datetime, time, timedelta
from airflow.models import Variable
from airflow.operators.sensors import TimeSensor
from airflow.operators.sensors import SqlSensor

default_args = {
    'owner': 'deploy',
    'depends_on_past': False,
    'start_date': datetime(2015,06,24),
    'queue': 'default',
}

# the DAG
dag = DAG('pvi', default_args=default_args)

# tables
app_installs = SqlSensor(conn_id="exasol", sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_INSTALLS WHERE CREATED_DATE='{{ ds }}'""", task_id='table_app_installs', dag=dag)
app_logins = SqlSensor(conn_id="exasol", sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_LOGINS WHERE CREATED_DATE='{{ ds }}'""", task_id='table_app_logins', dag=dag)

# time dependencies
midnight_sensor = TimeSensor(target_time=time(15, 28), task_id='midnight', dag=dag)
start_aggregation = DummyOperator(task_id='start_aggregation', dag=dag)
start_aggregation.set_upstream(midnight_sensor)
start_aggregation.set_downstream([app_logins, app_installs])

We tried to trigger the chain by putting the target_time onto something 5 minutes in the future, the DAG gets refreshed every 60 seconds (default) but nothing happens. Worker, Scheduler, Flower and Webserver are on the same machine running Python 2.7 and FreeBSD 9.3 and we have a second machine running a celery worker which is listening on a different queue (q2). We have another DAG that works as expected on q2.

Run a DAG from command line

Is there a way to run a DAG from command line?

I see two options:

  1. Use backfill and give same start and end date.
  2. Use run - possible only if task_id is optional. Currently, run seems to support only running a specific task within a specific DAG. Is it possible to make task_id optional, so that if it's not provide, the run command will run the full DAG?

Is there any other way of doing this that I may have missed in the documentation?

Disabling backfill functionality?

If I have a process that needs to run daily regardless of input parameters, for example a process that bulk loads a table and doesn't depend on delta data, is there a way of preventing backfills running a dag multiple times?

If I have a dag with for arguments sake, one task, can I make a backfill with a weeks' date range only run the task once rather than 7 times?

EmailOperator is not working on Mac with Postfix : expecting TLS

Why do I need TLS? This requires setting it up for any smtp server, even one I run locally.

2015-06-19 13:20:09,793 - root - INFO - Executing <Task(EmailOperator): email_pipeline_start> for 2010-10-10 00:00:00
2015-06-19 13:20:10,459 - root - ERROR - STARTTLS extension not supported by server.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 746, in run
    task_copy.execute(context=self.get_template_context())
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/email_operator.py", line 36, in execute
    send_email(self.to, self.subject, self.html_content)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 339, in send_email
    send_MIME_email(SMTP_MAIL_FROM, to, msg)
  File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 349, in send_MIME_email
    s.starttls()
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/smtplib.py", line 644, in starttls
    raise SMTPException("STARTTLS extension not supported by server.")
SMTPException: STARTTLS extension not supported by server.
2015-06-19 13:20:10,503 - root - ERROR - Failed to send email to: ['[email protected]']
2015-06-19 13:20:10,503 - root - ERROR - STARTTLS extension not supported by server.
2015-06-19 13:20:10,503 - root - ERROR - STARTTLS extension not supported by server.
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 195, in test
    ti.run(force=True, ignore_dependencies=True, test_mode=True)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 749, in run
    raise e
smtplib.SMTPException: STARTTLS extension not supported by server.

I also tested this out with our production postfix server, and that didn't work for a different reason, which is not apparent yet. Which email SMTP servers have you tried this out with? Examples would be very helpful.

Add `only_run_latest` to BaseOperator

Dependencies on an only_run_latest task are satisfied when that task has succeeded at any point in the future of the task depending on it.

When backfilling, only the end_date of only_run_latest tasks would be triggered, and dependencies for this task through time would be satisfied upon completion of this latest task instance.

When it comes to an only_run_latest task, the scheduler should skip runs to max(execution_date) of all of its dependencies.

New Dags are only imported when the webserver restarts!

How does one add new dags to the system without restarting the webserver? Running "python my_dag.py" is not importing the dag in to the db either. I am running on EC2 with a Postgresql database. I am able to see the data gets imported only on restart of the web app.

Here is the code:

This code will become the EP Data Pipeline.

The flow may eventually look like : 
* 1. SQS Message Written Detect
* 2.a. Email Send Flow Started 
* 2.b. Database Row Written Detect
* 3. SQS Queue Empty Detect
* 4. Email Send Flow Complete 
"""
from airflow import DAG
from airflow.operators import EmailOperator
from datetime import datetime


print ' got here 1'
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
}

print ' got here 1'
dag = DAG('ep_demo_1', default_args=default_args)

print ' got here 1'
t1 = EmailOperator(
    task_id='email_pipeline_start',
    to='[email protected]',
    subject='EP Demo Pipeline Started (TAD)',
    html_content='',
    dag=dag)

After restarting the webapp, I still don't see any entries in the dag table in the sqlite db for my dags. I see the dags in the UI -- there seems to be a difference between loading and importing a dag.

2015-06-18 17:49:21,175 - root - INFO - Loaded DAG <DAG: ep_demo_1>
2015-06-18 17:49:21,177 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_bash_operator.py
2015-06-18 17:49:21,178 - root - INFO - Loaded DAG <DAG: example_bash_operator>
2015-06-18 17:49:21,179 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_python_operator.py
2015-06-18 17:49:21,180 - root - INFO - Loaded DAG <DAG: example_python_operator>
2015-06-18 17:49:21,181 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/tutorial.py
2015-06-18 17:49:21,182 - root - INFO - Loaded DAG <DAG: tutorial>

how to use variables ?

webserver presents a possibility to define variables but there is no documentation on how to use those variables (and I don't see any usage in code).

Is it a way to defined "constants" to use in templates/arguments? How can we use them?

example_dags/cron_replacement.py

I really like the concept of Airflow but I've been stumped trying to get a simple cron replacement example DAG working.

I took the example_bash_operator.py and changed args like so:

 args = {
    'owner': 'airflow',
    'start_date': datetime.now(),
    'schedule_interval': timedelta(minutes = 5)
}

I was expecting this DAG to start now and run every 5 minutes, but instead the scheduler runs the DAG in a loop, back-to-back.

I'd imagine migrating from cron to Airflow would be a pretty common use case - it would be awesome to see an example DAG for replacing e.g. a cron that runs every 6 hours.

airflow worker does not work

$ airflow worker
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 299, in worker
worker.run(**options)
File "/usr/local/lib/python2.7/site-packages/celery/bin/worker.py", line 212, in run
state_db=self.node_format(state_db, hostname), **kwargs
File "/usr/local/lib/python2.7/site-packages/celery/worker/init.py", line 100, in init
self.setup_instance(**self.prepare_args(**kwargs))
File "/usr/local/lib/python2.7/site-packages/celery/worker/init.py", line 124, in setup_instance
self._conninfo = self.app.connection()
File "/usr/local/lib/python2.7/site-packages/celery/app/base.py", line 385, in connection
'BROKER_CONNECTION_TIMEOUT', connect_timeout
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 165, in init
if not get_transport_cls(transport).can_parse_url:
File "/usr/local/lib/python2.7/site-packages/kombu/transport/init.py", line 109, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
File "/usr/local/lib/python2.7/site-packages/kombu/transport/init.py", line 89, in resolve_transport
raise KeyError('No such transport: {0}'.format(transport))
KeyError: 'No such transport: '

depends_on_past at DAG level

I have a DAG which looks like the following:
t1 -> t2

When I run a backfill for this DAG (say for day1 and day2) , I want to make sure that the DAG for day2 gets executed only after the DAG for day1 is complete. From the name, "depends_on_past" looked like the right thing to use. However, it works at the task level. In other words, t1 for day2 starts executing after t1 for day1 is complete before t2 for day1 is completed.

Is there a way to force a similar "depends_on_past" at the DAG level?

Using templates in PythonOperator

Sorry for a trivial question, I've been trawling thru the code but cant seem to get templating to work within PythonOperators.

I've been using the templating features of the PostgresOperator to read an external sql file and template some of the logic. I'd like to do similar using a PythonOperator, specifically:

  • Call a python function supplying a dict (via a PythonOperator)
  • The python function should load a template and render it using the supplied dict as variables.

Is this possible? I've been fiddling trying to supply a template_fields value to my function hoping it will magically work.

Right way to shutdown the scheduler and webserver

I'm integrating Airflow with monit.

Webserver
When you type airflow webserver -d -p 8080, 2 webserver processes are launched, one is a child of the other.

deploy   19005     1  0 00:37 ?        00:00:01 /usr/bin/python /usr/local/bin/airflow webserver -d -p 808 
deploy   19010 19005  1 00:37 ?        00:00:17 /usr/bin/python /usr/local/bin/airflow webserver -d -p 808 

In order to correctly kill a process tree with 2 levels (parent and child), I use the "stop program" command in the monit conf.d file below :

check process airflow-webserver with pidfile /home/deploy/airflow/pids/airflow-webserver.pid
  group airflow-webserver
  start program "/bin/sh -c '( TMP=/data/tmp PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin airflow webserver -d -p 8080 2>&1 & echo $! > /home/deploy/airflow/pids/airflow-webserver.pid ) | logger -p user.info'"
    as uid deploy and gid deploy
  stop program "/bin/sh -c 'PATH=/bin:/sbin:/usr/bin:/usr/sbin pkill -TERM -P `cat /home/deploy/airflow/pids/airflow-webserver.pid` && rm -f /home/deploy/airflow/pids/airflow-webserver.pid'"
    as uid deploy and gid deploy

Ideally, airflow should clean up its own child processes when it receives a SIGTERM. But, since it doesn't, I need to call pkill with the -P option.

Scheduler
The Scheduler also launches a bunch of processes. However, for some reason, using the same approach, I always find the parent surviving.

check process airflow-scheduler with pidfile /home/deploy/airflow/pids/airflow-scheduler.pid
  group airflow-scheduler
  start program "/bin/sh -c '( TMP=/data/tmp PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin airflow scheduler 2>&1 & echo $! > /home/deploy/airflow/pids/airflow-scheduler.pid ) | logger -p user.info'"
    as uid deploy and gid deploy
  stop program "/bin/sh -c 'PATH=/bin:/sbin:/usr/bin:/usr/sbin pkill -KILL -P `cat /home/deploy/airflow/pids/airflow-scheduler.pid` && rm -f /home/deploy/airflow/pids/airflow-scheduler.pid'"
    as uid deploy and gid deploy

Can you provide guidance on a clean way to shutdown the scheduler and webserver?

-s

Support for Amazon Elastic Container Service?

Just wondering if there's any plan to support Amazon ECS? I saw some discussion about potentially using docker containers for memory management. ECS would be a nice way to abstract away other details like selecting which server should run a given task.

Security question

I want to use s3 and I see that airflow works well with aws (ec2, s3). But s3 mean that I will have data outside. What do you have for security in airflow?

Tutorial documentation: "date"?

Just following the tutorial to see what airflow is about. It was easy to follow up till the "testing" paragraph on the tutorial page. Suddenly a "date" parameter is required.

Until there, I assumed you could run a batch job on anything and just request it to be run. Here, airflow implicitly requires that you run any DAG for a date or date ranges and that this parameter is required. I'm not sure if this parameter is passed into every task, or whether airflow simply needs this for another purpose.

It would be good to explain what this strange date parameter is for, what impact it has on tasks or whether this is internal use.

The query stuck for hive2

I used a simple query for a hive2 connection. It have been sucked. I cannot get the result ;(

2015-06-19 10:20:56,647 - tornado.access - INFO - 200 GET /admin/queryview/ (10.196.11.238) 19.21ms
2015-06-19 10:21:02,962 - root - INFO - Using connection to: localhost
2015-06-19 10:21:02,965 - root - INFO - Using connection to: localhost

Why do we need a CeleryExecutor in order to use the UI features?

It seems the start up costs of this are a bit high. I am trying out Luigi and Airflow, and need to set up a central scheduler for both> The UI features for Airflow are more attractive than those of Luigi, but the UI features in Airflow are only usable (actionable) if Celery is installed. I am running on EC2 and don't want to install too much to just take this for a spin. My recommendation is to make the UI features work for the LocalExecutor. Otherwise, Luigi seems attractive -- the startup cost is similar but it is more battletested

-s

How do you log print statements?

I'm running a few scripts via PythonOperator and was wondering how I can log print statements within these scripts. Any information would be appreciated. Thank you,

Kevin

New feature: HTTP interface?

So I went through the codebase and docs of 'airflow' today and I think it's a great fit for one of my projects. I'm the maintainer of "remap", which is a 100% python implementation of MapReduce only intended to run on a dozen of nodes for now. Jobs are kicked off through a REST interface.

Here's where a potential contribution comes in.

I didn't find anything already done with http interfaces, so my idea is to write an HTTPHook, an HTTP operator and a sensor for this. The operator/hook calls a URL resource with an indicated method and potentially some post data. The sensor later on calls another URL to check on progress. This would allow work to be executed asynchronously until some later checkpoint where the sensor needs to check whether something is available or not.

The HTTP library I intend to use is "requests", which should come installed with "pip": http://docs.python-requests.org/en/latest/

Operators never seem to return values and probably by design, which means that a worker process waits around for the job to complete, which means that an operator executes the action synchronously.

So, couple of questions:

  1. Is this a welcome contribution? Is someone already working on an HTTP hook?
  2. Can operators return data that is checked for later with a sensor to support inherently asynchronous systems like this one? Or are operators required to be synchronous in the sense that it always waits for the result of an operation, so must keep checking inside the operator at intervals until the action completes?
  3. Is the "requests" library an acceptable library (considering license, availability, installability, etc)?

As extra thought on 2, it's possible that external systems contain key/values that are useful to use in workflows. Is there a recommended mechanism of loading small pieces of data into a DAG workflow so that it's available from a context for example when another task is executed?

Inter-dag deps

I'm trying to use the ExternalTaskSensor to execute a data processing dag upon the completion of another dag that builds some lookup tables.

My child dag's demo code is below:

random_dep = ExternalTaskSensor(
    external_dag_id='driver',
    external_task_id='templated',
    owner='crm',
    task_id='random_dep'
    )

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
run_this.set_upstream(random_dep)

task = PythonOperator(
    task_id='sleep_for_1',
    python_callable=my_sleeping_function,
    op_kwargs={'random_base': 1},
    dag=dag)

task.set_upstream(run_this)

Have a correctly used the ExternalTaskSensor? Ive noticed this dag isnt rendered on the "Graph View" of the frontend.

'ascii' codec can't decode byte

Running through the Quickstart page right about the line that says:

pip install airflow

I get the following error:

Command python setup.py egg_info failed with error code 1 in /tmp/pip_build_hugo/pandas
Traceback (most recent call last):
  File "/usr/bin/pip", line 9, in <module>
    load_entry_point('pip==1.5.4', 'console_scripts', 'pip')()
  File "/usr/lib/python2.7/dist-packages/pip/__init__.py", line 235, in main
    return command.main(cmd_args)
  File "/usr/lib/python2.7/dist-packages/pip/basecommand.py", line 161, in main
    text = '\n'.join(complete_log)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 72: ordinal not in range(128)

It happens both on ubuntu 14.04 and Centos 7 at least...

I need a flow to start with a sensor

I have a flow that is linear and sequential as shown below :

  1. new data arrives in SQS
  2. wait for that data to start being written to postgres
  3. wait for the SQS queue to drain
  4. send an email that the flow completed

Eventually, we will have a pre-cursor step (e.g. step 0.) that kicks off a Spark job before waiting for new data in the SQS queue. For now, I do not have that, so I would like my flow to simply reset to the first step. In some ways, I am asking for a circular flow that never ends. This breaks the entire workflow abstraction. On another note, where is the "cron" functionality? Do I schedule airflow commands in cron outside of airflow?

Airflow Worker not Working When I run Backfill commands

I am using the CeleryExecutor with a Postgres results backend and queue. It looks like this fails when I run "airflow backfill" commands but works when I use the UI.

Sid-As-MBP-15:airflow siddharth$ airflow worker

 -------------- [email protected] v3.1.18 (Cipater)
---- **** ----- 
--- * ***  * -- Darwin-14.3.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x10c04e690
- ** ---------- .> transport:   sqla+postgresql://siddharth@localhost:5432/airflow_celery
- ** ---------- .> results:     db+postgresql://siddharth@localhost:5432/airflow_celery
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> default          exchange=celery(direct) key=celery


[2015-06-22 15:08:15,337: WARNING/MainProcess] [email protected] ready.
Starting flask
 * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 10, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 97, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2444, in first
    ret = list(self[0:1])
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2280, in __getitem__
    return list(res)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 84, in instances
    util.raise_from_cause(err)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 69, in instances
    rows = [proc(row) for row in fetch]
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 426, in _instance
    loaded_instance, populate_existing, populators)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 484, in _populate_full
    dict_[key] = getter(row)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
    return loads(value)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 209, in loads
    return load(file)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 199, in load
    obj = pik.load()
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 278, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named python_operator
[2015-06-22 15:11:08,787: ERROR/Worker-14] 1
[2015-06-22 15:11:08,835: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[4a012cf5-83f6-4d54-af34-b2ce74b517db] raised unexpected: Exception('Celery command failed',)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 41, in execute_command
    raise Exception('Celery command failed')
Exception: Celery command failed

My airflow.cfg file has the following settings for celery


# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentaly
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = sqla+postgresql://siddharth@localhost:5432/airflow_celery

# Another key Celery setting
#celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+postgresql://siddharth@localhost:5432/airflow_celery

I am getting this error with the example_python_operator and example_bash_operator flows. I haven't tried others.

airflow flower is not much help for experimental brokers like Postgres (over SQLAlchemy) since many commands are unsupported as mentioned in the Limitations section of this

Sid-As-MacBook-Pro-15:airflow siddharth$ airflow flower
[I 150622 15:05:54 command:114] Visit me at http://localhost:5555
[I 150622 15:05:55 command:116] Broker: sqla+postgresql://siddharth@localhost:5432/airflow_celery
[I 150622 15:05:55 command:119] Registered tasks:
    ['celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 150622 15:05:55 mixins:225] Connected to sqla+postgresql://siddharth@localhost:5432/airflow_celery
[W 150622 15:05:58 control:43] 'stats' inspect method failed
[W 150622 15:05:58 control:43] 'active_queues' inspect method failed
[W 150622 15:05:58 control:43] 'registered' inspect method failed
[W 150622 15:05:58 control:43] 'scheduled' inspect method failed
[W 150622 15:05:58 control:43] 'active' inspect method failed
[W 150622 15:05:58 control:43] 'reserved' inspect method failed
[W 150622 15:05:58 control:43] 'revoked' inspect method failed
[W 150622 15:05:58 control:43] 'conf' inspect method failed

If I look into my database (airflow_celery), I find 2 tables : kombu_message and kombu_queue.

In kombu_queue, I'm expecting a single queue called default, but I see several

airflow_celery=# select * from kombu_queue ;
 id |                           name                           
----+----------------------------------------------------------
  1 | default
  2 | [email protected]
  3 | celeryev.ad9d88a9-05b7-4d01-ad2c-8f95662787a0
  4 | 37bb18ec-a47f-349b-859f-3b454d2602a3.reply.celery.pidbox
  5 | b15e35ef-8873-31d6-8841-af833b9cec3d.reply.celery.pidbox
  6 | 4572302e-65a9-3ec1-850e-aff27cd59a9b.reply.celery.pidbox
  7 | 33d4dcc5-f997-3b7c-9f41-66d9a2c51159.reply.celery.pidbox

Can't add external executors -> Plugin manager

Would be nice to be able to add other executors "out of airflow codebase".

List of executors is hard coded in airflow/executors/init.py

A kinda plugin mechanism could allow to add other executors.

Issue using the PostGresOperator

Specifically, what is expected with respect to the "conn_id" argument?

The code below successfully connects to Postgres. I then pass the db_conn object to the PostgresOperator and get an exception shown at the bottom

db_conn_string = "host='localhost' dbname='cousteau_dev' user='siddharth' password='secret'"
db_conn = psycopg2.connect(db_conn_string)
print "Successfully Connected to database\n    ->%s and conn=%s" % (db_conn_string, db_conn)

wait_for_new_data_in_db = PostgresOperator(
    task_id='wait_for_new_data_in_db',
    postgres_conn_id=db_conn,
    sql='select count(*) from receiver_domain_aggregate;',
    dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)

Here's the exception

2015-06-21 20:21:06,789 - root - INFO - Executing <Task(PostgresOperator): wait_for_new_data_in_db> for 2015-06-12 00:00:00
2015-06-21 20:21:06,801 - root - INFO - Executing: select count(*) from receiver_domain_aggregate;
2015-06-21 20:21:06,805 - root - ERROR - (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 746, in run
    task_copy.execute(context=self.get_template_context())
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/postgres_operator.py", line 36, in execute
    self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
  File "/usr/local/lib/python2.7/site-packages/airflow/hooks/postgres_hook.py", line 25, in __init__
    if db.count() == 0:
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2734, in count
    return self.from_self(col).scalar()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2503, in scalar
    ret = self.one()
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2472, in one
    ret = list(self)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2515, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2530, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]

Since I can connect using psycopg2, I'm going to revert to using a PythonOperator. My flow is entirely made up of PythonOperators. I found it easier to use.

How Do I Execute Conditional Branching?

Let's say that I have 2 paths with a branching point.

a --> b1 --> c --> e --> f --> g
a --> b2 --> d

So, starting at "a", based on some condition, I either follow b1 --> c --> e --> f --> g or b2-->d. In my specific case, if there is data generated from stage "a", I expect to go through full processing of the data c --> e --> f --> g. If I don't have data, then I expect to execute "d", which simply emails that there is no new data to process today!

If the entire flow were executed in a single subprocess within a single python interpreter, I could set a global boolean (tsk tsk, I know) that all operators check in order to decide whether to "do something" or not. Essentially, I would be placing the conditional logic in all operators downstream of "b". I am using the LocalExecutor right now, but don't expect that this assumption will hold.

So, I would need to store the global in a database and have all downstream operators check that boolean. Basically, I would rather just have a "branch operator" instead, so that I don't need to do this! In my flow, "b' is the branch operator, with "b1" and "b2" as branches.

In the absence of a conditional operator, I am considering the following:
If there is no data, "c" will raise an error so that it is marked red and the downstream operators would not be executed. On the other branch, "d" would pass, sending an email that there is no new data.

If there is new data, then the opposite could happen.

My approach above will achieve what I need by showing one branch as having one "red" or "failed" task (i.e. at "c" or "d") and everything downstream as not being executed. That seems to meet my need

Documentation guidelines

We'd like to use comments within our dags to generate job specific documentation.

This will enable us to version control, maintain and easily produce maintainable and understandable DAGs without the overheads of writing separate documents.

This seems fairly straight forward using docstring conventions. However, I'd like to provide useful comments that explain the logic within each task within a DAG. Is there a way to use sphinx (or something similar) to generate documentation? It would be brilliant if we could produce html/pdf/any format documentation that detailed each task and its dependencies/inputs etc..

Gantt Chart Bug

Here is my flow.
screenshot 2015-06-25 17 28 59

I'm noticing something wrong in the gantt chart.
screenshot 2015-06-25 17 28 02

The order of the first 2 stages in my gantt chart does not reflect my stage dependencies.
So I run "submit_spark_job" followed by "wait_for_first_sqs_message". Why is it then that the gantt chart shows them out of order?

My code is below:

from airflow import DAG
from airflow.operators import PythonOperator, PostgresOperator, BashOperator
from datetime import date, datetime, time, timedelta
from pprint import pprint

import base64
import boto.sqs
import boto.sns
import json
import logging
import psycopg2
import sys
import time

# CONSTANTS
SQS_QUEUE_NAME='*******'
REGION = 'us-west-2'
DB_CHECK_QUERY='SELECT count(*) from message;'
SNS_TOPIC_ARN_FOR_JOB_STATUS='*******'
EP_DB_CONNECT_STRING = "*******"


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 22),
    'schedule_interval': timedelta(days=1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 3,
    'retry_interval': timedelta(seconds=5)
}
dag = DAG('ep_demo', default_args=default_args)

def _wait_for_first_sqs_message_internal(ds, **kwargs):
    conn = boto.sqs.connect_to_region(REGION)
    q = conn.get_queue(SQS_QUEUE_NAME)
    rs = q.get_messages()
    while rs is None or len(rs) <= 0:
        logging.info("----- WAITING FOR A MESSAGE on SQS Queue : {}".format(SQS_QUEUE_NAME))
        time.sleep(10)
        rs = q.get_messages()
    logging.info("----- FOUND A MESSAGE on SQS Queue : {}".format(SQS_QUEUE_NAME))
    return True

def _wait_for_empty_queue_internal(ds, **kwargs):
    conn = boto.sqs.connect_to_region(REGION)
    q = conn.get_queue(SQS_QUEUE_NAME)
    q_size = q.count()
    while q_size > 0:
        logging.info("----- WAITING FOR Queue {} to Drain".format(SQS_QUEUE_NAME))
        q = conn.get_queue(SQS_QUEUE_NAME)
        q_size = q.count()
        time.sleep(10)
    logging.info("----- Queue {} Empty".format(SQS_QUEUE_NAME))
    return True

# TODO : pointing to local db, parameterize this
def _wait_for_new_data_in_db_internal(ds, **kwargs):
    db_conn = psycopg2.connect(EP_DB_CONNECT_STRING)
    logging.info("----- Successfully Connected to database {}".format(EP_DB_CONNECT_STRING))
    cursor = db_conn.cursor()

    # Establish a base line
    cursor.execute(DB_CHECK_QUERY)
    result = cursor.fetchone()
    initial_row_count = int(result[0])
    found_new_data = None

    # Detect a change in the record count
    while not found_new_data:
        logging.info("----- WAITING FOR A NEW DATA in DB : base count = {}".format(initial_row_count))
        time.sleep(10)
        cursor.execute(DB_CHECK_QUERY)
        result = cursor.fetchone()
        new_row_count = int(result[0])
        if(new_row_count > initial_row_count):
            found_new_data = True
    logging.info("----- FOUND A NEW DATA in DB : (base count ==> new count) = ({} ==> {})".format(new_row_count, initial_row_count))
    return True


def _send_sns_notification_internal(ds, **kwargs):
    conn = boto.sns.connect_to_region(REGION)
    conn.publish(message='Airflow EP Data pipeline flow completed successfully', subject='Airflow Flow Complete', target_arn=SNS_TOPIC_ARN_FOR_JOB_STATUS,
                )
    logging.info("----- Sent SNS about FLOW Completion")
    return True

# Operator 0 : Start a spark job
#submit_spark_job = BashOperator(
#    task_id='submit_spark_job',
#    bash_command="ssh -i ~/.ssh/id_rsa_spark root@*******'. /root/.bash_profile; cd $SPARK_HOME/bin; ~/spark/bin/spark-submit --py-files risk.py,volume_dict.py,domain_suffix.py,sender_models.py,avro_tools.py aggregate.py --avro --input uploads/1/db80b4a2-fda4-11e4-9f6c-062f18616d35/ --s3 --domains --bucket agari-collector-ingest-avro'",
#    dag=dag)

submit_spark_job = BashOperator(
    task_id='submit_spark_job',
    bash_command="ssh -i ~/.ssh/id_rsa_spark root@*******'. /root/.bash_profile; ~/run_ingest.sh'",
    dag=dag)

# Operator 1 : Check a SQS queue for messages - currently a dummy!
wait_for_first_sqs_message = PythonOperator(
    task_id='wait_for_first_sqs_message',
    provide_context=True,
    python_callable=_wait_for_first_sqs_message_internal,
    dag=dag)
wait_for_first_sqs_message.set_upstream(submit_spark_job)

# Operator 2.a : Check for new data in DB
wait_for_new_data_in_db = PythonOperator(
    task_id='wait_for_new_data_in_db',
    provide_context=True,
    python_callable=_wait_for_new_data_in_db_internal,
    dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)


# Operator 3 : Check for SQS Queue to be drained
wait_for_empty_queue = PythonOperator(
    task_id='wait_for_empty_queue',
    provide_context=True,
    python_callable=_wait_for_empty_queue_internal,
    dag=dag)
wait_for_empty_queue.set_upstream(wait_for_new_data_in_db)

# Operator 4b : Send SNS when flow completes successfully
send_sns_notification = PythonOperator(
    task_id='send_sns_notification',
    provide_context=True,
    python_callable=_send_sns_notification_internal,
    dag=dag)
send_sns_notification.set_upstream(wait_for_empty_queue)

Configuration file issue, plugin folder key name not specified in configuration file.

System details:

Description: Ubuntu 15.04
Linux my-pc 3.19.0-21-generic #21-Ubuntu SMP Sun Jun 14 18:31:11 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

Steps to reproduce:

I cloned the repo and tried the quick start steps : http://pythonhosted.org/airflow/start.html

got error at the following step:

$ airflow initdb
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 3, in
from airflow.bin.cli import get_parser
File "/usr/local/lib/python2.7/dist-packages/airflow/init.py", line 10, in
from airflow.configuration import conf
File "/usr/local/lib/python2.7/dist-packages/airflow/configuration.py", line 276, in
conf.read(AIRFLOW_CONFIG)
File "/usr/lib/python2.7/ConfigParser.py", line 305, in read
self._read(fp, filename)
File "/usr/lib/python2.7/ConfigParser.py", line 546, in _read
raise e
ConfigParser.ParsingError: File contains parsing errors: /home/viki/airflow/airflow.cfg
[line 32]: '/home/viki/airflow/plugins\n'

The error seems to be because the configuration file is like this

Where your Airflow plugins are stored

/home/viki/airflow/plugins

i changed it to

Where your Airflow plugins are stored

plugins_folder = /home/viki/airflow/plugins

and it worked.

Question about TaskInstance.is_queueable

Would you mind clarifying the following condition in TaskInstance.is_queueable:

if self.execution_date > datetime.now() - self.task.schedule_interval:

I would have expected:

if self.execution_date > datetime.now():

Consider the use case of having a daily task. Let's suppose the task ran early this morning for 2015-07-01. However, the task failed for some reason. After fixing the issue, I go to backfill using a command such as:

airflow backfill -s 2015-07-01 -e 2015-07-01 dag_id

As the code is currently written, the backfill will not execute, due to the conditional that I mention above. I could hack it to work by passing in yesterday's date to backfill and then using "{{ tomorrow_ds }}" in my tasks, but this seems like it shouldn't be necessary.

If you're okay with the above change, I'm happy to submit a PR. If I should be taking a different approach, please let me know. Thanks!

Understanding interactions between DAG scheduling and sensors

Let's say I have a DAG scheduled with a one-hour schedule interval, and the first step in that DAG is a sensor that pokes once a minute.

What happens if the sensor never triggers during the hour after the first DAG "submission"? What state are tasks in the first DAG left in? What if the sensor's triggering event does occur at some point after the second DAG is scheduled? Does that "release" the remainder of both DAGs?

Or maybe the example I propose just doesn't make sense as a way to set things up. If so, could you explain why?

I'm sure this also somehow interacts with depends_on_past, but I feel like I'd be reaching too far to try and guess how. (This might be a candidate for documentation expansion.)

Airflow Tutorial page inconsistencies

I noticed some inconsistencies in your Tutorial page where you walk us through the DAG. For instance, in the code block, you have:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_interval': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'schedule_interval': timedelta(1),
    # 'end_date': datetime(2016, 1, 1),
}

But when you break it down, it becomes:

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 1, 1),
    'email': ['[email protected]',],
    'email_on_failure': True,
    'email_on_retry': True,
}

And again,

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=dag)

which becomes:

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    email_on_failure=False,
    bash_command='sleep 5',
    dag=dag)

I am less concerned with the fact that you changed the code to illustrate the arguments and parameters and more concerned that you say: "Here is an example of a basic pipeline definition. Do not worry if this looks complicated, a line by line explanation follows below." leading us to believe that it will be a true reproduction, which it is not. Please fix the code in the tutorial so that it is clearer. Thanks!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.