Skip to content

Commit

Permalink
feat(cdk-ops): process S3 event via SQS
Browse files Browse the repository at this point in the history
- `lambda/mask-access-logs` now supposes that the input event is a list
  of SQS messages containing "ObjectCreated:*" S3 events.
- `AccessLogsMasking` provisions an SQS queue to capture
  "ObjectCreated:*" S3 events from the S3 bucket for access logs. It
  sets the SQS queue as an event source of `MaskAccessLogsLambda`.

issue codemonger-io#30
  • Loading branch information
kikuomax committed Sep 24, 2022
1 parent 827846f commit 714c370
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 6 deletions.
63 changes: 59 additions & 4 deletions cdk-ops/lambda/mask-access-logs/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import gzip
import io
import ipaddress
import json
import logging
import os
import sys
Expand Down Expand Up @@ -123,17 +124,71 @@ def process_logs(logs_in: Iterator[str], logs_out: TextIO):

def lambda_handler(event, _):
"""Masks information in a given CloudFront access logs file on S3.
``event`` is supposed to be an SQS message event described at
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
"""
LOGGER.debug('masking access logs: %s', str(event))
src = source_bucket.Object(event['key'])
for record in event['Records']:
try:
message = json.loads(record['body'])
except json.JSONDecodeError:
LOGGER.error('invalid SQS record: %s', str(record))
continue
# may receive a test message "s3:TestEvent"
# and a test message does not have "Records"
entries = message.get('Records')
if entries is None:
LOGGER.debug('maybe a test message: %s', str(message))
continue
for entry in entries:
event_name = entry.get('eventName', '?')
if event_name.startswith('ObjectCreated:'):
s3_object = entry.get('s3')
if s3_object is not None:
process_s3_object(s3_object)
else:
LOGGER.error('invalid S3 event: %s', str(entry))
else:
LOGGER.error(
'event "%s" other than S3 object creation was notified.'
' please check the event source configuration',
event_name,
)
return {}


def process_s3_object(s3_object):
"""Processes a given S3 object event.
``s3_object`` must conform to an S3 object creation event described at
https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html
"""
LOGGER.debug('processing S3 object event: %s', str(s3_object))
# makes sure that the source bucket matches
bucket_name = s3_object.get('bucket', {}).get('name')
if bucket_name is None:
LOGGER.error('no bucket name in S3 object event: %s', str(s3_object))
return
if bucket_name != SOURCE_BUCKET_NAME:
LOGGER.warning(
'bucket name must be %s but %s was given.'
' please check the event source configuration',
SOURCE_BUCKET_NAME,
bucket_name,
)
return
key = s3_object.get('object', {}).get('key')
if key is None:
LOGGER.error('no object key in S3 object event: %s', str(s3_object))
return
src = source_bucket.Object(key)
results = src.get()
with open_body(results) as body:
with gzip.open(body, mode='rt') as tsv_in:
dest = destination_bucket.Object(event['key'])
dest = destination_bucket.Object(key)
with S3OutputStream(dest) as masked_out:
with gzip.open(masked_out, mode='wt') as tsv_out:
process_logs(tsv_in, tsv_out)
return {}


class S3OutputStream(io.RawIOBase):
Expand Down
55 changes: 53 additions & 2 deletions cdk-ops/lib/access-logs-masking.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import * as path from 'path';

import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import { RemovalPolicy, aws_lambda as lambda, aws_s3 as s3 } from 'aws-cdk-lib';
import {
Duration,
RemovalPolicy,
aws_lambda as lambda,
aws_lambda_event_sources as lambda_event,
aws_s3 as s3,
aws_s3_notifications as s3n,
aws_sqs as sqs,
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

import type { DeploymentStage } from 'cdk-common';
Expand All @@ -23,20 +31,28 @@ export class AccessLogsMasking extends Construct {

const { accessLogsBucket } = props;

// provisions an S3 bucket for masked access logs.
// S3 bucket for masked access logs.
this.maskedAccessLogsBucket = new s3.Bucket(
this,
'MaskedAccessLogsBucket',
{
blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
encryption: s3.BucketEncryption.S3_MANAGED,
enforceSSL: true,
lifecycleRules: [
{
// safeguard for incomplete multipart uploads.
// minimum resoluation is one day.
abortIncompleteMultipartUploadAfter: Duration.days(1),
},
],
removalPolicy: RemovalPolicy.RETAIN,
},
);

// Lambda functions
// - masks CloudFront access logs.
const maskAccessLogsLambdaTimeout = Duration.seconds(30);
const maskAccessLogsLambda = new PythonFunction(
this,
'MaskAccessLogsLambda',
Expand All @@ -50,9 +66,44 @@ export class AccessLogsMasking extends Construct {
SOURCE_BUCKET_NAME: accessLogsBucket.bucketName,
DESTINATION_BUCKET_NAME: this.maskedAccessLogsBucket.bucketName,
},
timeout: maskAccessLogsLambdaTimeout,
},
);
accessLogsBucket.grantRead(maskAccessLogsLambda);
this.maskedAccessLogsBucket.grantPut(maskAccessLogsLambda);

// SQS queue to capture creation of access logs files.
const maxBatchingWindow = Duration.minutes(5); // least frequency
const newLogsQueue = new sqs.Queue(this, 'NewLogsQueue', {
retentionPeriod: Duration.days(1),
// at least (6 * Lambda timeout) + (maximum batch window)
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-eventsource
visibilityTimeout: maxBatchingWindow.plus(
Duration.seconds(6 * maskAccessLogsLambdaTimeout.toSeconds()),
),
});
accessLogsBucket.addEventNotification(
s3.EventType.OBJECT_CREATED,
new s3n.SqsDestination(newLogsQueue),
);
// triggers MaskAccessLogsLambda when the SQS queue receives a message
maskAccessLogsLambda.addEventSource(
new lambda_event.SqsEventSource(newLogsQueue, {
enabled: true,
batchSize: 10,
maxBatchingWindow,
// the following filter did not work as I intended, and I gave up.
/*
filters: [
// SQS queue may receive a test message "s3:TestEvent".
// non-test message must contain the "Records" field.
lambda.FilterCriteria.filter({
body: {
Records: lambda.FilterRule.exists(),
},
}),
], */
}),
);
}
}

0 comments on commit 714c370

Please sign in to comment.