
We create a new Python file my_dag.py and save it inside the dags folder.

Importing various packages:

```python
# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator

# other packages
from datetime import datetime
from datetime import timedelta
```
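Continuing from these imports, here is a minimal sketch of what my_dag.py might contain; the DAG id, schedule, callable, and argument values below are illustrative placeholders rather than the post's actual code.

```python
def print_hello():
    # Placeholder task logic; a real task might read from S3 and load a database.
    print("hello from my_dag")

default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG("my_dag", default_args=default_args, schedule_interval=timedelta(days=1))

hello_task = PythonOperator(task_id="say_hello",
                            python_callable=print_hello,
                            dag=dag)

date_task = BashOperator(task_id="print_date",
                         bash_command="date",
                         dag=dag)

hello_task >> date_task  # print_date runs only after say_hello succeeds
```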

Here is a typical folder structure for the environment in which we add, configure, and run our DAGs.
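The exact layout depends on where AIRFLOW_HOME points, but a typical arrangement (the names below are illustrative) looks roughly like this:

```
airflow_home/
├── airflow.cfg        # Airflow configuration
├── airflow.db         # metadata database (SQLite by default)
├── dags/              # DAG definition files go here
│   └── my_dag.py
└── logs/              # scheduler and task logs
```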
Tasks are user-defined activities run by the operators. They can be Python functions or external scripts that you can call. Tasks are expected to be idempotent: no matter how many times you run a task, it needs to produce the same outcome for the same input parameters.

Note: don't confuse operators with tasks. Tasks define "what to run" and operators define "how to run it". For example, a Python function that reads from S3 and pushes the data to a database is a task; the method that calls this Python function in Airflow is the operator. Airflow has built-in operators that you can use for common tasks, and you can create custom operators by extending the BaseOperator class and implementing the execute() method.

To put these concepts into action, we'll install Airflow and define our first DAG.

Installation and Folder structure

Airflow is easy (yet restrictive) to install as a single package.
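For a 1.x-era release (matching the import style used in this post), installation typically comes down to a pip install plus initializing the metadata database; the AIRFLOW_HOME path below is just an example.

```bash
export AIRFLOW_HOME=~/airflow   # where airflow.cfg, dags/ and logs/ will live
pip install apache-airflow      # install Airflow as a single package
airflow initdb                  # initialize the metadata database (Airflow 1.x)
airflow webserver -p 8080       # start the web UI
airflow scheduler               # start the scheduler in a separate terminal
```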

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative. This also removes the need to use restrictive JSON or XML configuration files.

Let's look at a few concepts that you'll need to write our first workflow.

In Airflow, Directed Acyclic Graphs (DAGs) are used to create the workflows. DAGs are a high-level outline that defines the dependent and exclusive tasks that can be ordered and scheduled.

(Figure: the DAG that we are building using Airflow.)

We will work on an example DAG that reads data from three sources independently. Once that is completed, we initiate a Spark job to join the data on a key and write the output of the transformation to Redshift. Defining a DAG enables the scheduler to know which tasks can be run immediately and which have to wait for other tasks to complete; here, the Spark job has to wait for the three "read" tasks to populate the data into S3 and HDFS.

The Scheduler is the brains behind setting up the workflows in Airflow. As a user, your interactions with the scheduler are limited to providing it with information about the different tasks and when they have to run. To ensure that Airflow knows all the DAGs and tasks that need to be run, there can only be one scheduler.

Operators are the "workers" that run our tasks. Workflows are defined by creating a DAG of operators. Each operator runs a particular task, written as a Python function or shell command.
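As a rough sketch of how this dependency structure is expressed, the example DAG above could be wired up as follows; the task ids and schedule are placeholders, and DummyOperator stands in for the real read, Spark, and Redshift logic.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG("read_join_write_example",
          start_date=datetime(2020, 1, 1),
          schedule_interval="@weekly")

# Three independent "read" tasks that land data in S3/HDFS.
read_source_1 = DummyOperator(task_id="read_source_1", dag=dag)
read_source_2 = DummyOperator(task_id="read_source_2", dag=dag)
read_source_3 = DummyOperator(task_id="read_source_3", dag=dag)

# The Spark join waits for all three reads; the Redshift load waits for the join.
spark_join = DummyOperator(task_id="spark_join_on_key", dag=dag)
write_to_redshift = DummyOperator(task_id="write_to_redshift", dag=dag)

read_source_1 >> spark_join
read_source_2 >> spark_join
read_source_3 >> spark_join
spark_join >> write_to_redshift
```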


Why Airflow?

Data pipelines are built by defining a set of "tasks" to extract, analyze, transform, load, and store the data. For example, a pipeline could consist of tasks like reading archived logs from S3, creating a Spark job to extract relevant features, indexing the features using Solr, and updating the existing index to allow search. To automate this pipeline and run it weekly, you could use a time-based scheduler like Cron by defining the workflows in Crontab. This is really good for simple workflows, but things get messier when you maintain workflows with dependencies across a large organization. It gets complicated if you're waiting on input data from a third party and several teams depend on your tasks to start their jobs.

Airflow is a workflow scheduler that helps with scheduling complex workflows and provides an easy way to maintain them.

Open source: after starting as an internal project at Airbnb, Airflow had a natural need in the community. This was a major reason why it eventually became an open source project.

There are numerous resources for understanding what Airflow does, but it's much easier to understand by directly working through an example.
In this blog, I cover the main concepts behind pipeline automation with Airflow and go through the code (and a few gotchas) to create your first workflow with ease.
