Skip to content

Commit

Permalink
AWS Airflow (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
sekka1 authored Mar 15, 2022
1 parent fc682dc commit c1134a9
Show file tree
Hide file tree
Showing 4 changed files with 305 additions and 0 deletions.
98 changes: 98 additions & 0 deletions terraform-modules/aws/airflow/default_iam_policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "airflow:PublishMetrics",
"Resource": "arn:aws:airflow:${aws_region}:${aws_account_id}:environment/${airflow_name}"
},
{
"Effect": "Deny",
"Action": "s3:ListAllMyBuckets",
"Resource": [
"arn:aws:s3:::${s3_bucket_name}",
"arn:aws:s3:::${s3_bucket_name}/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*"
],
"Resource": [
"arn:aws:s3:::${s3_bucket_name}",
"arn:aws:s3:::${s3_bucket_name}/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults"
],
"Resource": [
"arn:aws:logs:${aws_region}:${aws_account_id}:log-group:airflow-${airflow_name}-*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetAccountPublicAccessBlock"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:${aws_region}:*:airflow-celery-*"
},
{
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:DescribeKey",
"kms:GenerateDataKey*",
"kms:Encrypt"
],
"Resource": "arn:aws:kms:${aws_region}:${aws_account_id}:key/*",
"Condition": {
"StringLike": {
"kms:ViaService": [
"sqs.${aws_region}.amazonaws.com",
"s3.${aws_region}.amazonaws.com"
]
}
}
}
]
}
96 changes: 96 additions & 0 deletions terraform-modules/aws/airflow/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
resource "aws_mwaa_environment" "this" {
name = var.airflow_name
airflow_version = var.airflow_version
environment_class = var.environment_class
max_workers = var.max_workers
min_workers = var.min_workers
source_bucket_arn = var.source_bucket_arn
dag_s3_path = var.dag_s3_path
execution_role_arn = module.iam_assumable_role_admin.iam_role_arn

logging_configuration {
dag_processing_logs {
enabled = true
log_level = var.dag_processing_log_level
}

scheduler_logs {
enabled = true
log_level = var.scheduler_log_level
}

task_logs {
enabled = true
log_level = var.task_log_level
}

webserver_logs {
enabled = true
log_level = var.webserver_log_level
}

worker_logs {
enabled = true
log_level = var.worker_log_level
}
}

network_configuration {
security_group_ids = [aws_security_group.this.id]
subnet_ids = var.subnet_ids
}

tags = var.tags
}

data "aws_caller_identity" "current" {}

module "iam_assumable_role_admin" {
source = "terraform-aws-modules/iam/aws//modules/iam-assumable-role"
version = "4.14.0"

create_role = true
role_name = "airflow-${var.airflow_name}"
role_description = "Airflow role"
trusted_role_services = ["airflow.amazonaws.com","airflow-env.amazonaws.com"]
custom_role_policy_arns = [aws_iam_policy.policy.arn]
role_requires_mfa = false
tags = var.tags
}

resource "aws_iam_policy" "policy" {
name_prefix = "cluster-autoscaler-${var.airflow_name}"
description = "Airflow policy"
policy = templatefile("default_iam_policy.json", {
aws_region = var.aws_region
aws_account_id = data.aws_caller_identity.current.account_id
airflow_name = var.airflow_name
s3_bucket_name = var.source_bucket_name
})

tags = var.tags
}

resource "aws_security_group" "this" {
name = var.airflow_name
description = "Airflow security group"
vpc_id = var.vpc_id

ingress {
description = "TLS from VPC"
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]
}

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
ipv6_cidr_blocks = ["::/0"]
}

tags = var.tags
}
7 changes: 7 additions & 0 deletions terraform-modules/aws/airflow/outputs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
output "arn" {
value = aws_mwaa_environment.this.arn
}

output "webserver_url" {
value = aws_mwaa_environment.this.webserver_url
}
104 changes: 104 additions & 0 deletions terraform-modules/aws/airflow/variables.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
variable "airflow_name" {
type = string
default = "airflow"
description = "Airflow name"
}

variable "aws_region" {
type = string
default = "us-east-1"
description = "The AWS region"
}

variable "vpc_id" {
type = string
default = ""
description = "The vpc ID"
}

variable "subnet_ids" {
type = list(string)
default = []
description = "(Required) The private subnet IDs in which the environment should be created. MWAA requires two subnets."
}

variable "airflow_version" {
type = string
default = null
description = "(Optional) Airflow version of your environment, will be set by default to the latest version that MWAA supports."
}


variable "environment_class" {
type = string
default = "mw1.small"
description = "(Optional) Environment class for the cluster. Possible options are mw1.small, mw1.medium, mw1.large. Will be set by default to mw1.small. Please check the AWS Pricing for more information about the environment classes."
}

variable "max_workers" {
type = number
default = 10
description = "(Optional) The maximum number of workers that can be automatically scaled up. Value need to be between 1 and 25. Will be 10 by default."
}

variable "min_workers" {
type = number
default = 1
description = "(Optional) The minimum number of workers that you want to run in your environment. Will be 1 by default."
}



variable "source_bucket_arn" {
type = string
default = "s3://foo"
description = "The Dag's S3 bucket arn: arn:aws:s3:::bucketname"
}

variable "source_bucket_name" {
type = string
default = "foo"
description = "The Dag's S3 bucket name"
}

variable "dag_s3_path" {
type = string
default = "dags/"
description = "The dag's S3 path"
}

variable "tags" {
type = any
default = {}
description = "A set of tags to place on the items"
}

variable "dag_processing_log_level" {
type = string
default = "INFO"
description = "The log level: INFO | WARNING | ERROR | CRITICAL"
}

variable "scheduler_log_level" {
type = string
default = "INFO"
description = "The log level: INFO | WARNING | ERROR | CRITICAL"
}

variable "task_log_level" {
type = string
default = "INFO"
description = "The log level: INFO | WARNING | ERROR | CRITICAL"
}

variable "webserver_log_level" {
type = string
default = "INFO"
description = "The log level: INFO | WARNING | ERROR | CRITICAL"
}

variable "worker_log_level" {
type = string
default = "INFO"
description = "The log level: INFO | WARNING | ERROR | CRITICAL"
}

0 comments on commit c1134a9

Please sign in to comment.