From 714c370330c7b88ba4eb2864dda4bdaeb4660c5e Mon Sep 17 00:00:00 2001 From: Kikuo Emoto Date: Wed, 21 Sep 2022 18:34:41 +0900 Subject: [PATCH] feat(cdk-ops): process S3 event via SQS - `lambda/mask-access-logs` now supposes that the input event is a list of SQS messages containing "ObjectCreated:*" S3 events. - `AccessLogsMasking` provisions an SQS queue to capture "ObjectCreated:*" S3 events from the S3 bucket for access logs. It sets the SQS queue as an event source of `MaskAccessLogsLambda`. issue codemonger-io/codemonger#30 --- cdk-ops/lambda/mask-access-logs/index.py | 63 ++++++++++++++++++++++-- cdk-ops/lib/access-logs-masking.ts | 55 ++++++++++++++++++++- 2 files changed, 112 insertions(+), 6 deletions(-) diff --git a/cdk-ops/lambda/mask-access-logs/index.py b/cdk-ops/lambda/mask-access-logs/index.py index 012cc1e..67c4fbc 100644 --- a/cdk-ops/lambda/mask-access-logs/index.py +++ b/cdk-ops/lambda/mask-access-logs/index.py @@ -12,6 +12,7 @@ import gzip import io import ipaddress +import json import logging import os import sys @@ -123,17 +124,71 @@ def process_logs(logs_in: Iterator[str], logs_out: TextIO): def lambda_handler(event, _): """Masks information in a given CloudFront access logs file on S3. 
+ + ``event`` is supposed to be an SQS message event described at + https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html """ - LOGGER.debug('masking access logs: %s', str(event)) - src = source_bucket.Object(event['key']) + for record in event['Records']: + try: + message = json.loads(record['body']) + except json.JSONDecodeError: + LOGGER.error('invalid SQS record: %s', str(record)) + continue + # may receive a test message "s3:TestEvent" + # and a test message does not have "Records" + entries = message.get('Records') + if entries is None: + LOGGER.debug('maybe a test message: %s', str(message)) + continue + for entry in entries: + event_name = entry.get('eventName', '?') + if event_name.startswith('ObjectCreated:'): + s3_object = entry.get('s3') + if s3_object is not None: + process_s3_object(s3_object) + else: + LOGGER.error('invalid S3 event: %s', str(entry)) + else: + LOGGER.error( + 'event "%s" other than S3 object creation was notified.' + ' please check the event source configuration', + event_name, + ) + return {} + + +def process_s3_object(s3_object): + """Processes a given S3 object event. + + ``s3_object`` must conform to an S3 object creation event described at + https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html + """ + LOGGER.debug('processing S3 object event: %s', str(s3_object)) + # makes sure that the source bucket matches + bucket_name = s3_object.get('bucket', {}).get('name') + if bucket_name is None: + LOGGER.error('no bucket name in S3 object event: %s', str(s3_object)) + return + if bucket_name != SOURCE_BUCKET_NAME: + LOGGER.warning( + 'bucket name must be %s but %s was given.' 
+ ' please check the event source configuration', + SOURCE_BUCKET_NAME, + bucket_name, + ) + return + key = s3_object.get('object', {}).get('key') + if key is None: + LOGGER.error('no object key in S3 object event: %s', str(s3_object)) + return + src = source_bucket.Object(key) results = src.get() with open_body(results) as body: with gzip.open(body, mode='rt') as tsv_in: - dest = destination_bucket.Object(event['key']) + dest = destination_bucket.Object(key) with S3OutputStream(dest) as masked_out: with gzip.open(masked_out, mode='wt') as tsv_out: process_logs(tsv_in, tsv_out) - return {} class S3OutputStream(io.RawIOBase): diff --git a/cdk-ops/lib/access-logs-masking.ts b/cdk-ops/lib/access-logs-masking.ts index cdbb71a..cb11aaf 100644 --- a/cdk-ops/lib/access-logs-masking.ts +++ b/cdk-ops/lib/access-logs-masking.ts @@ -1,7 +1,15 @@ import * as path from 'path'; import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; -import { RemovalPolicy, aws_lambda as lambda, aws_s3 as s3 } from 'aws-cdk-lib'; +import { + Duration, + RemovalPolicy, + aws_lambda as lambda, + aws_lambda_event_sources as lambda_event, + aws_s3 as s3, + aws_s3_notifications as s3n, + aws_sqs as sqs, +} from 'aws-cdk-lib'; import { Construct } from 'constructs'; import type { DeploymentStage } from 'cdk-common'; @@ -23,7 +31,7 @@ export class AccessLogsMasking extends Construct { const { accessLogsBucket } = props; - // provisions an S3 bucket for masked access logs. + // S3 bucket for masked access logs. this.maskedAccessLogsBucket = new s3.Bucket( this, 'MaskedAccessLogsBucket', @@ -31,12 +39,20 @@ export class AccessLogsMasking extends Construct { blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL, encryption: s3.BucketEncryption.S3_MANAGED, enforceSSL: true, + lifecycleRules: [ + { + // safeguard for incomplete multipart uploads. + // minimum resolution is one day. 
+ abortIncompleteMultipartUploadAfter: Duration.days(1), + }, + ], removalPolicy: RemovalPolicy.RETAIN, }, ); // Lambda functions // - masks CloudFront access logs. + const maskAccessLogsLambdaTimeout = Duration.seconds(30); const maskAccessLogsLambda = new PythonFunction( this, 'MaskAccessLogsLambda', @@ -50,9 +66,44 @@ export class AccessLogsMasking extends Construct { SOURCE_BUCKET_NAME: accessLogsBucket.bucketName, DESTINATION_BUCKET_NAME: this.maskedAccessLogsBucket.bucketName, }, + timeout: maskAccessLogsLambdaTimeout, }, ); accessLogsBucket.grantRead(maskAccessLogsLambda); this.maskedAccessLogsBucket.grantPut(maskAccessLogsLambda); + + // SQS queue to capture creation of access logs files. + const maxBatchingWindow = Duration.minutes(5); // least frequency + const newLogsQueue = new sqs.Queue(this, 'NewLogsQueue', { + retentionPeriod: Duration.days(1), + // at least (6 * Lambda timeout) + (maximum batch window) + // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-eventsource + visibilityTimeout: maxBatchingWindow.plus( + Duration.seconds(6 * maskAccessLogsLambdaTimeout.toSeconds()), + ), + }); + accessLogsBucket.addEventNotification( + s3.EventType.OBJECT_CREATED, + new s3n.SqsDestination(newLogsQueue), + ); + // triggers MaskAccessLogsLambda when the SQS queue receives a message + maskAccessLogsLambda.addEventSource( + new lambda_event.SqsEventSource(newLogsQueue, { + enabled: true, + batchSize: 10, + maxBatchingWindow, + // the following filter did not work as I intended, and I gave up. + /* + filters: [ + // SQS queue may receive a test message "s3:TestEvent". + // non-test message must contain the "Records" field. + lambda.FilterCriteria.filter({ + body: { + Records: lambda.FilterRule.exists(), + }, + }), + ], */ + }), + ); } }