From 27d4910950ba5f246757fdc43780b114ab566edd Mon Sep 17 00:00:00 2001 From: Kikuo Emoto Date: Sun, 9 Oct 2022 17:06:27 +0900 Subject: [PATCH] fix(cdk-ops): check existence of access logs - Fix the bug where `lambda/load-access-logs` crashed when there were no access logs on a given date. It now makes sure that there are access logs on a given date before running the script. - `AccessLogsETL` grants `lambda/load-access-logs` read permissions on the S3 bucket of access logs. issue codemonger-io/codemonger#30 --- cdk-ops/lambda/load-access-logs/index.py | 51 +++++++++++++++++++----- cdk-ops/lib/access-logs-etl.ts | 4 +- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/cdk-ops/lambda/load-access-logs/index.py b/cdk-ops/lambda/load-access-logs/index.py index 0792f4c..65f3375 100644 --- a/cdk-ops/lambda/load-access-logs/index.py +++ b/cdk-ops/lambda/load-access-logs/index.py @@ -26,10 +26,24 @@ LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) +s3 = boto3.client('s3') + redshift = boto3.client('redshift-serverless') redshift_data = boto3.client('redshift-data') +def has_access_logs(date: datetime.datetime) -> bool: + """Returns whether there are access logs on a given date. + """ + access_logs_prefix = get_access_logs_prefix(date) + res = s3.list_objects_v2( + Bucket=SOURCE_BUCKET_NAME, + Prefix=access_logs_prefix, + MaxKeys=1, + ) + return len(res.get('Contents', [])) > 0 + + def execute_load_script(date: datetime.datetime): """Executes the script to load CloudFront access logs. @@ -143,10 +157,10 @@ def get_create_raw_access_log_table_statement() -> str: def get_load_access_logs_statement(date: datetime.datetime) -> str: """Returns an SQL statement that loads access logs from the S3 bucket. 
""" - date_part = f'{date.year:04d}/{date.month:02d}/{date.day:02d}/' + access_logs_prefix = get_access_logs_prefix(date) return ''.join([ 'COPY #raw_access_log', - f" FROM 's3://{SOURCE_BUCKET_NAME}/{SOURCE_KEY_PREFIX}{date_part}'", + f" FROM 's3://{SOURCE_BUCKET_NAME}/{access_logs_prefix}'", f" IAM_ROLE '{COPY_ROLE_ARN}'", ' GZIP', " DELIMITER '\t'", @@ -493,6 +507,22 @@ def parse_time(time_str: str) -> datetime.datetime: return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S%z') +def get_access_logs_prefix(date: datetime.datetime) -> str: + """Returns the S3 object key prefix of access log files on a given date. + + A returned string contains a trailing slash (/). + """ + return f'{SOURCE_KEY_PREFIX}{format_date_part(date)}' + + +def format_date_part(date: datetime.datetime) -> str: + """Converts a given date into the date part of an S3 object path. + + A returned string contains a trailing slash (/). + """ + return f'{date.year:04d}/{date.month:02d}/{date.day:02d}/' + + def lambda_handler(event, _): """Loads CloudFront access logs onto the data warehouse. 
@@ -511,11 +541,14 @@ def lambda_handler(event, _): LOGGER.debug('loading access logs: %s', str(event)) invocation_date = parse_time(event['time']) target_date = invocation_date - datetime.timedelta(days=1) - LOGGER.debug('loading access logs on:%s', str(target_date)) - res = redshift.get_credentials( - workgroupName=REDSHIFT_WORKGROUP_NAME, - dbName=ACCESS_LOGS_DATABASE_NAME, - ) - LOGGER.debug('accessing database as %s', res['dbUser']) - execute_load_script(target_date) + if has_access_logs(target_date): + LOGGER.debug('loading access logs on %s', str(target_date)) + res = redshift.get_credentials( + workgroupName=REDSHIFT_WORKGROUP_NAME, + dbName=ACCESS_LOGS_DATABASE_NAME, + ) + LOGGER.debug('accessing database as %s', res['dbUser']) + execute_load_script(target_date) + else: + LOGGER.debug('no access logs on %s', str(target_date)) return {} diff --git a/cdk-ops/lib/access-logs-etl.ts b/cdk-ops/lib/access-logs-etl.ts index 58bb537..2eafcb2 100644 --- a/cdk-ops/lib/access-logs-etl.ts +++ b/cdk-ops/lib/access-logs-etl.ts @@ -195,11 +195,11 @@ export class AccessLogsETL extends Construct { COPY_ROLE_ARN: dataWarehouse.namespaceRole.roleArn, }, timeout: Duration.minutes(15), + memorySize: 256, }, ); + this.outputAccessLogsBucket.grantRead(loadAccessLogsLambda); dataWarehouse.grantQuery(loadAccessLogsLambda); - // loadAccessLogsLambda does not need permissions to read - // outputAccessLogsBucket // TODO: schedule running loadAccessLogsLambda } }