This repository contains a sample setup for hosting Apache Airflow on AWS ECS using Fargate.
This setup uses AWS Cloud Development Kit to automate resource creation.
- What is AWS Fargate?
- What is Airflow?
- How to use this?
- Sample DAG Explanation
- Configuration Options
- Understanding Code Structure
- Some Useful Resources
AWS Fargate is a serverless compute engine for containers that works with both Amazon Elastic Container Service (ECS) and Amazon Elastic Kubernetes Service (EKS). Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.
More info about Airflow can be found here
This setup is based on the AWS CDK, so install the CDK first. Prerequisites:
- Node.js => 12.x or later
- AWS CLI => Installation Guide
- Docker
$ npm install -g aws-cdk
Once Node.js and the CDK are installed, clone this repository and run the following commands:
$ npm install // Installs all necessary dependencies
$ npm run build // Builds this package and generates a CloudFormation stack with resources
$ cdk deploy // Deploys the CloudFormation template to the AWS account configured for your AWS CLI
This will output LoadBalancerDNSName on the terminal, which can be used to access the Airflow webserver UI.
Along with LoadBalancerDNSName, you will also get the password for the Admin account to log in to the Airflow UI. You can change this password from the Profile section after logging in.
For details about Admin config, check airflow/config/webserver_entry.sh
If you want to delete this stack, run the following command:
$ cdk destroy
This stack creates a workflow (DAG) with 5 tasks:
start_process >> [odd_task, even_task] >> numbers_task >> on_worker_task
This DAG showcases how to create parallel tasks and how to use ECSOperator, which spins up an on-demand Fargate task to run each step.
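The fan-out/fan-in ordering above can be sketched in plain Python, independent of Airflow, to show which tasks may run concurrently. This is illustrative only (the task names come from the DAG; the grouping logic and function names here are not part of the repository):

```python
from concurrent.futures import ThreadPoolExecutor

# Dependency graph from the sample DAG:
# start_process >> [odd_task, even_task] >> numbers_task >> on_worker_task
GRAPH = {
    "start_process": [],
    "odd_task": ["start_process"],
    "even_task": ["start_process"],
    "numbers_task": ["odd_task", "even_task"],
    "on_worker_task": ["numbers_task"],
}

def execution_levels(graph):
    """Group tasks into levels; tasks in the same level can run in parallel."""
    levels, done = [], set()
    while len(done) < len(graph):
        ready = sorted(t for t, deps in graph.items()
                       if t not in done and all(d in done for d in deps))
        levels.append(ready)
        done.update(ready)
    return levels

if __name__ == "__main__":
    for level in execution_levels(GRAPH):
        # In Airflow the scheduler dispatches each ready task; here we just
        # simulate one level at a time with a thread pool.
        with ThreadPoolExecutor() as pool:
            list(pool.map(print, level))
```

Note that odd_task and even_task land in the same level, which is exactly the parallelism the `[odd_task, even_task]` list expresses in the DAG.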
Note: Each sub-folder under tasks will result in a new Fargate TaskDefinition. These task definitions are used by ECSOperator.
start_process: A dummy task that runs on the default Airflow worker.
odd_task: Executes the odd_numbers.py file located under tasks/multi_task. Runs on an on-demand Fargate task.
even_task: Executes the even_numbers.py file located under tasks/multi_task. Runs on an on-demand Fargate task.
numbers_task: Executes the numbers.py file located under tasks/number_task. Runs on an on-demand Fargate task.
on_worker_task: Runs on the default worker. It showcases how to use PythonOperator to run a task on the Airflow worker.
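The task modules themselves can be plain Python scripts whose stdout ends up in the container's CloudWatch logs. A minimal sketch of what a module like odd_numbers.py might contain (the actual file in tasks/multi_task may differ; this content is hypothetical):

```python
def odd_numbers(limit):
    """Return the odd numbers from 1 up to and including limit."""
    return [n for n in range(1, limit + 1) if n % 2 == 1]

if __name__ == "__main__":
    # Whatever the container prints is visible in the task's log group.
    print(odd_numbers(10))
```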
Once deployed, this setup will create the following AWS resources with the necessary dependencies:
- 1 VPC
- 1 PostgreSQL DB on Amazon RDS
- 1 ECS Cluster
- 3 Fargate Task Definitions
- 1 Fargate ECS Service with 3 task instances with one container each for Airflow Webserver, Scheduler and Worker
- 1 EC2 NetworkLoadBalancer
- 1 SecurityGroup: used to restrict access to all of the above resources to within the VPC. Only the webserver can be accessed from outside, via the load balancer DNS name
You can find the default config in the config.ts file.
export const defaultDBConfig: DBConfig = {
dbName: "farflow", // DB cluster and instance Name
port: 5432, // Port on which db instance runs
masterUsername: "airflow", // Username for master-user. Password will be autogenerated and stored in ParameterStore
instanceType: InstanceType.of(InstanceClass.T2, InstanceSize.SMALL), // Using T2.small for this setup. Upgrade as per your requirements
allocatedStorageInGB: 25, // 25 GB of storage will be allocated
backupRetentionInDays: 30 // Backups will be deleted after 30 days
};
export const defaultWebserverConfig: ContainerConfig = {
name: "WebserverContainer",
containerPort: 8080,
entryPoint: "/webserver_entry.sh"
}
export const defaultSchedulerConfig: ContainerConfig = {
name: "SchedulerContainer",
containerPort: 8081,
entryPoint: "/scheduler_entry.sh"
}
export const defaultWorkerConfig: ContainerConfig = {
name: "WorkerContainer",
containerPort: 8082,
entryPoint: "/worker_entry.sh"
}
export const airflowTaskConfig: AirflowTaskConfig = {
cpu: 2048,
memoryLimitMiB: 4096,
webserverConfig: defaultWebserverConfig,
schedulerConfig: defaultSchedulerConfig,
workerConfig: defaultWorkerConfig,
logRetention: RetentionDays.ONE_MONTH,
// Uncomment this to have dedicated worker pool that can be auto-scaled as per workerAutoScalingConfig
// createWorkerPool: true
};
Adjust the configuration in this file as per your requirements.
If you need a dedicated worker pool for resource-intensive operations that needs to be available all the time, set createWorkerPool: true under airflowTaskConfig.
This will create a separate ECS Service whose task/container runs an Airflow Worker. It is configured with auto-scaling to add more task instances depending on load.
Auto-scaling config can be set as follows:
export const workerAutoScalingConfig: AutoScalingConfig = {
minTaskCount: 1,
maxTaskCount: 5,
cpuUsagePercent: 70
};
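This is not how ECS implements scaling (real target tracking relies on CloudWatch alarms and cooldown periods), but a simplified sketch of the intent behind those three numbers can help: keep average CPU near the target by adjusting the task count within the configured bounds. All names below are illustrative:

```python
import math

# Hypothetical Python mirror of workerAutoScalingConfig from config.ts
CONFIG = {"minTaskCount": 1, "maxTaskCount": 5, "cpuUsagePercent": 70}

def desired_task_count(current_tasks, observed_cpu_percent, cfg=CONFIG):
    """Simplified target tracking: keep average CPU near the target.

    Assuming total load stays constant, new_count * target ~= current * observed,
    clamped to the configured min/max task counts.
    """
    target = math.ceil(current_tasks * observed_cpu_percent / cfg["cpuUsagePercent"])
    return max(cfg["minTaskCount"], min(cfg["maxTaskCount"], target))

# e.g. 2 tasks averaging 105% CPU -> scale out to 3 tasks
```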
This setup uses the default account and region from your AWS CLI configuration. To install it in a different region or account, or in multiple, follow this guide
Let's walk through the code structure and what each file does. Hopefully this helps you change things as required.
📦FarFlow
┣ 📂airflow => Top-level directory that holds Airflow related config
┃ ┣ 📂config
┃ ┃ ┣ 📜scheduler_entry.sh => Entrypoint for Scheduler Container.
┃ ┃ ┣ 📜webserver_entry.sh => Entrypoint for Webserver Container. This also initializes the backend database
┃ ┃ ┗ 📜worker_entry.sh => Entrypoint for Worker Container.
┃ ┣ 📂dags => Holds all the DAGs for this Airflow instance. Add more DAGs here.
┃ ┃ ┗ 📜dag.py => Sample DAG
┃ ┗ 📜Dockerfile => Dockerfile for Airflow Image, with some dependencies
┣ 📂app => Base folder for CDK application
┃ ┣ 📂constructs => Holds helper files for CDK setup
┃ ┃ ┣ 📜airflow-construct.ts => Creates Fargate Service holding Airflow
┃ ┃ ┣ 📜dag-tasks.ts => Creates Fargate tasks containing modules invoked from the DAG using ECSOperator
┃ ┃ ┣ 📜rds.ts => Creates RDS Postgres instance
┃ ┃ ┣ 📜service-construct.ts => Top level Fargate service helper
┃ ┃ ┗ 📜task-construct.ts => Helper for Dag-tasks Construct
┃ ┣ 📜config.ts => Configuration for entire CDK application
┃ ┣ 📜farflow.ts => Starting point for this CDK application
┃ ┗ 📜policies.ts => Configure policies that will be attached to Airflow instances
┣ 📂tasks => Sample tasks that will be invoked from DAG using ECSOperator
┃ ┣ 📂multi_task => example task 1
┃ ┃ ┣ 📜Dockerfile => config for container holding this task
┃ ┃ ┣ 📜even_numbers.py => module-1 in this container
┃ ┃ ┗ 📜odd_numbers.py => module-2 in this container
┃ ┗ 📂number_task => example task 2
┃ ┃ ┣ 📜Dockerfile => config for container holding this task
┃ ┃ ┗ 📜numbers.py => only module for this container
┣ 📜README.md => YOU ARE READING IT
┣ 📜cdk.json => CDK config
┣ 📜package-lock.json => npm package info (auto-generated)
┣ 📜package.json => npm dependencies
┗ 📜tsconfig.json => Typescript config
Code-tree generated using this plugin
- CDK
- Airflow Docker
- CDK Examples
- ECS Operator: This is the basis for on-demand Fargate tasks
- Airflow Config and Environment Variables: Pass Environment Variables to Docker
- Default Airflow Config