diff --git a/cdk-ops/lambda/vacuum-table/index.py b/cdk-ops/lambda/vacuum-table/index.py new file mode 100644 index 0000000..3fe066d --- /dev/null +++ b/cdk-ops/lambda/vacuum-table/index.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- + +"""Runs VACUUM over a given table. + +You have to specify the following environment variables. +* ``WORKGROUP_NAME``: name of the Redshift Serverless workgroup. +* ``ADMIN_SECRET_ARN``: ARN of the secret containing the admin password. +""" + +import logging +import os +import boto3 +from libdatawarehouse import ACCESS_LOGS_DATABASE_NAME, data_api + + +WORKGROUP_NAME = os.environ['WORKGROUP_NAME'] +ADMIN_SECRET_ARN = os.environ['ADMIN_SECRET_ARN'] + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +redshift_data = boto3.client('redshift-data') + + +def lambda_handler(event, _): + """Runs VACUUM over a given table. + + ``event`` must be a ``dict`` similar to the following, + + .. code-block:: python + + { + 'tableName': '', + 'mode': 'SORT ONLY' + } + """ + LOGGER.debug('running VACUUM: %s', str(event)) + table_name = event['tableName'] + # TODO: verify table_name + mode = event['mode'] + # TODO: verify mode + queue_res = redshift_data.execute_statement( + WorkgroupName=WORKGROUP_NAME, + SecretArn=ADMIN_SECRET_ARN, + Database=ACCESS_LOGS_DATABASE_NAME, + Sql=f'VACUUM {mode} {table_name}', + ) + status, res = data_api.wait_for_results(redshift_data, queue_res['Id']) + if status == 'FAILED': + LOGGER.error('VACUUM over %s failed: %s', table_name, str(res)) + elif status is None: + LOGGER.error('VACUUM over %s timed out', table_name) + status = 'TIMEOUT' + elif status == 'FINISHED': + LOGGER.debug( + 'VACUUM over %s finished in %.3f ms', + table_name, + res.get('Duration', 0) * 0.001 * 0.001, # ns → ms + ) + else: + LOGGER.error('VACUUM over %s failed: %s', table_name, status) + return { + 'status': status, + } diff --git a/cdk-ops/lib/data-warehouse.ts b/cdk-ops/lib/data-warehouse.ts index 5492d61..77ec928 100644 --- a/cdk-ops/lib/data-warehouse.ts +++ b/cdk-ops/lib/data-warehouse.ts @@ -9,6 +9,8 @@ import { aws_lambda as lambda, aws_redshiftserverless as redshift, aws_secretsmanager as secrets, + aws_stepfunctions as sfn, + aws_stepfunctions_tasks as sfn_tasks, } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; @@ -46,6 +48,8 @@ export class DataWarehouse extends Construct { readonly workgroupName: string; /** Redshift Serverless workgroup. */ readonly workgroup: redshift.CfnWorkgroup; + /** Step Functions to run VACUUM over tables. */ + readonly vacuumWorkflow: sfn.IStateMachine; constructor(scope: Construct, id: string, props: Props) { super(scope, id); @@ -168,6 +172,63 @@ export class DataWarehouse extends Construct { this.adminSecret.grantRead(populateDwDatabaseLambda); // TODO: too permissive? populateDwDatabaseLambda.role?.addManagedPolicy(iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonRedshiftDataFullAccess')); + + // Step Functions that perform VACUUM over tables. + // - Lambda function that runs VACUUM over a given table + const vacuumTableLambda = new PythonFunction(this, 'VacuumTableLambda', { + description: `Runs VACUUM over a table (${deploymentStage})`, + runtime: lambda.Runtime.PYTHON_3_8, + architecture: lambda.Architecture.ARM_64, + entry: path.join('lambda', 'vacuum-table'), + index: 'index.py', + handler: 'lambda_handler', + layers: [latestBoto3.layer, libdatawarehouse.layer], + environment: { + WORKGROUP_NAME: this.workgroupName, + ADMIN_SECRET_ARN: this.adminSecret.secretArn, + }, + timeout: Duration.minutes(15), + }); + this.adminSecret.grantRead(vacuumTableLambda); + this.grantQuery(vacuumTableLambda); + // - state machine + // - lists table names + const listTableNamesState = new sfn.Pass(this, 'ListTables', { + comment: 'Lists table names', + result: sfn.Result.fromArray([ + 'access_log', + 'referer', + 'page', + 'edge_location', + 'user_agent', + 'result_type', + ]), + resultPath: '$.tables', + // produces something like + // { + // mode: 'SORT ONLY', + // tableNames: ['access_log', ...] + // } + }); + this.vacuumWorkflow = new sfn.StateMachine(this, 'VacuumWorkflow', { + definition: + listTableNamesState.next( + new sfn.Map(this, 'MapTables', { + comment: 'Iterates over tables', + maxConcurrency: 1, // sequential + itemsPath: '$.tables', + parameters: { + 'tableName.$': '$$.Map.Item.Value', + 'mode.$': '$.mode', + }, + }).iterator( + new sfn_tasks.LambdaInvoke(this, 'VacuumTable', { + lambdaFunction: vacuumTableLambda, + }), + ), + ), + timeout: Duration.hours(1), + }); } /** Returns subnet IDs for the cluster of Redshift Serverless. */