Skip to content

Commit

Permalink
feat(cdk-ops): start VACUUM after load
Browse files Browse the repository at this point in the history
- `lambda/load-access-logs` starts the VACUUM workflow over the updated
  tables. "SORT ONLY" is sufficient because `lambda/load-access-logs`
  never deletes rows.

issue codemonger-io#30
  • Loading branch information
kikuomax committed Oct 11, 2022
1 parent e7a8ea8 commit 8a58bc9
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
21 changes: 19 additions & 2 deletions cdk-ops/lambda/load-access-logs/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"""

import datetime
import json
import logging
import os
import boto3
Expand All @@ -22,6 +23,7 @@
SOURCE_KEY_PREFIX = os.environ['SOURCE_KEY_PREFIX']
REDSHIFT_WORKGROUP_NAME = os.environ['REDSHIFT_WORKGROUP_NAME']
COPY_ROLE_ARN = os.environ['COPY_ROLE_ARN']
VACUUM_WORKFLOW_ARN = os.environ['VACUUM_WORKFLOW_ARN']

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)
Expand All @@ -30,6 +32,7 @@

redshift = boto3.client('redshift-serverless')
redshift_data = boto3.client('redshift-data')
stepfunctions = boto3.client('stepfunctions')


def has_access_logs(date: datetime.datetime) -> bool:
Expand Down Expand Up @@ -523,6 +526,19 @@ def format_date_part(date: datetime.datetime) -> str:
return f'{date.year:04d}/{date.month:02d}/{date.day:02d}/'


def start_vacuum():
    """Kicks off the VACUUM workflow over the updated tables.

    Starts an execution of the Step Functions state machine identified by
    ``VACUUM_WORKFLOW_ARN``, passing the VACUUM mode as a JSON-encoded input,
    and logs the service response at the DEBUG level.
    """
    # "SORT ONLY" is sufficient because no deletes have been performed
    workflow_input = json.dumps({'mode': 'SORT ONLY'})
    execution = stepfunctions.start_execution(
        stateMachineArn=VACUUM_WORKFLOW_ARN,
        input=workflow_input,
    )
    LOGGER.debug('started VACUUM: %s', str(execution))


def lambda_handler(event, _):
"""Loads CloudFront access logs onto the data warehouse.
Expand All @@ -549,11 +565,12 @@ def lambda_handler(event, _):
)
LOGGER.debug('accessing database as %s', res['dbUser'])
execute_load_script(target_date)
# we need VACUUM to sort updated tables.
# run VACUUM in a different session (e.g., Step Functions) because,
# we need VACUUM to sort the updated tables.
# runs VACUUM in a different session (e.g., Step Functions) because,
# - VACUUM needs an owner or superuser privilege
# - VACUUM is time consuming
# - only one VACUUM can run at the same time
start_vacuum()
else:
LOGGER.debug('no access logs on %s', str(target_date))
return {}
2 changes: 2 additions & 0 deletions cdk-ops/lib/access-logs-etl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,15 @@ export class AccessLogsETL extends Construct {
SOURCE_KEY_PREFIX: maskedAccessLogsKeyPrefix,
REDSHIFT_WORKGROUP_NAME: dataWarehouse.workgroupName,
COPY_ROLE_ARN: dataWarehouse.namespaceRole.roleArn,
VACUUM_WORKFLOW_ARN: dataWarehouse.vacuumWorkflow.stateMachineArn,
},
timeout: Duration.minutes(15),
memorySize: 256,
},
);
this.outputAccessLogsBucket.grantRead(loadAccessLogsLambda);
dataWarehouse.grantQuery(loadAccessLogsLambda);
dataWarehouse.vacuumWorkflow.grantStartExecution(loadAccessLogsLambda);
// - schedules running loadAccessLogsLambda
const loadSchedule = new events.Rule(this, 'LoadAccessLogsSchedule', {
description: `Periodically loads access logs (${deploymentStage})`,
Expand Down

0 comments on commit 8a58bc9

Please sign in to comment.