Some useful resources about Airflow:

ETL best practices with Airflow

Series of articles about Airflow in production:

  • Part 1 - about use cases and alternatives
  • Part 2 - about alternatives (Luigi and Pinball)
  • Part 3 - key concepts
  • Part 4 - deployment, issues

More notes about production

About start_date: Why isn’t my task getting scheduled?

One cannot use datetime.now() as start_date: the scheduler needs a fixed reference point, not a value that changes every time the DAG file is parsed. Instead, one has to provide a static datetime object without a timezone, expressed in UTC. You can do something like this:

from datetime import datetime
import pytz

# your date, time and time zone go here; note that with pytz one has to use
# localize() instead of passing tzinfo= to the datetime constructor,
# which would silently pick a wrong historical (LMT) offset
start_date_local = pytz.timezone('Europe/Minsk').localize(datetime(2018, 1, 1, 10, 11))
# convert to UTC and remove the timezone, since Airflow expects a naive datetime
start_date_for_airflow = start_date_local.astimezone(pytz.utc).replace(tzinfo=None)
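
For completeness, this is how the converted value can be plugged into a DAG (a minimal sketch; the dag id and schedule are placeholders):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    'my_dag',                           # placeholder dag id
    start_date=start_date_for_airflow,  # the naive UTC datetime from above
    schedule_interval='@daily'          # placeholder schedule
)

task = DummyOperator(task_id='dummy_task', dag=dag)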

Running with LocalExecutor and Postgres from Docker

We will run Postgres in Docker:

docker run -it -p 5432:5432 --name postgres -e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=postgres -d postgres

Now we can specify in airflow.cfg:

sql_alchemy_conn = postgresql://postgres:postgres@localhost:5432/postgres

and

executor = LocalExecutor
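
Before initializing the database, it is worth checking that this connection string actually works. A quick sketch (it assumes psycopg2 is installed, which the airflow[postgres] extra pulls in):

from sqlalchemy import create_engine

# the same connection string as sql_alchemy_conn in airflow.cfg
engine = create_engine('postgresql://postgres:postgres@localhost:5432/postgres')
print(engine.execute('SELECT version()').scalar())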

Then run:

airflow initdb
airflow scheduler
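
A quick way to verify the setup is a DAG with two independent tasks: with LocalExecutor they should run simultaneously, which the default SequentialExecutor cannot do. A minimal sketch (the dag id and sleep commands are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'check_local_executor',           # placeholder dag id
    start_date=datetime(2018, 1, 1),
    schedule_interval=None            # trigger runs manually
)

# no dependency between the tasks, so LocalExecutor can run both at once
t1 = BashOperator(task_id='sleep_1', bash_command='sleep 30', dag=dag)
t2 = BashOperator(task_id='sleep_2', bash_command='sleep 30', dag=dag)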

How to specify AWS region for EmrCreateJobFlowOperator

EmrCreateJobFlowOperator has no parameter to specify the AWS region where the cluster has to be deployed.

Internally, EmrCreateJobFlowOperator uses EmrHook, which calls get_client_type('emr'). get_client_type has a default parameter region_name=None, and the operator never overrides it, so there is no way to set the region in code.

One has to configure it through Airflow connections instead. Here we create a connection aws_my with AWS region eu-west-1:

airflow connections -a --conn_id 'aws_my' --conn_uri 'aws:' --conn_extra '{"region_name": "eu-west-1"}'
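
The same connection can also be created programmatically, e.g. from a setup script. A sketch using Airflow's Connection model and session:

from airflow import settings
from airflow.models import Connection

# the same aws_my connection as above; the region goes into the extra field
conn = Connection(
    conn_id='aws_my',
    conn_type='aws',
    extra='{"region_name": "eu-west-1"}'
)
session = settings.Session()
session.add(conn)
session.commit()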

Now we can use the new connection in EmrCreateJobFlowOperator to create a Spark cluster on EMR.

from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides={},      # your cluster configuration goes here
    aws_conn_id='aws_my',       # <---- this is important
    emr_conn_id='emr_default',
    dag=your_dag
)

Done