From a9796ab2647b815118a081041349893eff62803b Mon Sep 17 00:00:00 2001 From: Kikuo Emoto Date: Sat, 8 Oct 2022 12:52:51 +0900 Subject: [PATCH] feat(cdk-ops): load access logs - Introduces a new Lambda function `lambda/load-access-logs` that loads CloudFront access logs from the S3 bucket onto the data warehouse. `AccessLogsETL` provisions this function. - `DataWarehouse` introduces a new method `grantQuery` that allows a given `IGrantable` to call the Redshift Data API. The permission is too permissive because I could not figure out how to obtain the ARN of the Redshift Serverless namespace. `DataWarehouse` exports the Redshift Serverless workgroup so that `AccessLogsETL` can configure `lambda/load-access-logs`. - `CdkOpsStack` passes `DataWarehouse`, `LatestBoto3Layer`, and `LibdatawarehouseLayer` to `AccessLogsETL`. issue codemonger-io/codemonger#30 --- cdk-ops/lambda/load-access-logs/index.py | 516 +++++++++++++++++++++++ cdk-ops/lib/access-logs-etl.ts | 62 ++- cdk-ops/lib/cdk-ops-stack.ts | 3 + cdk-ops/lib/data-warehouse.ts | 50 ++- 4 files changed, 617 insertions(+), 14 deletions(-) create mode 100644 cdk-ops/lambda/load-access-logs/index.py diff --git a/cdk-ops/lambda/load-access-logs/index.py b/cdk-ops/lambda/load-access-logs/index.py new file mode 100644 index 0000000..4192ce3 --- /dev/null +++ b/cdk-ops/lambda/load-access-logs/index.py @@ -0,0 +1,516 @@ +# -*- coding: utf-8 -*- + +"""Loads CloudFront access logs onto the data warehouse. + +You have to specify the following environment variables, +* ``SOURCE_BUCKET_NAME``: name of the S3 bucket containing access logs to be + loaded. +* ``SOURCE_OBJECT_KEY_PREFIX``: prefix of the S3 object keys to be loaded. +* ``REDSHIFT_WORKGROUP_NAME``: name of the Redshift Serverless workgroup. +* ``COPY_ROLE_ARN``: ARN of the IAM role to COPY data from the S3 object. +""" + +import datetime +import logging +import os +import boto3 +from libdatawarehouse import ACCESS_LOGS_DATABASE_NAME, data_api, tables +from libdatawarehouse.exceptions import DataWarehouseException + + +SOURCE_BUCKET_NAME = os.environ['SOURCE_BUCKET_NAME'] +SOURCE_KEY_PREFIX = os.environ['SOURCE_KEY_PREFIX'] +REDSHIFT_WORKGROUP_NAME = os.environ['REDSHIFT_WORKGROUP_NAME'] +COPY_ROLE_ARN = os.environ['COPY_ROLE_ARN'] + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + +redshift = boto3.client('redshift-serverless') +redshift_data = boto3.client('redshift-data') + + +def execute_load_script(date: datetime.datetime): + """Executes the script to load CloudFront access logs. + + :param datetime.datetime date: date on which CloudFront access logs are to + be loaded. 
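+    :raises DataWarehouseException: if loading the access logs fails or does
+        not finish in time.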
+ """ + batch_res = redshift_data.batch_execute_statement( + WorkgroupName=REDSHIFT_WORKGROUP_NAME, + Database=ACCESS_LOGS_DATABASE_NAME, + Sqls=[ + # drops remaining temporary tables just in case + get_drop_raw_access_log_table_statement(), + get_drop_referer_stage_table_statement(), + get_drop_page_stage_table_statement(), + get_drop_edge_location_stage_table_statement(), + get_drop_user_agent_stage_table_statement(), + get_drop_access_log_stage_2_table_statement(), + get_drop_access_log_stage_table_statement(), + + get_create_raw_access_log_table_statement(), + get_load_access_logs_statement(date), + get_create_access_log_stage_table_statement(), + get_drop_raw_access_log_table_statement(), + get_create_referer_stage_table_statement(), + get_delete_existing_referers_statement(), + get_insert_referers_statement(), + get_drop_referer_stage_table_statement(), + get_create_page_stage_table_statement(), + get_delete_existing_pages_statement(), + get_insert_pages_statement(), + get_drop_page_stage_table_statement(), + get_create_edge_location_stage_table_statement(), + get_delete_existing_edge_locations_statement(), + get_insert_edge_locations_statement(), + get_drop_edge_location_stage_table_statement(), + get_create_user_agent_stage_table_statement(), + get_delete_existing_user_agents_statement(), + get_insert_user_agents_statement(), + get_drop_user_agent_stage_table_statement(), + get_create_result_type_stage_table_statement(), + get_delete_existing_result_types_statement(), + get_insert_result_types_statement(), + get_drop_result_type_stage_table_statement(), + get_encode_foreign_keys_statement(), + get_insert_access_logs_statement(), + get_drop_access_log_stage_2_table_statement(), + get_drop_access_log_stage_table_statement(), + ], + ) + statement_id = batch_res['Id'] + status, res = data_api.wait_for_results(redshift_data, statement_id) + if status != 'FINISHED': + if status is not None: + if status == 'FAILED': + LOGGER.error('failed to load access logs: %s', res.get('Error')) + raise DataWarehouseException( + f'failed to load access logs: {status}', + ) + raise DataWarehouseException('loading access logs timed out') + LOGGER.debug( + 'loaded access logs in %.3f ms', + res.get('Duration', 0) * 0.001 * 0.001, # ns → ms + ) + + +def get_create_raw_access_log_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to load raw + access logs from the S3 bucket. 
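+
+    The columns mirror, in order, the fields of the CloudFront standard log
+    file format, so that the log objects can be loaded with ``COPY`` as-is.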
+ """ + return ''.join([ + 'CREATE TABLE #raw_access_log (', + ' date DATE,', + ' time TIME,', + ' edge_location VARCHAR,', + ' sc_bytes BIGINT,', + ' c_ip VARCHAR,', + ' cs_method VARCHAR,', + ' cs_host VARCHAR,', + ' cs_uri_stem VARCHAR(2048),', + ' status SMALLINT,', + ' referer VARCHAR(2048),', + ' user_agent VARCHAR(2048),', + ' cs_uri_query VARCHAR,', + ' cs_cookie VARCHAR,', + ' edge_result_type VARCHAR,', + ' edge_request_id VARCHAR,', + ' host_header VARCHAR,', + ' cs_protocol VARCHAR,', + ' cs_bytes BIGINT,', + ' time_taken FLOAT4,', + ' forwarded_for VARCHAR,', + ' ssl_protocol VARCHAR,', + ' ssl_cipher VARCHAR,', + ' edge_response_result_type VARCHAR,', + ' cs_protocol_version VARCHAR,', + ' fle_status VARCHAR,', + ' fle_encrypted_fields VARCHAR,', + ' c_port INT,', + ' time_to_first_byte FLOAT4,', + ' edge_detailed_result_type VARCHAR,', + ' sc_content_type VARCHAR,', + ' sc_content_len BIGINT,', + ' sc_range_start BIGINT,', + ' sc_range_end BIGINT', + ')', + 'SORTKEY (date, time)', + ]) + + +def get_load_access_logs_statement(date: datetime.datetime) -> str: + """Returns an SQL statement that loads access logs from the S3 bucket. + """ + date_part = f'{date.year:04d}/{date.month:02d}/{date.day:02d}/' + return ''.join([ + 'COPY #raw_access_log', + f" FROM 's3://{SOURCE_BUCKET_NAME}/{SOURCE_KEY_PREFIX}{date_part}'", + f" IAM_ROLE '{COPY_ROLE_ARN}'", + ' GZIP', + " DELIMITER '\t'", + ' IGNOREHEADER 1', + " NULL AS '-'", + ]) + + +def get_create_access_log_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to select and + format access log columns. + """ + return ''.join([ + 'CREATE TABLE #access_log_stage (', + ' datetime,', + ' edge_location,', + ' sc_bytes,', + ' cs_method,', + ' cs_uri_stem,', + ' status,', + ' referer,', + ' user_agent,', + ' cs_protocol,', + ' cs_bytes,', + ' time_taken,', + ' edge_response_result_type,', + ' time_to_first_byte', + ')', + ' SORTKEY (datetime)', + ' AS SELECT', + ' ("date" || \' \' || "time")::TIMESTAMP,', + ' edge_location,', + ' sc_bytes,', + ' cs_method,', + ' cs_uri_stem,', + ' status,', + " CASE WHEN referer IS NULL THEN '-' ELSE referer END,", + ' user_agent,', + ' cs_protocol,', + ' cs_bytes,', + ' time_taken,', + ' edge_response_result_type,', + ' time_to_first_byte', + ' FROM #raw_access_log', + ]) + + +def get_drop_raw_access_log_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to load raw + access logs from the S3 bucket. + """ + return get_drop_table_statement('#raw_access_log') + + +def get_create_referer_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to aggregate + referers. + """ + return ''.join([ + 'CREATE TABLE #referer_stage (url)', + ' SORTKEY (url)', + ' AS SELECT referer FROM #access_log_stage', + ]) + + +def get_delete_existing_referers_statement() -> str: + """Returns an SQL statement that deletes existing referers from the + temporary referer table. + """ + return ''.join([ + 'DELETE FROM #referer_stage', + f' USING {tables.REFERER_TABLE_NAME}', + ' WHERE', + f' #referer_stage.url = {tables.REFERER_TABLE_NAME}.url', + ]) + + +def get_insert_referers_statement() -> str: + """Returns an SQL statement that inserts new referers in the temporary + table into the referer table. 
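+
+    Referers already in the referer table are assumed to have been deleted
+    from the temporary table beforehand, and ``GROUP BY url`` deduplicates
+    the remaining rows.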
+ """ + return ''.join([ + f'INSERT INTO {tables.REFERER_TABLE_NAME} (url)', + ' SELECT url FROM #referer_stage GROUP BY url', + ]) + + +def get_drop_referer_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to aggregate + referers. + """ + return get_drop_table_statement('#referer_stage') + + +def get_create_page_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to aggregate + pages. + """ + return ''.join([ + 'CREATE TABLE #page_stage (path)', + ' SORTKEY (path)', + ' AS SELECT cs_uri_stem FROM #access_log_stage', + ]) + + +def get_delete_existing_pages_statement() -> str: + """Returns an SQL statement that deletes existing pages from the temporary + page table. + """ + return ''.join([ + 'DELETE FROM #page_stage', + f' USING {tables.PAGE_TABLE_NAME}', + ' WHERE', + f' #page_stage.path = {tables.PAGE_TABLE_NAME}.path', + ]) + + +def get_insert_pages_statement() -> str: + """Returns an SQL statement that inserts new pages in the temporary table + into the stage table. + """ + return ''.join([ + f'INSERT INTO {tables.PAGE_TABLE_NAME} (path)', + ' SELECT path FROM #page_stage GROUP BY path', + ]) + + +def get_drop_page_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to aggregate + pages. + """ + return get_drop_table_statement('#page_stage') + + +def get_create_edge_location_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to aggregate edge + locations. + """ + return ''.join([ + 'CREATE TABLE #edge_location_stage (code)', + ' SORTKEY (code)', + ' AS SELECT edge_location FROM #access_log_stage', + ]) + + +def get_delete_existing_edge_locations_statement() -> str: + """Returns an SQL statement that deletes existing edge locations from the + tempoary edge location table. + """ + return ''.join([ + 'DELETE FROM #edge_location_stage', + f' USING {tables.EDGE_LOCATION_TABLE_NAME}', + ' WHERE', + f' #edge_location_stage.code = {tables.EDGE_LOCATION_TABLE_NAME}.code', + ]) + + +def get_insert_edge_locations_statement() -> str: + """Returns an SQL statement that inserts new edge locations in the temporary + table into the edge location table. + """ + return ''.join([ + f'INSERT INTO {tables.EDGE_LOCATION_TABLE_NAME} (code)', + ' SELECT code FROM #edge_location_stage GROUP BY code', + ]) + + +def get_drop_edge_location_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to aggregate edge + locations. + """ + return get_drop_table_statement('#edge_location_stage') + + +def get_create_user_agent_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to aggregate user + agents. + """ + return ''.join([ + 'CREATE TABLE #user_agent_stage (user_agent)', + ' SORTKEY (user_agent)', + ' AS SELECT user_agent FROM #access_log_stage', + ]) + + +def get_delete_existing_user_agents_statement() -> str: + """Returns an SQL statement that deletes existing user agents from the + temporary user agent table. + """ + return ''.join([ + 'DELETE FROM #user_agent_stage', + f' USING {tables.USER_AGENT_TABLE_NAME}', + ' WHERE', + f' #user_agent_stage.user_agent = {tables.USER_AGENT_TABLE_NAME}.user_agent', + ]) + + +def get_insert_user_agents_statement() -> str: + """Returns an SQL statement that inserts user agents in the temporary table + into the user agent table. 
+ """ + return ''.join([ + f'INSERT INTO {tables.USER_AGENT_TABLE_NAME} (user_agent)', + ' SELECT user_agent FROM #user_agent_stage GROUP BY user_agent', + ]) + + +def get_drop_user_agent_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to aggregate user + agents. + """ + return get_drop_table_statement('#user_agent_stage') + + +def get_create_result_type_stage_table_statement() -> str: + """Returns an SQL statement that creates a temporary table to aggregate + result types. + """ + return ''.join([ + 'CREATE TABLE #result_type_stage (result_type)', + ' SORTKEY (result_type)', + ' AS SELECT edge_response_result_type FROM #access_log_stage', + ]) + + +def get_delete_existing_result_types_statement() -> str: + """Returns an SQL statement that deletes existing result types from the + temporary result type table. + """ + return ''.join([ + 'DELETE FROM #result_type_stage', + f' USING {tables.RESULT_TYPE_TABLE_NAME}', + ' WHERE', + f' #result_type_stage.result_type = {tables.RESULT_TYPE_TABLE_NAME}.result_type', + ]) + + +def get_insert_result_types_statement() -> str: + """Returns an SQL statement that inserts result types in the temporary table + into the result type table. + """ + return ''.join([ + f'INSERT INTO {tables.RESULT_TYPE_TABLE_NAME} (result_type)', + ' SELECT result_type FROM #result_type_stage GROUP BY result_type', + ]) + + +def get_drop_result_type_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to aggregate + result types. + """ + return get_drop_table_statement('#result_type_stage') + + +def get_encode_foreign_keys_statement() -> str: + """Returns an SQL statement that decodes foreign keys in the temporary + access log table and creates a temporary second staging table. + """ + return ''.join([ + 'CREATE TABLE #access_log_stage_2 (', + ' datetime,', + ' edge_location,', + ' sc_bytes,', + ' cs_method,', + ' page,', + ' status,', + ' referer,', + ' user_agent,', + ' cs_protocol,', + ' cs_bytes,', + ' time_taken,', + ' edge_response_result_type,', + ' time_to_first_byte', + ')', + ' DISTKEY (referer)', + ' SORTKEY (datetime)', + ' AS SELECT', + ' #access_log_stage.datetime,', + f' {tables.EDGE_LOCATION_TABLE_NAME}.id,', + ' #access_log_stage.sc_bytes,', + ' #access_log_stage.cs_method,', + f' {tables.PAGE_TABLE_NAME}.id,', + ' #access_log_stage.status,', + f' {tables.REFERER_TABLE_NAME}.id,', + f' {tables.USER_AGENT_TABLE_NAME}.id,', + ' #access_log_stage.cs_protocol,', + ' #access_log_stage.cs_bytes,', + ' #access_log_stage.time_taken,', + f' {tables.RESULT_TYPE_TABLE_NAME}.id,', + ' #access_log_stage.time_to_first_byte', + ' FROM', + ' #access_log_stage,' + f' {tables.EDGE_LOCATION_TABLE_NAME},', + f' {tables.PAGE_TABLE_NAME},', + f' {tables.REFERER_TABLE_NAME},', + f' {tables.USER_AGENT_TABLE_NAME},', + f' {tables.RESULT_TYPE_TABLE_NAME}', + ' WHERE', + f' (#access_log_stage.edge_location = {tables.EDGE_LOCATION_TABLE_NAME}.code)', + f' AND (#access_log_stage.cs_uri_stem = {tables.PAGE_TABLE_NAME}.path)', + f' AND (#access_log_stage.referer = {tables.REFERER_TABLE_NAME}.url)', + f' AND (#access_log_stage.user_agent = {tables.USER_AGENT_TABLE_NAME}.user_agent)', + ' AND (#access_log_stage.edge_response_result_type =', + f' {tables.RESULT_TYPE_TABLE_NAME}.result_type)', + ]) + + +def get_insert_access_logs_statement() -> str: + """Returns an SQL statement that inserts access logs in the temporary second + staging table into the access log table. 
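+
+    Since the statement selects ``*`` without a column list, the columns of
+    ``#access_log_stage_2`` have to line up with those of the access log
+    table.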
+ """ + return ''.join([ + f'INSERT INTO {tables.ACCESS_LOG_TABLE_NAME}', + ' SELECT * FROM #access_log_stage_2', + ]) + +def get_drop_access_log_stage_2_table_statement() -> str: + """Returns an SQL statement that drops the temporary second staging table + for access logs. + """ + return get_drop_table_statement('#access_log_stage_2') + + +def get_drop_access_log_stage_table_statement() -> str: + """Returns an SQL statement that drops the temporary table to select and + format access log columns. + """ + return get_drop_table_statement('#access_log_stage') + + +def get_drop_table_statement(table_name: str) -> str: + """Returns an SQL statement that drops a given table. + """ + return f'DROP TABLE IF EXISTS {table_name}' + + +def parse_time(time_str: str) -> datetime.datetime: + """Parses a given "time" string. + """ + return datetime.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S%z') + + +def lambda_handler(event, _): + """Loads CloudFront access logs onto the data warehouse. + + This function is indented to be invoked by Amazon EventBridge. + So ``event`` must be an object with ``time`` field. + + .. code-block:: python + + { + 'time': '2020-04-28T07:20:20Z' + } + + Loads CloudFront access logs on the day before the date specified to + ``time``. + """ + LOGGER.debug('loading access logs: %s', str(event)) + invocation_date = parse_time(event['time']) + target_date = invocation_date - datetime.timedelta(days=1) + LOGGER.debug('loading access logs on:%s', str(target_date)) + res = redshift.get_credentials( + workgroupName=REDSHIFT_WORKGROUP_NAME, + dbName=ACCESS_LOGS_DATABASE_NAME, + ) + LOGGER.debug('accessing database as %s', res['dbUser']) + execute_load_script(target_date) + return {} diff --git a/cdk-ops/lib/access-logs-etl.ts b/cdk-ops/lib/access-logs-etl.ts index 21a7b77..58bb537 100644 --- a/cdk-ops/lib/access-logs-etl.ts +++ b/cdk-ops/lib/access-logs-etl.ts @@ -4,6 +4,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; import { Duration, RemovalPolicy, + aws_iam as iam, aws_lambda as lambda, aws_lambda_event_sources as lambda_event, aws_s3 as s3, @@ -14,9 +15,19 @@ import { Construct } from 'constructs'; import type { DeploymentStage } from 'cdk-common'; +import { DataWarehouse } from './data-warehouse'; +import { LatestBoto3Layer } from './latest-boto3-layer'; +import { LibdatawarehouseLayer } from './libdatawarehouse-layer'; + export interface Props { /** S3 bucket that stores CloudFront access logs. */ accessLogsBucket: s3.IBucket; + /** Data warehouse. */ + dataWarehouse: DataWarehouse; + /** Lambda layer containing the latest boto3. */ + latestBoto3: LatestBoto3Layer; + /** Lambda layer of the data warehouse library. */ + libdatawarehouse: LibdatawarehouseLayer; /** Deployment stage. */ deploymentStage: DeploymentStage; } @@ -30,15 +41,20 @@ export interface Props { */ export class AccessLogsETL extends Construct { /** S3 bucket for masked access logs. */ - readonly maskedAccessLogsBucket: s3.IBucket; + readonly outputAccessLogsBucket: s3.IBucket; constructor(scope: Construct, id: string, props: Props) { super(scope, id); - const { accessLogsBucket } = props; + const { + accessLogsBucket, + dataWarehouse, + latestBoto3, + libdatawarehouse, + } = props; - // S3 bucket for masked access logs. - this.maskedAccessLogsBucket = new s3.Bucket( + // S3 bucket for processed access logs. 
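+    // note that only the TypeScript property is renamed; the construct ID
+    // below stays "MaskedAccessLogsBucket", so the existing bucket keeps its
+    // logical ID and is not replaced.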
+ this.outputAccessLogsBucket = new s3.Bucket( this, 'MaskedAccessLogsBucket', { @@ -55,6 +71,9 @@ export class AccessLogsETL extends Construct { removalPolicy: RemovalPolicy.RETAIN, }, ); + // allows the data warehouse to read the bucket + // so that it can COPY objects from the bucket. + this.outputAccessLogsBucket.grantRead(dataWarehouse.namespaceRole); // masks newly created CloudFront access logs // - Lambda function @@ -71,14 +90,14 @@ export class AccessLogsETL extends Construct { handler: 'lambda_handler', environment: { SOURCE_BUCKET_NAME: accessLogsBucket.bucketName, - DESTINATION_BUCKET_NAME: this.maskedAccessLogsBucket.bucketName, + DESTINATION_BUCKET_NAME: this.outputAccessLogsBucket.bucketName, DESTINATION_KEY_PREFIX: maskedAccessLogsKeyPrefix, }, timeout: maskAccessLogsLambdaTimeout, }, ); accessLogsBucket.grantRead(maskAccessLogsLambda); - this.maskedAccessLogsBucket.grantPut(maskAccessLogsLambda); + this.outputAccessLogsBucket.grantPut(maskAccessLogsLambda); // - SQS queue to capture creation of access logs files, which triggers // the above Lambda function const maxBatchingWindow = Duration.minutes(5); // least frequency @@ -128,7 +147,7 @@ export class AccessLogsETL extends Construct { environment: { SOURCE_BUCKET_NAME: accessLogsBucket.bucketName, // bucket name for masked logs is necessary to verify input events. - DESTINATION_BUCKET_NAME: this.maskedAccessLogsBucket.bucketName, + DESTINATION_BUCKET_NAME: this.outputAccessLogsBucket.bucketName, DESTINATION_KEY_PREFIX: maskedAccessLogsKeyPrefix, }, timeout: deleteAccessLogsLambdaTimeout, @@ -143,7 +162,7 @@ export class AccessLogsETL extends Construct { Duration.seconds(6 * deleteAccessLogsLambdaTimeout.toSeconds()), ), }); - this.maskedAccessLogsBucket.addEventNotification( + this.outputAccessLogsBucket.addEventNotification( s3.EventType.OBJECT_CREATED, new s3n.SqsDestination(maskedLogsQueue), { prefix: maskedAccessLogsKeyPrefix }, @@ -155,5 +174,32 @@ export class AccessLogsETL extends Construct { maxBatchingWindow, }), ); + + // loads processed logs onto the data warehouse once a day. 
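+    // NOTE: the EventBridge rule that actually triggers this function is not
+    // provisioned yet (see the TODO at the end of this constructor).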
+ const loadAccessLogsLambda = new PythonFunction( + this, + 'LoadAccessLogsLambda', + { + description: + 'Loads processed CloudFront access logs onto the data warehouse', + runtime: lambda.Runtime.PYTHON_3_8, + architecture: lambda.Architecture.ARM_64, + entry: path.join('lambda', 'load-access-logs'), + index: 'index.py', + handler: 'lambda_handler', + layers: [latestBoto3.layer, libdatawarehouse.layer], + environment: { + SOURCE_BUCKET_NAME: this.outputAccessLogsBucket.bucketName, + SOURCE_KEY_PREFIX: maskedAccessLogsKeyPrefix, + REDSHIFT_WORKGROUP_NAME: dataWarehouse.workgroupName, + COPY_ROLE_ARN: dataWarehouse.namespaceRole.roleArn, + }, + timeout: Duration.minutes(15), + }, + ); + dataWarehouse.grantQuery(loadAccessLogsLambda); + // loadAccessLogsLambda does not need permissions to read + // outputAccessLogsBucket + // TODO: schedule running loadAccessLogsLambda } } diff --git a/cdk-ops/lib/cdk-ops-stack.ts b/cdk-ops/lib/cdk-ops-stack.ts index c8ee5e4..4f57522 100644 --- a/cdk-ops/lib/cdk-ops-stack.ts +++ b/cdk-ops/lib/cdk-ops-stack.ts @@ -42,6 +42,9 @@ export class CdkOpsStack extends Stack { { accessLogsBucket: codemongerResources.developmentContentsAccessLogsBucket, + dataWarehouse, + latestBoto3, + libdatawarehouse, deploymentStage: 'development', }, ); diff --git a/cdk-ops/lib/data-warehouse.ts b/cdk-ops/lib/data-warehouse.ts index 5b4cc95..5492d61 100644 --- a/cdk-ops/lib/data-warehouse.ts +++ b/cdk-ops/lib/data-warehouse.ts @@ -1,7 +1,9 @@ import * as path from 'path'; import { + Arn, Duration, + Stack, aws_ec2 as ec2, aws_iam as iam, aws_lambda as lambda, @@ -35,10 +37,15 @@ export interface Props { export class DataWarehouse extends Construct { /** VPC for Redshift Serverless clusters. */ readonly vpc: ec2.IVpc; + // TODO: unnecessary exposure of `adminSecret` /** Secret for the admin user. */ readonly adminSecret: secrets.ISecret; - /** IAM role of Redshift Serverless namespace. */ + /** Default IAM role associated with the Redshift Serverless namespace. */ readonly namespaceRole: iam.IRole; + /** Name of the Redshift Serverless workgroup. */ + readonly workgroupName: string; + /** Redshift Serverless workgroup. */ + readonly workgroup: redshift.CfnWorkgroup; constructor(scope: Construct, id: string, props: Props) { super(scope, id); @@ -109,8 +116,9 @@ export class DataWarehouse extends Construct { this.adminSecret.node.defaultChild as secrets.CfnSecret, ); // - workgroup - const workgroup = new redshift.CfnWorkgroup(this, 'DwWorkgroup', { - workgroupName: `datawarehouse-${deploymentStage}`, + this.workgroupName = `datawarehouse-${deploymentStage}`; + this.workgroup = new redshift.CfnWorkgroup(this, 'DwWorkgroup', { + workgroupName: this.workgroupName, namespaceName: dwNamespace.namespaceName, baseCapacity: 32, subnetIds: this.getSubnetIdsForCluster(), @@ -125,7 +133,7 @@ export class DataWarehouse extends Construct { }, ], }); - workgroup.addDependsOn(dwNamespace); + this.workgroup.addDependsOn(dwNamespace); // Lambda function that populates the database and tables. 
const populateDwDatabaseLambda = new PythonFunction( @@ -140,7 +148,7 @@ export class DataWarehouse extends Construct { handler: 'lambda_handler', layers: [latestBoto3.layer, libdatawarehouse.layer], environment: { - WORKGROUP_NAME: workgroup.workgroupName, + WORKGROUP_NAME: this.workgroupName, ADMIN_SECRET_ARN: this.adminSecret.secretArn, ADMIN_DATABASE_NAME: 'dev', }, @@ -148,7 +156,7 @@ export class DataWarehouse extends Construct { // a Lambda function does not have to join the VPC // as long as it uses Redshift Data API. // - // if want to directly connect to the Redshift cluster from a Lambda, + // if we want to directly connect to the Redshift cluster from a Lambda, // we have to put the Lambda in the VPC and allocate a VPC endpoint. // but I cannot afford VPC endpoints for now. // @@ -168,4 +176,34 @@ export class DataWarehouse extends Construct { subnetGroupName: CLUSTER_SUBNET_GROUP_NAME, }).subnetIds; } + + /** + * Grants permissions to query this data warehouse via the Redshift Data API. + * + * Allows `grantee` to call `redshift-serverless:GetCredentials`. + */ + grantQuery(grantee: iam.IGrantable): iam.Grant { + iam.Grant + .addToPrincipal({ + grantee, + actions: ['redshift-serverless:GetCredentials'], + resourceArns: [ + // TODO: how can we get the ARN of the workgroup? + Arn.format( + { + service: 'redshift-serverless', + resource: 'workgroup', + resourceName: '*', + }, + Stack.of(this.workgroup), + ), + ], + }) + .assertSuccess(); + return iam.Grant.addToPrincipal({ + grantee, + actions: ['redshift-data:*'], + resourceArns: ['*'], + }); + } }
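Note on the remaining TODO (scheduling loadAccessLogsLambda): the snippet below is only a sketch of how the daily trigger could be wired up inside the AccessLogsETL constructor, assuming the aws_events and aws_events_targets modules of aws-cdk-lib; the construct ID 'LoadAccessLogsRule' and the cron time are illustrative and not part of this patch.

    import {
      aws_events as events,
      aws_events_targets as targets,
    } from 'aws-cdk-lib';

    // inside the AccessLogsETL constructor, after loadAccessLogsLambda is
    // defined: EventBridge delivers a scheduled event carrying a `time`
    // field, which lambda_handler uses to load the previous day's logs.
    new events.Rule(this, 'LoadAccessLogsRule', {
      schedule: events.Schedule.cron({ minute: '0', hour: '2' }),
      targets: [new targets.LambdaFunction(loadAccessLogsLambda)],
    });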