An ETL pipeline with Amazon Redshift and AWS Glue example (#498)
polkx authored Aug 22, 2024
1 parent e296d7f commit 428ce73
Showing 7 changed files with 352 additions and 0 deletions.
10 changes: 10 additions & 0 deletions examples/aws-redshift-glue-etl/.gitignore
@@ -0,0 +1,10 @@
### Scala and JVM
*.class
*.log
.bsp
.scala-build

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

kubeconfig.json
224 changes: 224 additions & 0 deletions examples/aws-redshift-glue-etl/Main.scala
@@ -0,0 +1,224 @@
import besom.*
import besom.api.aws
import besom.json.*

@main def main = Pulumi.run {

val clusterIdentifier = "my-redshift-cluster"
val clusterDBName = "dev"
val clusterDBUsername = "admin"
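// NOTE: hardcoded here only to keep the example self-contained; in a real
// stack, read the password from Pulumi config as a secret rather than a literal.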
val clusterDBPassword = "Password!123"
val glueDBName = "my-glue-db"

// Create an S3 bucket to store some raw data.
val eventsBucket = aws.s3.Bucket(
name = "events",
aws.s3.BucketArgs(forceDestroy = true)
)

// Create a VPC.
val vpc = aws.ec2.Vpc(
name = "vpc",
aws.ec2.VpcArgs(
cidrBlock = "10.0.0.0/16",
enableDnsHostnames = true
)
)

// Create a private subnet within the VPC.
val subnet = aws.ec2.Subnet(
name = "subnet",
aws.ec2.SubnetArgs(
vpcId = vpc.id,
cidrBlock = "10.0.1.0/24"
)
)

// Declare a Redshift subnet group with the subnet ID.
val subnetGroup = aws.redshift.SubnetGroup(
name = "subnet-group",
aws.redshift.SubnetGroupArgs(
subnetIds = List(subnet.id)
)
)

// Create an IAM role granting Redshift read-only access to S3.
val redshiftRole = aws.iam.Role(
name = "redshift-role",
aws.iam.RoleArgs(
assumeRolePolicy = json"""{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "redshift.amazonaws.com"
}
}
]
}""".map(_.prettyPrint),
managedPolicyArns = List(
aws.iam.enums.ManagedPolicy.AmazonS3ReadOnlyAccess.value
)
)
)

// Create a VPC endpoint so the cluster can read from S3 over the private network.
val vpcEndpoint = aws.ec2.VpcEndpoint(
name = "s3-vpc-endpoint",
aws.ec2.VpcEndpointArgs(
vpcId = vpc.id,
serviceName = p"com.amazonaws.${aws.getRegion(aws.GetRegionArgs()).name}.s3",
routeTableIds = List(vpc.mainRouteTableId)
)
)

// Create a single-node Redshift cluster in the VPC.
val cluster = aws.redshift.Cluster(
name = "cluster",
aws.redshift.ClusterArgs(
clusterIdentifier = clusterIdentifier,
databaseName = clusterDBName,
masterUsername = clusterDBUsername,
masterPassword = clusterDBPassword,
nodeType = "ra3.xlplus",
clusterSubnetGroupName = subnetGroup.name,
clusterType = "single-node",
publiclyAccessible = false,
skipFinalSnapshot = true,
vpcSecurityGroupIds = List(vpc.defaultSecurityGroupId),
iamRoles = List(redshiftRole.arn)
)
)

// Define an AWS cron expression for "every 15 minutes".
// https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchevents-expressions.html
val every15minutes = "cron(0/15 * * * ? *)"

// Create a Glue catalog database.
val glueCatalogDB = aws.glue.CatalogDatabase(
name = "glue-catalog-db",
aws.glue.CatalogDatabaseArgs(
name = glueDBName
)
)

// Define an IAM role granting AWS Glue access to S3 and other Glue-required services.
val glueRole = aws.iam.Role(
name = "glue-role",
aws.iam.RoleArgs(
assumeRolePolicy = json"""{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
}
}
]
}""".map(_.prettyPrint),
managedPolicyArns = List(
aws.iam.enums.ManagedPolicy.AmazonS3FullAccess.value,
aws.iam.enums.ManagedPolicy.AWSGlueServiceRole.value
)
)
)

// Create a Glue crawler to process the contents of the data bucket on a schedule.
// https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html
val glueCrawler = aws.glue.Crawler(
name = "glue-crawler",
aws.glue.CrawlerArgs(
databaseName = glueCatalogDB.name,
role = glueRole.arn,
schedule = every15minutes,
s3Targets = List(
aws.glue.inputs.CrawlerS3TargetArgs(
path = p"s3://${eventsBucket.bucket}"
)
)
)
)

// Create a Glue connection to the Redshift cluster.
val glueRedshiftConnection = aws.glue.Connection(
name = "glue-redshift-connection",
aws.glue.ConnectionArgs(
connectionType = "JDBC",
connectionProperties = Map(
"JDBC_CONNECTION_URL" -> p"jdbc:redshift://${cluster.endpoint}/${clusterDBName}",
"USERNAME" -> clusterDBUsername,
"PASSWORD" -> clusterDBPassword
),
physicalConnectionRequirements = aws.glue.inputs.ConnectionPhysicalConnectionRequirementsArgs(
securityGroupIdLists = cluster.vpcSecurityGroupIds,
availabilityZone = subnet.availabilityZone,
subnetId = subnet.id
)
)
)

// Create an S3 bucket for Glue scripts and temporary storage.
val glueJobBucket = aws.s3.Bucket(
name = "glue-job-bucket",
aws.s3.BucketArgs(
forceDestroy = true
)
)

// Upload a Glue job script.
val glueJobScript = aws.s3.BucketObject(
name = "glue-job.py",
aws.s3.BucketObjectArgs(
bucket = glueJobBucket.id,
source = Asset.FileAsset("./glue-job.py")
)
)

// Create a Glue job that runs our Python ETL script.
val glueJob = aws.glue.Job(
name = "glue-job",
aws.glue.JobArgs(
roleArn = glueRole.arn,
glueVersion = "3.0",
numberOfWorkers = 10,
workerType = "G.1X",
defaultArguments = Map(
// Enabling job bookmarks helps you avoid loading duplicate data.
// https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html
"--job-bookmark-option" -> "job-bookmark-enable",
"--ConnectionName" -> glueRedshiftConnection.name,
"--GlueDBName" -> glueDBName,
"--GlueDBTableName" -> eventsBucket.bucket.map(_.replace("-", "_")),
"--RedshiftDBName" -> clusterDBName,
"--RedshiftDBTableName" -> "events",
"--RedshiftRoleARN" -> redshiftRole.arn,
"--TempDir" -> p"s3://${glueJobBucket.bucket}/glue-job-temp"
),
connections = List(glueRedshiftConnection.name),
command = aws.glue.inputs.JobCommandArgs(
scriptLocation = p"s3://${glueJobBucket.bucket}/glue-job.py",
pythonVersion = "3"
)
)
)

// Create a Glue trigger to run the job every 15 minutes.
val glueJobTrigger = aws.glue.Trigger(
name = "trigger",
aws.glue.TriggerArgs(
schedule = every15minutes,
`type` = "SCHEDULED",
actions = List(
aws.glue.inputs.TriggerActionArgs(jobName = glueJob.name)
)
)
)

Stack(vpcEndpoint, glueCrawler, glueJobScript, glueJobTrigger).exports(
dataBucketName = eventsBucket.bucket
)
}
3 changes: 3 additions & 0 deletions examples/aws-redshift-glue-etl/Pulumi.yaml
@@ -0,0 +1,3 @@
name: aws-redshift-glue-etl
description: An ETL pipeline with Amazon Redshift and AWS Glue
runtime: scala
45 changes: 45 additions & 0 deletions examples/aws-redshift-glue-etl/README.md
@@ -0,0 +1,45 @@
# ETL pipeline with Amazon Redshift and AWS Glue

This example creates an ETL pipeline using Amazon Redshift and AWS Glue. The pipeline extracts data from an S3 bucket
with a Glue crawler, transforms it with a Python script wrapped in a Glue job, and loads it into a Redshift database
deployed in a VPC.
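
In short, the flow is: S3 events bucket → Glue crawler → Glue Data Catalog → Glue job (PySpark) → Redshift `events` table.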

## Prerequisites

[Follow the instructions](https://www.pulumi.com/docs/clouds/aws/get-started/begin/)
to get started with Pulumi & AWS.

## Deploying

1. Create a new stack, which is an isolated deployment target for this example:

```bash
pulumi stack init dev
```

2. Set the AWS region:

```bash
pulumi config set aws:region us-west-2
```

3. Stand up the cluster:

```bash
pulumi up
```

4. In a few moments, the Redshift cluster and Glue components will be up and running, and the S3 bucket name will be emitted as a Pulumi stack output. You can read it at any time with:
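
```bash
pulumi stack output dataBucketName
```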

5. Upload the included sample data file to S3 to verify the automation works as expected:

```bash
aws s3 cp events-1.txt s3://$(pulumi stack output dataBucketName)
```
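
Once the crawler and the job have each run on their 15-minute schedules, you can optionally confirm the load with the Redshift Data API. This assumes your AWS credentials are allowed to call that API; the identifiers below match the values hardcoded in Main.scala:

```bash
aws redshift-data execute-statement \
  --cluster-identifier my-redshift-cluster \
  --database dev \
  --db-user admin \
  --sql "SELECT * FROM events"
```

Then fetch the rows with `aws redshift-data get-statement-result --id <statement-id>`.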

6. When you're ready, destroy your stack and remove it:

```bash
pulumi destroy --yes
pulumi stack rm --yes
```
3 changes: 3 additions & 0 deletions examples/aws-redshift-glue-etl/events-1.txt
@@ -0,0 +1,3 @@
{"id": 1, "name": "An interesting event"}
{"id": 2, "name": "Another interesting event"}
{"id": 3, "name": "An event of monumental importance"}
61 changes: 61 additions & 0 deletions examples/aws-redshift-glue-etl/glue-job.py
@@ -0,0 +1,61 @@
import sys
from awsglue.utils import getResolvedOptions
from awsglue.transforms import ApplyMapping
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

# Collect the arguments passed in by the glue.Job run.
args = getResolvedOptions(
sys.argv,
[
"JOB_NAME",
"TempDir",
"ConnectionName",
"GlueDBName",
"GlueDBTableName",
"RedshiftRoleARN",
"RedshiftDBName",
"RedshiftDBTableName",
],
)

glueContext = GlueContext(SparkContext.getOrCreate())

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Extract all unprocessed data from the Glue catalog.
source0 = glueContext.create_dynamic_frame.from_catalog(
database=args["GlueDBName"],
table_name=args["GlueDBTableName"],
additional_options={
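# Use the "id" column as the bookmark key so each run picks up
# only records it hasn't seen before.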
"jobBookmarkKeys": ["id"],
"jobBookmarkKeysSortOrder": "asc",
},
transformation_ctx="source0",
)

# Transform the data (mostly just to show how to do so).
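# Each mapping is (source field, source type, target field, target type).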
transformed0 = ApplyMapping.apply(
frame=source0,
mappings=[
("id", "int", "event_id", "int"),
("name", "string", "event_name", "string"),
],
)

# Load the data into the Redshift database.
glueContext.write_dynamic_frame.from_jdbc_conf(
frame=transformed0,
catalog_connection=args["ConnectionName"],
connection_options={
"database": args["RedshiftDBName"],
"dbtable": args["RedshiftDBTableName"],
"aws_iam_role": args["RedshiftRoleARN"],
},
redshift_tmp_dir=args["TempDir"],
)

# Call commit() to update the job bookmark for the next run.
job.commit()
6 changes: 6 additions & 0 deletions examples/aws-redshift-glue-etl/project.scala
@@ -0,0 +1,6 @@
//> using scala "3.3.1"
//> using options -Werror -Wunused:all -Wvalue-discard -Wnonunit-statement
//> using dep "org.virtuslab::besom-core:0.4.0-SNAPSHOT"
//> using dep "org.virtuslab::besom-aws:6.32.0-core.0.4-SNAPSHOT"

//> using repository sonatype:snapshots
