Skip to content

Commit

Permalink
feat(cdk-ops): add vacuum workflow
Browse files Browse the repository at this point in the history
- `DataWarehouse` introduces a Step Functions State Machine (workflow)
  to run VACUUM over the tables. The workflow picks the tables one by
  one and applies a new Lambda function `lambda/vacuum-table` to each.
  `lambda/vacuum-table` runs `VACUUM` over a given table.
  `lambda/vacuum-table` runs `VACUUM` over a given table.

issue codemonger-io#30
  • Loading branch information
kikuomax committed Oct 11, 2022
1 parent ae03435 commit e7a8ea8
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 0 deletions.
64 changes: 64 additions & 0 deletions cdk-ops/lambda/vacuum-table/index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-

"""Runs VACUUM over a given table.
You have to specify the following environment variables.
* ``WORKGROUP_NAME``: name of the Redshift Serverless workgroup.
* ``ADMIN_SECRET_ARN``: ARN of the secret containing the admin password.
"""

import logging
import os
import re

import boto3
from libdatawarehouse import ACCESS_LOGS_DATABASE_NAME, data_api


# Name of the Redshift Serverless workgroup the VACUUM statement runs in.
# Read once at import time; a missing variable fails the Lambda cold start.
WORKGROUP_NAME = os.environ['WORKGROUP_NAME']
# ARN of the Secrets Manager secret containing the admin password, used to
# authenticate the Redshift Data API calls.
ADMIN_SECRET_ARN = os.environ['ADMIN_SECRET_ARN']

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

# Redshift Data API client; created once per Lambda execution environment so
# invocations sharing a warm container reuse the connection.
redshift_data = boto3.client('redshift-data')


# Safe SQL identifier: ASCII letters, digits, and underscores, not starting
# with a digit. Used to reject table names that could inject SQL.
_TABLE_NAME_RE = re.compile(r'[A-Za-z_][A-Za-z0-9_]*')

# VACUUM modes supported by Amazon Redshift. The empty string selects the
# default behavior (FULL for Redshift).
_VACUUM_MODES = frozenset([
    '',
    'FULL',
    'SORT ONLY',
    'DELETE ONLY',
    'REINDEX',
    'RECLUSTER',
])


def lambda_handler(event, _):
    """Runs VACUUM over a given table.

    ``event`` must be a ``dict`` similar to the following,

    .. code-block:: python

        {
            'tableName': '<table-name>',
            'mode': 'SORT ONLY'
        }

    ``mode`` must be one of ``''``, ``'FULL'``, ``'SORT ONLY'``,
    ``'DELETE ONLY'``, ``'REINDEX'``, or ``'RECLUSTER'``.

    :returns: ``dict`` with a single key ``'status'``: the final status of
        the statement; e.g., ``'FINISHED'``, ``'FAILED'``, or ``'TIMEOUT'``.

    :raises KeyError: if ``event`` lacks ``tableName`` or ``mode``.

    :raises ValueError: if ``tableName`` is not a plain SQL identifier, or
        if ``mode`` is not a supported VACUUM mode. Both values are
        interpolated into the SQL text, so they are validated here to
        prevent SQL injection through the event payload.
    """
    LOGGER.debug('running VACUUM: %s', str(event))
    table_name = event['tableName']
    if _TABLE_NAME_RE.fullmatch(table_name) is None:
        raise ValueError(f'invalid table name: {table_name}')
    mode = event['mode']
    if mode not in _VACUUM_MODES:
        raise ValueError(f'invalid VACUUM mode: {mode}')
    queue_res = redshift_data.execute_statement(
        WorkgroupName=WORKGROUP_NAME,
        SecretArn=ADMIN_SECRET_ARN,
        Database=ACCESS_LOGS_DATABASE_NAME,
        Sql=f'VACUUM {mode} {table_name}',
    )
    # polls the Data API until the statement settles or the wait gives up
    status, res = data_api.wait_for_results(redshift_data, queue_res['Id'])
    if status == 'FAILED':
        LOGGER.error('VACUUM over %s failed: %s', table_name, str(res))
    elif status is None:
        # wait_for_results gave up before the statement finished
        LOGGER.error('VACUUM over %s timed out', table_name)
        status = 'TIMEOUT'
    elif status == 'FINISHED':
        LOGGER.debug(
            'VACUUM over %s finished in %.3f ms',
            table_name,
            res.get('Duration', 0) * 0.001 * 0.001, # ns → ms
        )
    else:
        # any other terminal status (e.g., 'ABORTED') is treated as an error
        LOGGER.error('VACUUM over %s failed: %s', table_name, status)
    return {
        'status': status,
    }
61 changes: 61 additions & 0 deletions cdk-ops/lib/data-warehouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {
aws_lambda as lambda,
aws_redshiftserverless as redshift,
aws_secretsmanager as secrets,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
Expand Down Expand Up @@ -46,6 +48,8 @@ export class DataWarehouse extends Construct {
readonly workgroupName: string;
/** Redshift Serverless workgroup. */
readonly workgroup: redshift.CfnWorkgroup;
/** Step Functions to run VACUUM over tables. */
readonly vacuumWorkflow: sfn.IStateMachine;

constructor(scope: Construct, id: string, props: Props) {
super(scope, id);
Expand Down Expand Up @@ -168,6 +172,63 @@ export class DataWarehouse extends Construct {
this.adminSecret.grantRead(populateDwDatabaseLambda);
// TODO: too permissive?
populateDwDatabaseLambda.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonRedshiftDataFullAccess'));

// Step Functions that perform VACUUM over tables.
// - Lambda function that runs VACUUM over a given table
const vacuumTableLambda = new PythonFunction(this, 'VacuumTableLambda', {
description: `Runs VACUUM over a table (${deploymentStage})`,
runtime: lambda.Runtime.PYTHON_3_8,
architecture: lambda.Architecture.ARM_64,
entry: path.join('lambda', 'vacuum-table'),
index: 'index.py',
handler: 'lambda_handler',
layers: [latestBoto3.layer, libdatawarehouse.layer],
environment: {
WORKGROUP_NAME: this.workgroupName,
ADMIN_SECRET_ARN: this.adminSecret.secretArn,
},
timeout: Duration.minutes(15),
});
this.adminSecret.grantRead(vacuumTableLambda);
this.grantQuery(vacuumTableLambda);
// - state machine
// - lists table names
const listTableNamesState = new sfn.Pass(this, 'ListTables', {
comment: 'Lists table names',
result: sfn.Result.fromArray([
'access_log',
'referer',
'page',
'edge_location',
'user_agent',
'result_type',
]),
resultPath: '$.tables',
// produces something like
// {
// mode: 'SORT ONLY',
// tables: ['access_log', ...]
// }
});
this.vacuumWorkflow = new sfn.StateMachine(this, 'VacuumWorkflow', {
definition:
listTableNamesState.next(
new sfn.Map(this, 'MapTables', {
comment: 'Iterates over tables',
maxConcurrency: 1, // sequential
itemsPath: '$.tables',
parameters: {
'tableName.$': '$$.Map.Item.Value',
'mode.$': '$.mode',
},
}).iterator(
new sfn_tasks.LambdaInvoke(this, 'VacuumTable', {
lambdaFunction: vacuumTableLambda,
}),
),
),
timeout: Duration.hours(1),
});
}

/** Returns subnet IDs for the cluster of Redshift Serverless. */
Expand Down

0 comments on commit e7a8ea8

Please sign in to comment.