apache / airflow
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
Home Page: https://airflow.apache.org/
License: Apache License 2.0
I see airflow.operators.ExternalTaskSensor supports a dependency on a task in another DAG. But is it possible to set a dependency on another DAG itself? In other words, can the external_task_id parameter be made optional?
Do you have plans to support FTP hooks?
I've noticed there's an FTP connection type in the front-end, but I've not found a component to utilise the connection. Is this a case of me extending BaseHook to wrap some functionality to pull and push files via FTP or is there something existing that I've missed?
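For context, here is the kind of thing I have in mind until a proper FTP hook exists: a thin wrapper around the standard library's ftplib that a PythonOperator callable could use. The host, credentials and paths below are placeholders, not real connection settings.
from ftplib import FTP

def pull_file(remote_path, local_path,
              host='ftp.example.com', user='anonymous', password=''):
    # Download a single file from an FTP server.
    ftp = FTP(host)
    ftp.login(user, password)
    with open(local_path, 'wb') as f:
        ftp.retrbinary('RETR ' + remote_path, f.write)
    ftp.quit()

def push_file(local_path, remote_path,
              host='ftp.example.com', user='anonymous', password=''):
    # Upload a single file to an FTP server.
    ftp = FTP(host)
    ftp.login(user, password)
    with open(local_path, 'rb') as f:
        ftp.storbinary('STOR ' + remote_path, f)
    ftp.quit()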
Hi
I'm just trying to figure out how the tasks are run in production. Am I right in thinking that the scheduler must be running, and if I am using Celery then a worker must be running as well?
I just plan on using MySQL at the moment. Do you recommend creating an init script to start the scheduler?
Cheers
Is there a way to schedule jobs/DAGs at a specific time of the day? Also, on a related note, can DAGs be scheduled at some other frequency, such as hourly or weekly?
Apologies if I missed it in the documentation.
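For reference, this is roughly what I'd hope to write, as a sketch assuming the interval is set via schedule_interval on the DAG and anchored by the time component of start_date (the DAG id and command are illustrative):
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator

# Anchor the schedule at 06:00 and repeat daily; hourly or weekly would
# just change the timedelta.
dag = DAG(
    'daily_at_six',
    default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1, 6, 0)},
    schedule_interval=timedelta(days=1))

say_hello = BashOperator(task_id='say_hello', bash_command='echo hello', dag=dag)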
For example, the Luigi example walks through a case that involves importing data into a DB. It would be cool if there were some examples that read from one location (e.g. S3) and wrote to another (e.g. DB).
I am re-reading the last sentence of your answer of this issue #23.
"If you're planning on doing some large distributed computation that uses lots of memory, Airflow might not be the right platform. Instead, you may want to use a distributed system, or write some sort of service, and invoke it from Airflow."
I am not sure I really understand. Could you give me more information about this?
I'd like to execute a series of SQL statements on Redshift.
I'd planned to use the SqlSensor to do this, but I had missed the part of the documentation explaining that it keeps executing a statement until a criterion is met. I just need to execute the statements regardless of the result set.
Is there a way of doing this without using the PythonOperator and opening my own postgres connection? It looks as if I want the equivalent of the MySqlOperator for Postgres.
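In case it helps to show what I mean, this is the PythonOperator fallback I'd otherwise write: a rough sketch that opens its own connection with psycopg2 and just runs each statement. The connection string, statements and ids are placeholders.
from datetime import datetime
import psycopg2
from airflow import DAG
from airflow.operators import PythonOperator

REDSHIFT_DSN = "host='my-cluster.example.com' port=5439 dbname='analytics' user='me' password='secret'"
STATEMENTS = [
    "TRUNCATE staging.events;",
    "INSERT INTO staging.events SELECT * FROM landing.events;",
]

dag = DAG('redshift_sql',
          default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1)})

def run_statements(ds, **kwargs):
    # Execute each statement and commit, ignoring any result sets.
    conn = psycopg2.connect(REDSHIFT_DSN)
    cur = conn.cursor()
    for sql in STATEMENTS:
        cur.execute(sql)
    conn.commit()
    conn.close()

run_sql = PythonOperator(
    task_id='run_redshift_statements',
    provide_context=True,
    python_callable=run_statements,
    dag=dag)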
I think this is quite linked with one of my previous questions, but just to be sure: will you look at the memory usage of each task and output an awesome graph or something like that?
That would be very cool; I have not seen any scheduler with a good UI doing that.
So I've added an external connection successfully. A simple ad-hoc query returns data as expected.
However when I try to add a chart I get the following error:
Integrity error. (sqlite3.IntegrityError) NOT NULL constraint failed: chart.conn_id
It seems as if the sqlite job database isn't able to register the conn_id of my new external source?
I am able to see the source that I added "redshift" in the charting screen.
Operating system:
Distributor ID: Ubuntu
Description: Ubuntu 14.04.1 LTS
Release: 14.04
Codename: trusty
We distribute our configs via text files managed by puppet, so it would be awkward to put that into the db for airflow. Would you accept a patch that looks for conn_id in settings.conf before checking the database?
We are trying to kick off our pipeline with a TimeSensor, but it does not trigger. There is no error visible in the logs - the graph looks like this:
And the code like this:
from airflow import DAG
from airflow.operators import DummyOperator, MySqlOperator, BashOperator
from datetime import datetime, time, timedelta
from airflow.models import Variable
from airflow.operators.sensors import TimeSensor
from airflow.operators.sensors import SqlSensor
default_args = {
'owner': 'deploy',
'depends_on_past': False,
'start_date': datetime(2015, 6, 24),
'queue': 'default',
}
# the DAG
dag = DAG('pvi', default_args=default_args)
# tables
app_installs = SqlSensor(
    conn_id="exasol",
    sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_INSTALLS WHERE CREATED_DATE='{{ ds }}'""",
    task_id='table_app_installs',
    dag=dag)
app_logins = SqlSensor(
    conn_id="exasol",
    sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_LOGINS WHERE CREATED_DATE='{{ ds }}'""",
    task_id='table_app_logins',
    dag=dag)
# time dependencies
midnight_sensor = TimeSensor(target_time=time(15, 28), task_id='midnight', dag=dag)
start_aggregation = DummyOperator(task_id='start_aggregation', dag=dag)
start_aggregation.set_upstream(midnight_sensor)
start_aggregation.set_downstream([app_logins, app_installs])
We tried to trigger the chain by setting the target_time to something 5 minutes in the future; the DAG gets refreshed every 60 seconds (default), but nothing happens. Worker, Scheduler, Flower and Webserver are on the same machine running Python 2.7 and FreeBSD 9.3, and we have a second machine running a celery worker which is listening on a different queue (q2). We have another DAG that works as expected on q2.
Is there a way to run a DAG from command line?
I see two options:
1. backfill with the same start and end date.
2. run - possible only if task_id were optional. Currently, run seems to support only running a specific task within a specific DAG. Is it possible to make task_id optional, so that if it's not provided, the run command will run the full DAG?
Is there any other way of doing this that I may have missed in the documentation?
Hi,
I see in your documentation some subpackages (http://pythonhosted.org/airflow/installation.html#extra-packages). But I have never heard about that with pip; moreover, I don't see them using pip search airflow.
If I have a process that needs to run daily regardless of input parameters, for example a process that bulk loads a table and doesn't depend on delta data, is there a way of preventing backfills running a dag multiple times?
If I have a dag with, for argument's sake, one task, can I make a backfill with a week's date range only run the task once rather than 7 times?
Why do I need TLS? This requires setting it up for any smtp server, even one I run locally.
2015-06-19 13:20:09,793 - root - INFO - Executing <Task(EmailOperator): email_pipeline_start> for 2010-10-10 00:00:00
2015-06-19 13:20:10,459 - root - ERROR - STARTTLS extension not supported by server.
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 746, in run
task_copy.execute(context=self.get_template_context())
File "/usr/local/lib/python2.7/site-packages/airflow/operators/email_operator.py", line 36, in execute
send_email(self.to, self.subject, self.html_content)
File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 339, in send_email
send_MIME_email(SMTP_MAIL_FROM, to, msg)
File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 349, in send_MIME_email
s.starttls()
File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/smtplib.py", line 644, in starttls
raise SMTPException("STARTTLS extension not supported by server.")
SMTPException: STARTTLS extension not supported by server.
2015-06-19 13:20:10,503 - root - ERROR - Failed to send email to: ['[email protected]']
2015-06-19 13:20:10,503 - root - ERROR - STARTTLS extension not supported by server.
2015-06-19 13:20:10,503 - root - ERROR - STARTTLS extension not supported by server.
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 195, in test
ti.run(force=True, ignore_dependencies=True, test_mode=True)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 749, in run
raise e
smtplib.SMTPException: STARTTLS extension not supported by server.
I also tested this out with our production postfix server, and that didn't work for a different reason, which is not apparent yet. Which email SMTP servers have you tried this out with? Examples would be very helpful.
Dependencies on an only_run_latest task are satisfied when that task has succeeded at any point in the future of the task depending on it.
When backfilling, only the end_date of only_run_latest tasks would be triggered, and dependencies for this task through time would be satisfied upon completion of this latest task instance.
When it comes to an only_run_latest task, the scheduler should skip runs to max(execution_date) of all of its dependencies.
How does one add new dags to the system without restarting the webserver? Running "python my_dag.py" does not import the dag into the db either. I am running on EC2 with a PostgreSQL database. I am able to see that the data gets imported only on restart of the web app.
Here is the code:
"""
This code will become the EP Data Pipeline.
The flow may eventually look like :
* 1. SQS Message Written Detect
* 2.a. Email Send Flow Started
* 2.b. Database Row Written Detect
* 3. SQS Queue Empty Detect
* 4. Email Send Flow Complete
"""
from airflow import DAG
from airflow.operators import EmailOperator
from datetime import datetime
print ' got here 1'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 1, 1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
}
print ' got here 1'
dag = DAG('ep_demo_1', default_args=default_args)
print ' got here 1'
t1 = EmailOperator(
task_id='email_pipeline_start',
to='[email protected]',
subject='EP Demo Pipeline Started (TAD)',
html_content='',
dag=dag)
After restarting the webapp, I still don't see any entries in the dag table in the sqlite db for my dags. I see the dags in the UI -- there seems to be a difference between loading and importing a dag.
2015-06-18 17:49:21,175 - root - INFO - Loaded DAG <DAG: ep_demo_1>
2015-06-18 17:49:21,177 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_bash_operator.py
2015-06-18 17:49:21,178 - root - INFO - Loaded DAG <DAG: example_bash_operator>
2015-06-18 17:49:21,179 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/example_python_operator.py
2015-06-18 17:49:21,180 - root - INFO - Loaded DAG <DAG: example_python_operator>
2015-06-18 17:49:21,181 - root - INFO - Importing /usr/local/lib/python2.7/site-packages/airflow/example_dags/tutorial.py
2015-06-18 17:49:21,182 - root - INFO - Loaded DAG <DAG: tutorial>
The webserver presents a possibility to define variables, but there is no documentation on how to use those variables (and I don't see any usage in code).
Is this a way to define "constants" to use in templates/arguments? How can we use them?
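What I'm hoping for is something along these lines, purely a guess at the intended usage: fetch a Variable by key and use its value as a constant. The use of settings.Session and the key/val column names on the Variable model are assumptions on my part, not confirmed API.
from airflow import settings
from airflow.models import Variable

def get_constant(key):
    # Assumption: Variable is a SQLAlchemy model with 'key' and 'val'
    # columns stored in the metadata db, so it can be read back like this.
    session = settings.Session()
    try:
        var = session.query(Variable).filter(Variable.key == key).first()
        return var.val if var else None
    finally:
        session.close()

data_bucket = get_constant('data_bucket')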
I really like the concept of Airflow, but I've been stumped trying to get a simple cron replacement example DAG working.
I took the example_bash_operator.py and changed args like so:
args = {
'owner': 'airflow',
'start_date': datetime.now(),
'schedule_interval': timedelta(minutes = 5)
}
I was expecting this DAG to start now and run every 5 minutes, but instead the scheduler runs the DAG in a loop, back-to-back.
I'd imagine migrating from cron to Airflow would be a pretty common use case - it would be awesome to see an example DAG for replacing e.g. a cron that runs every 6 hours.
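To make the request concrete, here is roughly the example I was hoping to find: a sketch assuming the interval goes on the DAG itself rather than in args, with a fixed start_date instead of datetime.now(). The ids and command are placeholders.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import BashOperator

args = {
    'owner': 'airflow',
    # A fixed start date in the past, rather than datetime.now().
    'start_date': datetime(2015, 6, 1),
}

# Intended equivalent of a cron entry that fires every 6 hours.
dag = DAG('cron_replacement', default_args=args,
          schedule_interval=timedelta(hours=6))

run_job = BashOperator(task_id='run_job', bash_command='echo running', dag=dag)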
I didn't see any UI pictures about that.
While installing I noticed the url and download_url point to @mistercrunch's repo:
https://github.com/airbnb/airflow/blob/9a791999b871d5f19550cbf882fbd09037d99e52/setup.py#L69
url='https://github.com/mistercrunch/Airflow', download_url=( 'https://github.com/mistercrunch/Airflow/tarball/' + version),
Is that the correct repository to install and update from?
$ airflow worker
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 299, in worker
worker.run(**options)
File "/usr/local/lib/python2.7/site-packages/celery/bin/worker.py", line 212, in run
state_db=self.node_format(state_db, hostname), **kwargs
File "/usr/local/lib/python2.7/site-packages/celery/worker/init.py", line 100, in init
self.setup_instance(**self.prepare_args(**kwargs))
File "/usr/local/lib/python2.7/site-packages/celery/worker/init.py", line 124, in setup_instance
self._conninfo = self.app.connection()
File "/usr/local/lib/python2.7/site-packages/celery/app/base.py", line 385, in connection
'BROKER_CONNECTION_TIMEOUT', connect_timeout
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 165, in init
if not get_transport_cls(transport).can_parse_url:
File "/usr/local/lib/python2.7/site-packages/kombu/transport/init.py", line 109, in get_transport_cls
_transport_cache[transport] = resolve_transport(transport)
File "/usr/local/lib/python2.7/site-packages/kombu/transport/init.py", line 89, in resolve_transport
raise KeyError('No such transport: {0}'.format(transport))
KeyError: 'No such transport: '
I have a DAG which looks like the following:
t1 -> t2
When I run a backfill for this DAG (say for day1 and day2), I want to make sure that the DAG for day2 gets executed only after the DAG for day1 is complete. From the name, "depends_on_past" looked like the right thing to use. However, it works at the task level. In other words, t1 for day2 starts executing as soon as t1 for day1 is complete, even before t2 for day1 is completed.
Is there a way to force a similar "depends_on_past" at the DAG level?
I would add the following actions to the Sensor:
Sorry for a trivial question; I've been trawling through the code but can't seem to get templating to work within PythonOperators.
I've been using the templating features of the PostgresOperator to read an external sql file and template some of the logic. I'd like to do similar using a PythonOperator, specifically:
Is this possible? I've been fiddling, trying to supply a template_fields value to my function, hoping it will magically work.
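For what it's worth, the only workaround I've found so far is to skip template_fields entirely and lean on provide_context, so the callable receives the execution date and renders strings itself. A rough sketch (the table name and query are made up):
from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator

dag = DAG('python_templating',
          default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1)})

def build_query(ds, **kwargs):
    # 'ds' is the execution date string passed in via provide_context, so
    # the "templating" happens with ordinary string formatting here.
    sql = "SELECT count(*) FROM events WHERE created_date = '{}'".format(ds)
    return sql

render_sql = PythonOperator(
    task_id='render_sql',
    provide_context=True,
    python_callable=build_query,
    dag=dag)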
I'm integrating Airflow with monit.
Webserver
When you type airflow webserver -d -p 8080, 2 webserver processes are launched; one is a child of the other.
deploy 19005 1 0 00:37 ? 00:00:01 /usr/bin/python /usr/local/bin/airflow webserver -d -p 808
deploy 19010 19005 1 00:37 ? 00:00:17 /usr/bin/python /usr/local/bin/airflow webserver -d -p 808
In order to correctly kill a process tree with 2 levels (parent and child), I use the "stop program" command in the monit conf.d file below :
check process airflow-webserver with pidfile /home/deploy/airflow/pids/airflow-webserver.pid
group airflow-webserver
start program "/bin/sh -c '( TMP=/data/tmp PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin airflow webserver -d -p 8080 2>&1 & echo $! > /home/deploy/airflow/pids/airflow-webserver.pid ) | logger -p user.info'"
as uid deploy and gid deploy
stop program "/bin/sh -c 'PATH=/bin:/sbin:/usr/bin:/usr/sbin pkill -TERM -P `cat /home/deploy/airflow/pids/airflow-webserver.pid` && rm -f /home/deploy/airflow/pids/airflow-webserver.pid'"
as uid deploy and gid deploy
Ideally, airflow should clean up its own child processes when it receives a SIGTERM. But, since it doesn't, I need to call pkill with the -P option.
Scheduler
The Scheduler also launches a bunch of processes. However, for some reason, using the same approach, I always find the parent surviving.
check process airflow-scheduler with pidfile /home/deploy/airflow/pids/airflow-scheduler.pid
group airflow-scheduler
start program "/bin/sh -c '( TMP=/data/tmp PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin airflow scheduler 2>&1 & echo $! > /home/deploy/airflow/pids/airflow-scheduler.pid ) | logger -p user.info'"
as uid deploy and gid deploy
stop program "/bin/sh -c 'PATH=/bin:/sbin:/usr/bin:/usr/sbin pkill -KILL -P `cat /home/deploy/airflow/pids/airflow-scheduler.pid` && rm -f /home/deploy/airflow/pids/airflow-scheduler.pid'"
as uid deploy and gid deploy
Can you provide guidance on a clean way to shutdown the scheduler and webserver?
-s
Just wondering if there's any plan to support Amazon ECS? I saw some discussion about potentially using docker containers for memory management. ECS would be a nice way to abstract away other details like selecting which server should run a given task.
I want to use S3, and I see that airflow works well with AWS (EC2, S3). But S3 means that I will have data outside. What do you have for security in airflow?
Just following the tutorial to see what airflow is about. It was easy to follow up till the "testing" paragraph on the tutorial page. Suddenly a "date" parameter is required.
Until then, I assumed you could run a batch job on anything and just request it to be run. Here, airflow implicitly requires that you run any DAG for a date or date range, and this parameter is required. I'm not sure if this parameter is passed into every task, or whether airflow simply needs it for another purpose.
It would be good to explain what this strange date parameter is for, what impact it has on tasks, or whether it is for internal use.
I used a simple query for a hive2 connection. It gets stuck, and I cannot get the result ;(
2015-06-19 10:20:56,647 - tornado.access - INFO - 200 GET /admin/queryview/ (10.196.11.238) 19.21ms
2015-06-19 10:21:02,962 - root - INFO - Using connection to: localhost
2015-06-19 10:21:02,965 - root - INFO - Using connection to: localhost
It seems the start-up costs of this are a bit high. I am trying out Luigi and Airflow, and need to set up a central scheduler for both. The UI features of Airflow are more attractive than those of Luigi, but the UI features in Airflow are only usable (actionable) if Celery is installed. I am running on EC2 and don't want to install too much just to take this for a spin. My recommendation is to make the UI features work for the LocalExecutor. Otherwise, Luigi seems attractive -- the startup cost is similar but it is more battle-tested.
-s
I'm running a few scripts via PythonOperator and was wondering how I can log print statements within these scripts. Any information would be appreciated. Thank you,
Kevin
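For reference, a minimal sketch of one way to surface messages: route them through the standard logging module (as the other DAGs in this thread do) instead of print, so they appear in the task's log. The DAG and callable below are illustrative only.
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator

dag = DAG('logging_demo',
          default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1)})

def my_script(ds, **kwargs):
    # logging.info ends up in the task's log, where a bare print may not.
    logging.info("processing data for %s", ds)
    return True

log_something = PythonOperator(
    task_id='log_something',
    provide_context=True,
    python_callable=my_script,
    dag=dag)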
So I went through the codebase and docs of 'airflow' today and I think it's a great fit for one of my projects. I'm the maintainer of "remap", which is a 100% python implementation of MapReduce, only intended to run on a dozen nodes for now. Jobs are kicked off through a REST interface.
Here's where a potential contribution comes in.
I didn't find anything already done with http interfaces, so my idea is to write an HTTPHook, an HTTP operator and a sensor for this. The operator/hook calls a URL resource with an indicated method and potentially some post data. The sensor later on calls another URL to check on progress. This would allow work to be executed asynchronously until some later checkpoint where the sensor needs to check whether something is available or not.
The HTTP library I intend to use is "requests", which should come installed with "pip": http://docs.python-requests.org/en/latest/
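Roughly what I have in mind, sketched as plain callables before wiring them into a proper hook/operator/sensor; the URLs, payloads and the "done" convention are placeholders:
import time
import requests

def http_call(url, method='POST', data=None):
    # The "operator" part: call a URL with the indicated method and
    # optional post data, failing loudly on HTTP errors.
    response = requests.request(method, url, data=data)
    response.raise_for_status()
    return response.text

def http_poke(url, interval=30):
    # The "sensor" part: poll a status URL until it reports completion.
    # Treating "done" in the response body as completion is just an
    # assumed convention for this sketch.
    while True:
        response = requests.get(url)
        response.raise_for_status()
        if 'done' in response.text:
            return True
        time.sleep(interval)

# Kick off a remap job, then wait for it to finish.
http_call('http://remap.example.com/jobs', data={'job': 'wordcount'})
http_poke('http://remap.example.com/jobs/wordcount/status')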
Operators never seem to return values, probably by design, which means that a worker process waits around for the job to complete; in other words, an operator executes the action synchronously.
So, couple of questions:
As extra thought on 2, it's possible that external systems contain key/values that are useful to use in workflows. Is there a recommended mechanism of loading small pieces of data into a DAG workflow so that it's available from a context for example when another task is executed?
We don't use mysql, but there is a requirement to have the mysql client installed. We get the exception when starting the airflow worker.
I'm trying to use the ExternalTaskSensor to execute a data processing dag upon the completion of another dag that builds some lookup tables.
My child dag's demo code is below:
random_dep = ExternalTaskSensor(
external_dag_id='driver',
external_task_id='templated',
owner='crm',
task_id='random_dep'
)
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag)
run_this.set_upstream(random_dep)
task = PythonOperator(
task_id='sleep_for_1',
python_callable=my_sleeping_function,
op_kwargs={'random_base': 1},
dag=dag)
task.set_upstream(run_this)
Have I correctly used the ExternalTaskSensor? I've noticed this dag isn't rendered on the "Graph View" of the frontend.
Running through the Quickstart page right about the line that says:
pip install airflow
I get the following error:
Command python setup.py egg_info failed with error code 1 in /tmp/pip_build_hugo/pandas
Traceback (most recent call last):
File "/usr/bin/pip", line 9, in <module>
load_entry_point('pip==1.5.4', 'console_scripts', 'pip')()
File "/usr/lib/python2.7/dist-packages/pip/__init__.py", line 235, in main
return command.main(cmd_args)
File "/usr/lib/python2.7/dist-packages/pip/basecommand.py", line 161, in main
text = '\n'.join(complete_log)
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe2 in position 72: ordinal not in range(128)
It happens both on Ubuntu 14.04 and CentOS 7, at least...
I have a flow that is linear and sequential as shown below :
Eventually, we will have a precursor step (e.g. step 0) that kicks off a Spark job before waiting for new data in the SQS queue. For now, I do not have that, so I would like my flow to simply reset to the first step. In some ways, I am asking for a circular flow that never ends. This breaks the entire workflow abstraction. On another note, where is the "cron" functionality? Do I schedule airflow commands in cron outside of airflow?
The setup.cfg mentions an MIT license but the LICENSE.txt states the project uses Apache 2.0.
I am using the CeleryExecutor with a Postgres results backend and queue. It looks like this fails when I run "airflow backfill" commands but works when I use the UI.
Sid-As-MBP-15:airflow siddharth$ airflow worker
-------------- [email protected] v3.1.18 (Cipater)
---- **** -----
--- * *** * -- Darwin-14.3.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x10c04e690
- ** ---------- .> transport: sqla+postgresql://siddharth@localhost:5432/airflow_celery
- ** ---------- .> results: db+postgresql://siddharth@localhost:5432/airflow_celery
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> default exchange=celery(direct) key=celery
[2015-06-22 15:08:15,337: WARNING/MainProcess] [email protected] ready.
Starting flask
* Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 10, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 97, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2444, in first
ret = list(self[0:1])
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2280, in __getitem__
return list(res)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 84, in instances
util.raise_from_cause(err)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 69, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 426, in _instance
loaded_instance, populate_existing, populators)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 484, in _populate_full
dict_[key] = getter(row)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1261, in process
return loads(value)
File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 209, in loads
return load(file)
File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 199, in load
obj = pik.load()
File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/usr/local/lib/python2.7/site-packages/dill/dill.py", line 278, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/local/Cellar/python/2.7.9/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named python_operator
[2015-06-22 15:11:08,787: ERROR/Worker-14] 1
[2015-06-22 15:11:08,835: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[4a012cf5-83f6-4d54-af34-b2ce74b517db] raised unexpected: Exception('Celery command failed',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 41, in execute_command
raise Exception('Celery command failed')
Exception: Celery command failed
My airflow.cfg file has the following settings for celery
# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentaly
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
#broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
broker_url = sqla+postgresql://siddharth@localhost:5432/airflow_celery
# Another key Celery setting
#celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+postgresql://siddharth@localhost:5432/airflow_celery
I am getting this error with the example_python_operator and example_bash_operator flows. I haven't tried others.
airflow flower is not much help for experimental brokers like Postgres (over SQLAlchemy), since many commands are unsupported, as mentioned in the Limitations section of this
Sid-As-MacBook-Pro-15:airflow siddharth$ airflow flower
[I 150622 15:05:54 command:114] Visit me at http://localhost:5555
[I 150622 15:05:55 command:116] Broker: sqla+postgresql://siddharth@localhost:5432/airflow_celery
[I 150622 15:05:55 command:119] Registered tasks:
['celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 150622 15:05:55 mixins:225] Connected to sqla+postgresql://siddharth@localhost:5432/airflow_celery
[W 150622 15:05:58 control:43] 'stats' inspect method failed
[W 150622 15:05:58 control:43] 'active_queues' inspect method failed
[W 150622 15:05:58 control:43] 'registered' inspect method failed
[W 150622 15:05:58 control:43] 'scheduled' inspect method failed
[W 150622 15:05:58 control:43] 'active' inspect method failed
[W 150622 15:05:58 control:43] 'reserved' inspect method failed
[W 150622 15:05:58 control:43] 'revoked' inspect method failed
[W 150622 15:05:58 control:43] 'conf' inspect method failed
If I look into my database (airflow_celery), I find 2 tables : kombu_message and kombu_queue.
In kombu_queue, I'm expecting a single queue called default, but I see several
airflow_celery=# select * from kombu_queue ;
id | name
----+----------------------------------------------------------
1 | default
2 | [email protected]
3 | celeryev.ad9d88a9-05b7-4d01-ad2c-8f95662787a0
4 | 37bb18ec-a47f-349b-859f-3b454d2602a3.reply.celery.pidbox
5 | b15e35ef-8873-31d6-8841-af833b9cec3d.reply.celery.pidbox
6 | 4572302e-65a9-3ec1-850e-aff27cd59a9b.reply.celery.pidbox
7 | 33d4dcc5-f997-3b7c-9f41-66d9a2c51159.reply.celery.pidbox
It would be nice to be able to add other executors "out of the airflow codebase".
The list of executors is hard-coded in airflow/executors/__init__.py.
A plugin mechanism of some kind could allow adding other executors.
Specifically, what is expected with respect to the "conn_id" argument?
The code below successfully connects to Postgres. I then pass the db_conn object to the PostgresOperator and get the exception shown at the bottom.
db_conn_string = "host='localhost' dbname='cousteau_dev' user='siddharth' password='secret'"
db_conn = psycopg2.connect(db_conn_string)
print "Successfully Connected to database\n ->%s and conn=%s" % (db_conn_string, db_conn)
wait_for_new_data_in_db = PostgresOperator(
task_id='wait_for_new_data_in_db',
postgres_conn_id=db_conn,
sql='select count(*) from receiver_domain_aggregate;',
dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)
Here's the exception
2015-06-21 20:21:06,789 - root - INFO - Executing <Task(PostgresOperator): wait_for_new_data_in_db> for 2015-06-12 00:00:00
2015-06-21 20:21:06,801 - root - INFO - Executing: select count(*) from receiver_domain_aggregate;
2015-06-21 20:21:06,805 - root - ERROR - (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 746, in run
task_copy.execute(context=self.get_template_context())
File "/usr/local/lib/python2.7/site-packages/airflow/operators/postgres_operator.py", line 36, in execute
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
File "/usr/local/lib/python2.7/site-packages/airflow/hooks/postgres_hook.py", line 25, in __init__
if db.count() == 0:
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2734, in count
return self.from_self(col).scalar()
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2503, in scalar
ret = self.one()
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2472, in one
ret = list(self)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2515, in __iter__
return self._execute_and_instances(context)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2530, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
compiled_sql, distilled_params
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
context)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
exc_info
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 199, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
context)
File "/usr/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
cursor.execute(statement, parameters)
ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'psycopg2.extensions.connection' [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.password AS connection_password, connection.port AS connection_port, connection.extra AS connection_extra \nFROM connection \nWHERE connection.conn_id = %(conn_id_1)s) AS anon_1'] [parameters: {'conn_id_1': <connection object at 0x10dabe640; dsn: 'host='localhost' dbname='cousteau_dev' user='siddharth' password=xxxxxxxx', closed: 0>}]
Since I can connect using psycopg2, I'm going to revert to using a PythonOperator. My flow is entirely made up of PythonOperators. I found it easier to use.
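Judging from the connection-table query in the traceback above (WHERE connection.conn_id = ...), postgres_conn_id appears to expect the string conn_id of a connection registered in Airflow's own metadata db (e.g. via the Connections screen), not a live psycopg2 connection object. If that reading is right, the call would look roughly like this, with 'cousteau_dev' assumed to be such a registered conn_id and dag / wait_for_first_sqs_message defined as in my snippet above:
from airflow.operators import PostgresOperator

wait_for_new_data_in_db = PostgresOperator(
    task_id='wait_for_new_data_in_db',
    # A string conn_id referencing a row in Airflow's connection table,
    # not a psycopg2 connection object.
    postgres_conn_id='cousteau_dev',
    sql='select count(*) from receiver_domain_aggregate;',
    dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)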
What are the supported Python versions? Are there plans to support Python 3?
Let's say that I have 2 paths with a branching point.
a --> b1 --> c --> e --> f --> g
a --> b2 --> d
So, starting at "a", based on some condition, I either follow b1 --> c --> e --> f --> g
or b2-->d
. In my specific case, if there is data generated from stage "a", I expect to go through full processing of the data c --> e --> f --> g
. If I don't have data, then I expect to execute "d", which simply emails that there is no new data to process today!
If the entire flow were executed in a single subprocess within a single python interpreter, I could set a global boolean (tsk tsk, I know) that all operators check in order to decide whether to "do something" or not. Essentially, I would be placing the conditional logic in all operators downstream of "b". I am using the LocalExecutor right now, but don't expect that this assumption will hold.
So, I would need to store the global in a database and have all downstream operators check that boolean. Basically, I would rather just have a "branch operator" instead, so that I don't need to do this! In my flow, "b" is the branch operator, with "b1" and "b2" as branches.
In the absence of a conditional operator, I am considering the following:
If there is no data, "c" will raise an error so that it is marked red and the downstream operators would not be executed. On the other branch, "d" would pass, sending an email that there is no new data.
If there is new data, then the opposite could happen.
My approach above will achieve what I need by showing one branch as having one "red" or "failed" task (i.e. at "c" or "d") and everything downstream as not being executed. That seems to meet my need; a sketch follows below.
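Concretely, the workaround described above would look something like this: the first task of the "full processing" branch checks for data and raises if there is none, so everything downstream of it stays unexecuted, while the other branch does the opposite check. The data check itself is a placeholder.
from datetime import datetime
from airflow import DAG
from airflow.operators import PythonOperator

dag = DAG('poor_mans_branch',
          default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1)})

def has_new_data():
    # Placeholder for the real check against stage "a"'s output.
    return False

def start_full_processing(ds, **kwargs):
    # Task "c": fail on purpose when there is nothing to process, so the
    # rest of the c -> e -> f -> g branch is never executed.
    if not has_new_data():
        raise ValueError("No new data for %s; skipping full processing" % ds)
    return True

def notify_no_data(ds, **kwargs):
    # Task "d": the opposite check, so only one branch succeeds per run.
    if has_new_data():
        raise ValueError("New data exists for %s; the other branch runs" % ds)
    return True

c = PythonOperator(task_id='c_start_processing', provide_context=True,
                   python_callable=start_full_processing, dag=dag)
d = PythonOperator(task_id='d_notify_no_data', provide_context=True,
                   python_callable=notify_no_data, dag=dag)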
We'd like to use comments within our dags to generate job specific documentation.
This will enable us to version-control and easily produce maintainable, understandable DAGs without the overhead of writing separate documents.
This seems fairly straightforward using docstring conventions. However, I'd like to provide useful comments that explain the logic within each task within a DAG. Is there a way to use sphinx (or something similar) to generate documentation? It would be brilliant if we could produce html/pdf/any format documentation that detailed each task and its dependencies/inputs etc.
I'm noticing something wrong in the gantt chart.
The order of the first 2 stages in my gantt chart does not reflect my stage dependencies.
So I run "submit_spark_job" followed by "wait_for_first_sqs_message". Why is it then that the gantt chart shows them out of order?
My code is below:
from airflow import DAG
from airflow.operators import PythonOperator, PostgresOperator, BashOperator
from datetime import date, datetime, time, timedelta
from pprint import pprint
import base64
import boto.sqs
import boto.sns
import json
import logging
import psycopg2
import sys
import time
# CONSTANTS
SQS_QUEUE_NAME='*******'
REGION = 'us-west-2'
DB_CHECK_QUERY='SELECT count(*) from message;'
SNS_TOPIC_ARN_FOR_JOB_STATUS='*******'
EP_DB_CONNECT_STRING = "*******"
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 22),
'schedule_interval': timedelta(days=1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 3,
'retry_interval': timedelta(seconds=5)
}
dag = DAG('ep_demo', default_args=default_args)
def _wait_for_first_sqs_message_internal(ds, **kwargs):
conn = boto.sqs.connect_to_region(REGION)
q = conn.get_queue(SQS_QUEUE_NAME)
rs = q.get_messages()
while rs is None or len(rs) <= 0:
logging.info("----- WAITING FOR A MESSAGE on SQS Queue : {}".format(SQS_QUEUE_NAME))
time.sleep(10)
rs = q.get_messages()
logging.info("----- FOUND A MESSAGE on SQS Queue : {}".format(SQS_QUEUE_NAME))
return True
def _wait_for_empty_queue_internal(ds, **kwargs):
conn = boto.sqs.connect_to_region(REGION)
q = conn.get_queue(SQS_QUEUE_NAME)
q_size = q.count()
while q_size > 0:
logging.info("----- WAITING FOR Queue {} to Drain".format(SQS_QUEUE_NAME))
q = conn.get_queue(SQS_QUEUE_NAME)
q_size = q.count()
time.sleep(10)
logging.info("----- Queue {} Empty".format(SQS_QUEUE_NAME))
return True
# TODO : pointing to local db, parameterize this
def _wait_for_new_data_in_db_internal(ds, **kwargs):
db_conn = psycopg2.connect(EP_DB_CONNECT_STRING)
logging.info("----- Successfully Connected to database {}".format(EP_DB_CONNECT_STRING))
cursor = db_conn.cursor()
# Establish a base line
cursor.execute(DB_CHECK_QUERY)
result = cursor.fetchone()
initial_row_count = int(result[0])
found_new_data = None
# Detect a change in the record count
while not found_new_data:
logging.info("----- WAITING FOR A NEW DATA in DB : base count = {}".format(initial_row_count))
time.sleep(10)
cursor.execute(DB_CHECK_QUERY)
result = cursor.fetchone()
new_row_count = int(result[0])
if(new_row_count > initial_row_count):
found_new_data = True
logging.info("----- FOUND A NEW DATA in DB : (base count ==> new count) = ({} ==> {})".format(new_row_count, initial_row_count))
return True
def _send_sns_notification_internal(ds, **kwargs):
conn = boto.sns.connect_to_region(REGION)
conn.publish(message='Airflow EP Data pipeline flow completed successfully', subject='Airflow Flow Complete', target_arn=SNS_TOPIC_ARN_FOR_JOB_STATUS,
)
logging.info("----- Sent SNS about FLOW Completion")
return True
# Operator 0 : Start a spark job
#submit_spark_job = BashOperator(
# task_id='submit_spark_job',
# bash_command="ssh -i ~/.ssh/id_rsa_spark root@*******'. /root/.bash_profile; cd $SPARK_HOME/bin; ~/spark/bin/spark-submit --py-files risk.py,volume_dict.py,domain_suffix.py,sender_models.py,avro_tools.py aggregate.py --avro --input uploads/1/db80b4a2-fda4-11e4-9f6c-062f18616d35/ --s3 --domains --bucket agari-collector-ingest-avro'",
# dag=dag)
submit_spark_job = BashOperator(
task_id='submit_spark_job',
bash_command="ssh -i ~/.ssh/id_rsa_spark root@*******'. /root/.bash_profile; ~/run_ingest.sh'",
dag=dag)
# Operator 1 : Check a SQS queue for messages - currently a dummy!
wait_for_first_sqs_message = PythonOperator(
task_id='wait_for_first_sqs_message',
provide_context=True,
python_callable=_wait_for_first_sqs_message_internal,
dag=dag)
wait_for_first_sqs_message.set_upstream(submit_spark_job)
# Operator 2.a : Check for new data in DB
wait_for_new_data_in_db = PythonOperator(
task_id='wait_for_new_data_in_db',
provide_context=True,
python_callable=_wait_for_new_data_in_db_internal,
dag=dag)
wait_for_new_data_in_db.set_upstream(wait_for_first_sqs_message)
# Operator 3 : Check for SQS Queue to be drained
wait_for_empty_queue = PythonOperator(
task_id='wait_for_empty_queue',
provide_context=True,
python_callable=_wait_for_empty_queue_internal,
dag=dag)
wait_for_empty_queue.set_upstream(wait_for_new_data_in_db)
# Operator 4b : Send SNS when flow completes successfully
send_sns_notification = PythonOperator(
task_id='send_sns_notification',
provide_context=True,
python_callable=_send_sns_notification_internal,
dag=dag)
send_sns_notification.set_upstream(wait_for_empty_queue)
Description: Ubuntu 15.04
Linux my-pc 3.19.0-21-generic #21-Ubuntu SMP Sun Jun 14 18:31:11 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
I cloned the repo and tried the quick start steps : http://pythonhosted.org/airflow/start.html
got error at the following step:
$ airflow initdb
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 3, in
from airflow.bin.cli import get_parser
File "/usr/local/lib/python2.7/dist-packages/airflow/init.py", line 10, in
from airflow.configuration import conf
File "/usr/local/lib/python2.7/dist-packages/airflow/configuration.py", line 276, in
conf.read(AIRFLOW_CONFIG)
File "/usr/lib/python2.7/ConfigParser.py", line 305, in read
self._read(fp, filename)
File "/usr/lib/python2.7/ConfigParser.py", line 546, in _read
raise e
ConfigParser.ParsingError: File contains parsing errors: /home/viki/airflow/airflow.cfg
[line 32]: '/home/viki/airflow/plugins\n'
The error seems to be because the configuration file is like this:
/home/viki/airflow/plugins
I changed it to
plugins_folder = /home/viki/airflow/plugins
and it worked.
Would you mind clarifying the following condition in TaskInstance.is_queueable:
if self.execution_date > datetime.now() - self.task.schedule_interval:
I would have expected:
if self.execution_date > datetime.now():
Consider the use case of having a daily task. Let's suppose the task ran early this morning for 2015-07-01. However, the task failed for some reason. After fixing the issue, I go to backfill using a command such as:
airflow backfill -s 2015-07-01 -e 2015-07-01 dag_id
As the code is currently written, the backfill will not execute, due to the conditional that I mention above. I could hack it to work by passing in yesterday's date to backfill and then using "{{ tomorrow_ds }}" in my tasks, but this seems like it shouldn't be necessary.
If you're okay with the above change, I'm happy to submit a PR. If I should be taking a different approach, please let me know. Thanks!
Let's say I have a DAG scheduled with a one-hour schedule interval, and the first step in that DAG is a sensor that pokes once a minute.
What happens if the sensor never triggers during the hour after the first DAG "submission"? What state are tasks in the first DAG left in? What if the sensor's triggering event does occur at some point after the second DAG is scheduled? Does that "release" the remainder of both DAGs?
Or maybe the example I propose just doesn't make sense as a way to set things up. If so, could you explain why?
I'm sure this also somehow interacts with depends_on_past, but I feel like I'd be reaching too far to try and guess how. (This might be a candidate for documentation expansion.)
I noticed some inconsistencies in your Tutorial page where you walk us through the DAG. For instance, in the code block, you have:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_interval': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'schedule_interval': timedelta(1),
# 'end_date': datetime(2016, 1, 1),
}
But when you break it down, it becomes:
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 1, 1),
'email': ['[email protected]',],
'email_on_failure': True,
'email_on_retry': True,
}
And again,
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag)
which becomes:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
email_on_failure=False,
bash_command='sleep 5',
dag=dag)
I am less concerned with the fact that you changed the code to illustrate the arguments and parameters, and more concerned that you say: "Here is an example of a basic pipeline definition. Do not worry if this looks complicated, a line by line explanation follows below.", leading us to believe that it will be a true reproduction, which it is not. Please fix the code in the tutorial so that it is clearer. Thanks!