diff --git a/cdk-ops/lambda/delete-access-logs/index.py b/cdk-ops/lambda/delete-access-logs/index.py
index f4aeb2e..c45f02e 100644
--- a/cdk-ops/lambda/delete-access-logs/index.py
+++ b/cdk-ops/lambda/delete-access-logs/index.py
@@ -16,6 +16,7 @@
 import logging
 import os
 
 import boto3
+from botocore.exceptions import ClientError
 
 SOURCE_BUCKET_NAME = os.environ['SOURCE_BUCKET_NAME']
@@ -103,7 +104,14 @@ def process_s3_object(s3_object):
             DESTINATION_KEY_PREFIX,
         )
         return
-    key = key[len(DESTINATION_KEY_PREFIX):]
-    src = source_bucket.Object(key)
-    res = src.delete()
-    LOGGER.debug('deleted object "%s": %s', key, str(res))
+    # key should be like,
+    # {DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{original_key}
+    # so the last segment separated by a slash ('/') is the key for the
+    # original access logs file.
+    src_key = key.split('/')[-1]
+    src = source_bucket.Object(src_key)
+    try:
+        res = src.delete()
+        LOGGER.debug('deleted object "%s": %s', src_key, str(res))
+    except ClientError as exc:
+        LOGGER.error('failed to delete object "%s": %s', src_key, str(exc))
diff --git a/cdk-ops/lambda/mask-access-logs/index.py b/cdk-ops/lambda/mask-access-logs/index.py
index 2117010..dae4ce5 100644
--- a/cdk-ops/lambda/mask-access-logs/index.py
+++ b/cdk-ops/lambda/mask-access-logs/index.py
@@ -19,10 +19,11 @@
 import json
 import logging
 import os
-import sys
+import time
 from contextlib import contextmanager
-from typing import Dict, Iterable, Iterator, TextIO
+from typing import Dict, Iterable, Iterator, Sequence, TextIO
 
 import boto3
+from botocore.exceptions import ClientError
 
 SOURCE_BUCKET_NAME = os.environ['SOURCE_BUCKET_NAME']
@@ -107,7 +108,7 @@ def mask_ip_address_v6(addr: str) -> str:
     return str(net.network_address)
 
 
-def process_logs(logs_in: Iterator[str], logs_out: TextIO):
+def process_logs(src_key: str, logs_in: Iterator[str]):
     """Processes given CloudFront logs and outputs to given stream.
     """
     tsv_in = csv.DictReader(translate_logs(logs_in), delimiter='\t')
@@ -116,15 +117,10 @@
     column_names = tsv_in.fieldnames
     if column_names is None:
         raise ValueError('no field names are specified in the input')
-    tsv_out = csv.DictWriter(
-        logs_out,
-        fieldnames=column_names,
-        delimiter='\t',
-    )
-    tsv_out.writeheader()
-    for row in tsv_in:
-        row = mask_row(row)
-        tsv_out.writerow(row)
+    with LogDispatcher(src_key, column_names) as dispatcher:
+        for row in tsv_in:
+            row = mask_row(row)
+            dispatcher.writerow(row)
 
 
 def lambda_handler(event, _):
@@ -138,7 +134,12 @@
 
     This handler masks information in the given S3 objects and stores masked
    results into the S3 bucket specified by ``DESTINATION_BUCKET_NAME`` with
-    the same object key but with ``DESTINATION_KEY_PREFIX`` prefixed.
+    the same object key but with ``DESTINATION_KEY_PREFIX``, year, month, and
+    date prefixed.
+
+    ``{DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{key}``
+
+    where ``year``, ``month``, and ``date`` are the timestamp of a log record.
""" for record in event['Records']: try: @@ -201,10 +202,7 @@ def process_s3_object(s3_object): return with open_body(results) as body: with gzip.open(body, mode='rt') as tsv_in: - dest = destination_bucket.Object(f'{DESTINATION_KEY_PREFIX}{key}') - with S3OutputStream(dest) as masked_out: - with gzip.open(masked_out, mode='wt') as tsv_out: - process_logs(tsv_in, tsv_out) + process_logs(key, tsv_in) class S3OutputStream(io.RawIOBase): @@ -296,8 +294,10 @@ def abort(self): """ if self.multipart_upload is not None: LOGGER.debug('aborting the multipart upload') - self.multipart_upload.abort() - self.multipart_upload = None + try: + self.multipart_upload.abort() + finally: + self.multipart_upload = None def __exit__(self, exc_type, exc_value, traceback): @@ -318,6 +318,146 @@ def __del__(self): self.abort() +class GzippedTsvOnS3: + """Gzipped TSV file in an S3 bucket. + """ + + underlying: S3OutputStream + gzipped: TextIO + tsv_writer: csv.DictWriter + + + def __init__( + self, + underlying: S3OutputStream, + gzipped: TextIO, + tsv_writer: csv.DictWriter, + ): + self.underlying = underlying + self.gzipped = gzipped + self.tsv_writer = tsv_writer + + + def close(self): + """Completes the upload of the CSV file. + """ + try: + self.gzipped.close() + except IOError as exc: + LOGGER.error('failed to close a gzip stream: %s', str(exc)) + self.underlying.abort() + else: + try: + self.underlying.close() + except ClientError as exc: + LOGGER.error( + 'failed to finish an S3 object upload: %s', + str(exc), + ) + + + def abort(self): + """Aborts the upload of the CSV file. + """ + try: + self.gzipped.close() + except IOError as exc: + LOGGER.error('failed to close a gzip stream: %s', str(exc)) + try: + self.underlying.abort() + except ClientError as exc: + # TODO: possible exceptions? + LOGGER.error( + 'failed to abort an S3 object upload: %s', + str(exc), + ) + + +class LogDispatcher: + """Distributes access log records to S3 objects corresponding to their + dates. + + You should wrap this object in a ``with`` statement. + """ + + LOG_DATE_FORMAT = '%Y-%m-%d' + + dest_map: Dict[time.struct_time, GzippedTsvOnS3] + + + def __init__(self, src_key: str, column_names: Sequence[str]): + """Initializes with the column names. + """ + self.src_key = src_key + self.column_names = column_names + self.dest_map = {} + + + def writerow(self, row: Dict[str, str]): + """Writes a given row into a matching S3 object. + + Ignores an invalid row. + """ + try: + date = time.strptime(row['date'], LogDispatcher.LOG_DATE_FORMAT) + except KeyError: + LOGGER.warning('log record must have date: %s', str(row)) + except ValueError: + LOGGER.warning('invalid date format: %s', row['date']) + else: + dest = self.get_destination(date) + dest.writerow(row) + + + def get_destination(self, date: time.struct_time) -> csv.DictWriter: + """Obtains the output stream corresponding to a given date. + + Opens a new ``S3OutputStream`` if none has been opened yet. 
+        """
+        if date in self.dest_map:
+            return self.dest_map[date].tsv_writer
+        year = f'{date.tm_year:04d}'
+        month = f'{date.tm_mon:02d}'
+        mday = f'{date.tm_mday:02d}'
+        key = f'{DESTINATION_KEY_PREFIX}{year}/{month}/{mday}/{self.src_key}'
+        dest_stream = S3OutputStream(destination_bucket.Object(key))
+        dest_gzip = gzip.open(dest_stream, mode='wt')
+        dest_tsv = csv.DictWriter(
+            dest_gzip,
+            fieldnames=self.column_names,
+            delimiter='\t',
+        )
+        self.dest_map[date] = GzippedTsvOnS3(dest_stream, dest_gzip, dest_tsv)
+        dest_tsv.writeheader()
+        return dest_tsv
+
+
+    def close(self):
+        """Completes log dispatch and S3 object uploads.
+        """
+        for dest in self.dest_map.values():
+            dest.close()
+
+
+    def abort(self):
+        """Aborts log dispatch and S3 object uploads.
+        """
+        for dest in self.dest_map.values():
+            dest.abort()
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, _exc_val, _exc_tb):
+        if exc_type is None:
+            self.close()
+        else:
+            self.abort()
+        return False
+
+
 @contextmanager
 def open_body(s3_get_results):
     """Enables ``with`` statement for a body got from an S3 bucket.
@@ -327,33 +467,3 @@
         yield body
     finally:
         body.close()
-
-
-if __name__ == '__main__':
-    import argparse
-    arg_parser = argparse.ArgumentParser(
-        description='Masks CloudFront access logs',
-    )
-    arg_parser.add_argument(
-        'logs_path',
-        metavar='LOGS',
-        type=str,
-        help='path to a gzipped TSV file containing CloudFront access logs',
-    )
-    arg_parser.add_argument(
-        '--out',
-        dest='out_path',
-        metavar='OUT',
-        type=str,
-        help='path to a file where masked CloudFront access logs are to be'
-            ' saved (gzipped)',
-    )
-    logging.basicConfig(level=logging.DEBUG)
-    LOGGER.debug('filtering access logs')
-    args = arg_parser.parse_args()
-    if args.out_path is not None:
-        results_out = gzip.open(args.out_path, mode='wt')
-    else:
-        results_out = sys.stdout
-    with gzip.open(args.logs_path, mode='rt') as text_in:
-        process_logs(text_in, results_out)
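
Both files in this diff rely on the same key convention: mask-access-logs writes a masked object to ``{DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{original_key}``, and delete-access-logs recovers the original key by taking the last ``/``-separated segment. The following is a minimal standalone sketch of that round trip; the prefix and the sample key are made-up illustration values, not taken from the deployed configuration.

# Sketch of the destination key layout and source key recovery used above.
# DESTINATION_KEY_PREFIX and the sample key below are hypothetical examples.
import time

DESTINATION_KEY_PREFIX = 'masked/'  # assumed example value

def destination_key(src_key: str, date: time.struct_time) -> str:
    """Builds the per-date destination key for a masked log file."""
    year = f'{date.tm_year:04d}'
    month = f'{date.tm_mon:02d}'
    mday = f'{date.tm_mday:02d}'
    return f'{DESTINATION_KEY_PREFIX}{year}/{month}/{mday}/{src_key}'

def source_key(dest_key: str) -> str:
    """Recovers the original access log key from a destination key."""
    return dest_key.split('/')[-1]

date = time.strptime('2022-07-15', '%Y-%m-%d')
dest = destination_key('E123EXAMPLE.2022-07-15-10.abcdef12.gz', date)
assert dest == 'masked/2022/07/15/E123EXAMPLE.2022-07-15-10.abcdef12.gz'
assert source_key(dest) == 'E123EXAMPLE.2022-07-15-10.abcdef12.gz'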