feat(cdk-ops): prefix date
- `lambda/mask-access-logs` prefixes the output S3 object key with the
  date of the access log records. `lambda/delete-access-logs` strips the
  date prefix to locate the original access logs file.

issue codemonger-io#30
kikuomax committed Sep 26, 2022
1 parent 1fffb18 commit 58327f3
Showing 2 changed files with 171 additions and 53 deletions.
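In short: the masking Lambda previously wrote one output object per input
object under ``{DESTINATION_KEY_PREFIX}{key}``; it now fans log records out
by their ``date`` column, one output object per date, under
``{DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{key}``. An illustrative
example, assuming a made-up prefix ``masked/`` and object key:

    before: masked/E123ABC.2022-09-26-10.abcd1234.gz
    after:  masked/2022/09/25/E123ABC.2022-09-26-10.abcd1234.gz
            masked/2022/09/26/E123ABC.2022-09-26-10.abcd1234.gz

A single CloudFront log file may contain records from more than one date, so
one input object can produce several dated outputs.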
16 changes: 12 additions & 4 deletions cdk-ops/lambda/delete-access-logs/index.py
@@ -16,6 +16,7 @@
import logging
import os
import boto3
+from botocore.exceptions import ClientError


SOURCE_BUCKET_NAME = os.environ['SOURCE_BUCKET_NAME']
@@ -103,7 +104,14 @@ def process_s3_object(s3_object):
            DESTINATION_KEY_PREFIX,
        )
        return
-    key = key[len(DESTINATION_KEY_PREFIX):]
-    src = source_bucket.Object(key)
-    res = src.delete()
-    LOGGER.debug('deleted object "%s": %s', key, str(res))
+    # key should be like,
+    #   {DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{original_key}
+    # so the last segment separated by a slash ('/') is the key for the
+    # original access logs file.
+    src_key = key.split('/')[-1]
+    src = source_bucket.Object(src_key)
+    try:
+        res = src.delete()
+        LOGGER.debug('deleted object "%s": %s', src_key, str(res))
+    except ClientError as exc:
+        LOGGER.error('failed to delete object "%s": %s', src_key, str(exc))
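For illustration, a minimal sketch of the key round-trip implied by the
comment above (the prefix and object key are made-up values; the recovery
step relies on CloudFront access log keys containing no slash):

    DESTINATION_KEY_PREFIX = 'masked/'  # illustrative value

    def masked_key(src_key: str, year: int, month: int, mday: int) -> str:
        # mask-access-logs side: prefix, zero-padded date, then the original key
        return f'{DESTINATION_KEY_PREFIX}{year:04d}/{month:02d}/{mday:02d}/{src_key}'

    def original_key(key: str) -> str:
        # delete-access-logs side: the original key is the last '/'-separated segment
        return key.split('/')[-1]

    assert original_key(
        masked_key('E123ABC.2022-09-26-10.abcd1234.gz', 2022, 9, 26),
    ) == 'E123ABC.2022-09-26-10.abcd1234.gz'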
208 changes: 159 additions & 49 deletions cdk-ops/lambda/mask-access-logs/index.py
@@ -19,10 +19,11 @@
import json
import logging
import os
-import sys
import time
from contextlib import contextmanager
-from typing import Dict, Iterable, Iterator, TextIO
+from typing import Dict, Iterable, Iterator, Sequence, TextIO
import boto3
+from botocore.exceptions import ClientError


SOURCE_BUCKET_NAME = os.environ['SOURCE_BUCKET_NAME']
@@ -107,7 +108,7 @@ def mask_ip_address_v6(addr: str) -> str:
    return str(net.network_address)


-def process_logs(logs_in: Iterator[str], logs_out: TextIO):
+def process_logs(src_key: str, logs_in: Iterator[str]):
    """Processes given CloudFront logs and outputs to given stream.
    """
    tsv_in = csv.DictReader(translate_logs(logs_in), delimiter='\t')
@@ -116,15 +117,10 @@ def process_logs(logs_in: Iterator[str], logs_out: TextIO):
    column_names = tsv_in.fieldnames
    if column_names is None:
        raise ValueError('no field names are specified in the input')
-    tsv_out = csv.DictWriter(
-        logs_out,
-        fieldnames=column_names,
-        delimiter='\t',
-    )
-    tsv_out.writeheader()
-    for row in tsv_in:
-        row = mask_row(row)
-        tsv_out.writerow(row)
+    with LogDispatcher(src_key, column_names) as dispatcher:
+        for row in tsv_in:
+            row = mask_row(row)
+            dispatcher.writerow(row)


def lambda_handler(event, _):
@@ -138,7 +134,12 @@ def lambda_handler(event, _):
    This handler masks information in the given S3 objects and stores masked
    results into the S3 bucket specified by ``DESTINATION_BUCKET_NAME`` with
-    the same object key but with ``DESTINATION_KEY_PREFIX`` prefixed.
+    the same object key but with ``DESTINATION_KEY_PREFIX``, year, month, and
+    date prefixed.
+
+    ``{DESTINATION_KEY_PREFIX}{year}/{month}/{date}/{key}``
+
+    where ``year``, ``month``, and ``date`` are the timestamp of a log record.
    """
    for record in event['Records']:
        try:
@@ -201,10 +202,7 @@ def process_s3_object(s3_object):
        return
    with open_body(results) as body:
        with gzip.open(body, mode='rt') as tsv_in:
-            dest = destination_bucket.Object(f'{DESTINATION_KEY_PREFIX}{key}')
-            with S3OutputStream(dest) as masked_out:
-                with gzip.open(masked_out, mode='wt') as tsv_out:
-                    process_logs(tsv_in, tsv_out)
+            process_logs(key, tsv_in)


class S3OutputStream(io.RawIOBase):
@@ -296,8 +294,10 @@ def abort(self):
"""
if self.multipart_upload is not None:
LOGGER.debug('aborting the multipart upload')
self.multipart_upload.abort()
self.multipart_upload = None
try:
self.multipart_upload.abort()
finally:
self.multipart_upload = None


    def __exit__(self, exc_type, exc_value, traceback):
@@ -318,6 +318,146 @@ def __del__(self):
        self.abort()


+class GzippedTsvOnS3:
+    """Gzipped TSV file in an S3 bucket.
+    """
+
+    underlying: S3OutputStream
+    gzipped: TextIO
+    tsv_writer: csv.DictWriter
+
+
+    def __init__(
+        self,
+        underlying: S3OutputStream,
+        gzipped: TextIO,
+        tsv_writer: csv.DictWriter,
+    ):
+        self.underlying = underlying
+        self.gzipped = gzipped
+        self.tsv_writer = tsv_writer
+
+
+    def close(self):
+        """Completes the upload of the CSV file.
+        """
+        try:
+            self.gzipped.close()
+        except IOError as exc:
+            LOGGER.error('failed to close a gzip stream: %s', str(exc))
+            self.underlying.abort()
+        else:
+            try:
+                self.underlying.close()
+            except ClientError as exc:
+                LOGGER.error(
+                    'failed to finish an S3 object upload: %s',
+                    str(exc),
+                )
+
+
+    def abort(self):
+        """Aborts the upload of the CSV file.
+        """
+        try:
+            self.gzipped.close()
+        except IOError as exc:
+            LOGGER.error('failed to close a gzip stream: %s', str(exc))
+        try:
+            self.underlying.abort()
+        except ClientError as exc:
+            # TODO: possible exceptions?
+            LOGGER.error(
+                'failed to abort an S3 object upload: %s',
+                str(exc),
+            )
+
+
+class LogDispatcher:
+    """Distributes access log records to S3 objects corresponding to their
+    dates.
+
+    You should wrap this object in a ``with`` statement.
+    """
+
+    LOG_DATE_FORMAT = '%Y-%m-%d'
+
+    dest_map: Dict[time.struct_time, GzippedTsvOnS3]
+
+
+    def __init__(self, src_key: str, column_names: Sequence[str]):
+        """Initializes with the column names.
+        """
+        self.src_key = src_key
+        self.column_names = column_names
+        self.dest_map = {}
+
+
+    def writerow(self, row: Dict[str, str]):
+        """Writes a given row into a matching S3 object.
+
+        Ignores an invalid row.
+        """
+        try:
+            date = time.strptime(row['date'], LogDispatcher.LOG_DATE_FORMAT)
+        except KeyError:
+            LOGGER.warning('log record must have date: %s', str(row))
+        except ValueError:
+            LOGGER.warning('invalid date format: %s', row['date'])
+        else:
+            dest = self.get_destination(date)
+            dest.writerow(row)
+
+
+    def get_destination(self, date: time.struct_time) -> csv.DictWriter:
+        """Obtains the output stream corresponding to a given date.
+
+        Opens a new ``S3OutputStream`` if none has been opened yet.
+        """
+        if date in self.dest_map:
+            return self.dest_map[date].tsv_writer
+        year = f'{date.tm_year:04d}'
+        month = f'{date.tm_mon:02d}'
+        mday = f'{date.tm_mday:02d}'
+        key = f'{DESTINATION_KEY_PREFIX}{year}/{month}/{mday}/{self.src_key}'
+        dest_stream = S3OutputStream(destination_bucket.Object(key))
+        dest_gzip = gzip.open(dest_stream, mode='wt')
+        dest_tsv = csv.DictWriter(
+            dest_gzip,
+            fieldnames=self.column_names,
+            delimiter='\t',
+        )
+        self.dest_map[date] = GzippedTsvOnS3(dest_stream, dest_gzip, dest_tsv)
+        dest_tsv.writeheader()
+        return dest_tsv
+
+
+    def close(self):
+        """Completes log dispatch and S3 object uploads.
+        """
+        for dest in self.dest_map.values():
+            dest.close()
+
+
+    def abort(self):
+        """Aborts log dispatch and S3 object uploads.
+        """
+        for dest in self.dest_map.values():
+            dest.abort()
+
+
+    def __enter__(self):
+        return self
+
+
+    def __exit__(self, exc_type, _exc_val, _exc_tb):
+        if exc_type is None:
+            self.close()
+        else:
+            self.abort()
+        return False
+

@contextmanager
def open_body(s3_get_results):
"""Enables ``with`` statement for a body got from an S3 bucket.
Expand All @@ -327,33 +467,3 @@ def open_body(s3_get_results):
yield body
finally:
body.close()


-if __name__ == '__main__':
-    import argparse
-    arg_parser = argparse.ArgumentParser(
-        description='Masks CloudFront access logs',
-    )
-    arg_parser.add_argument(
-        'logs_path',
-        metavar='LOGS',
-        type=str,
-        help='path to a gzipped TSV file containing CloudFront access logs',
-    )
-    arg_parser.add_argument(
-        '--out',
-        dest='out_path',
-        metavar='OUT',
-        type=str,
-        help='path to a file where masked CloudFront access logs are to be'
-             ' saved (gzipped)',
-    )
-    logging.basicConfig(level=logging.DEBUG)
-    LOGGER.debug('filtering access logs')
-    args = arg_parser.parse_args()
-    if args.out_path is not None:
-        results_out = gzip.open(args.out_path, mode='wt')
-    else:
-        results_out = sys.stdout
-    with gzip.open(args.logs_path, mode='rt') as text_in:
-        process_logs(text_in, results_out)
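For illustration, a hedged sketch of how the new ``LogDispatcher`` is driven
by ``process_logs`` (the source key, column names, and rows are made-up
values; it assumes the module context above, where ``destination_bucket`` and
``DESTINATION_KEY_PREFIX`` are defined):

    # two dates in one source file open two gzipped TSV uploads, one per
    # date, each keyed {DESTINATION_KEY_PREFIX}{year}/{month}/{day}/{src_key}
    rows = [
        {'date': '2022-09-25', 'time': '23:59:59', 'c-ip': '203.0.113.7'},
        {'date': '2022-09-26', 'time': '00:00:01', 'c-ip': '203.0.113.9'},
    ]
    columns = ['date', 'time', 'c-ip']
    with LogDispatcher('E123ABC.2022-09-26-00.abcd1234.gz', columns) as dispatcher:
        for row in rows:
            dispatcher.writerow(row)  # the header is written once per destination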
