diff --git a/cdk-ops/lambda/populate-dw-database/index.py b/cdk-ops/lambda/populate-dw-database/index.py index 7086980..0f5d4c7 100644 --- a/cdk-ops/lambda/populate-dw-database/index.py +++ b/cdk-ops/lambda/populate-dw-database/index.py @@ -5,30 +5,20 @@ You have to configure the following environment variables, - ``WORKGROUP_NAME``: name of the Redshift Serverless workgroup to connect to - ``ADMIN_SECRET_ARN``: ARN of the admin secret +- ``ADMIN_DATABASE_NAME``: name of the admin database """ import logging import os -import time -from typing import Dict, Optional, Sequence, Tuple +from typing import Sequence import boto3 +from libdatawarehouse import ACCESS_LOGS_DATABASE_NAME, data_api, tables +from libdatawarehouse.exceptions import DataWarehouseException WORKGROUP_NAME = os.environ['WORKGROUP_NAME'] ADMIN_SECRET_ARN = os.environ['ADMIN_SECRET_ARN'] ADMIN_DATABASE_NAME = os.environ['ADMIN_DATABASE_NAME'] -ACCESS_LOGS_DATABASE_NAME = os.environ['ACCESS_LOGS_DATABASE_NAME'] -REFERER_TABLE_NAME = os.environ['REFERER_TABLE_NAME'] -PAGE_TABLE_NAME = os.environ['PAGE_TABLE_NAME'] -EDGE_LOCATION_TABLE_NAME = os.environ['EDGE_LOCATION_TABLE_NAME'] -USER_AGENT_TABLE_NAME = os.environ['USER_AGENT_TABLE_NAME'] -RESULT_TYPE_TABLE_NAME = os.environ['RESULT_TYPE_TABLE_NAME'] -ACCESS_LOG_TABLE_NAME = os.environ['ACCESS_LOG_TABLE_NAME'] - -POLLING_INTERVAL_IN_S = 0.05 -MAX_POLLING_COUNTER = round(60 / POLLING_INTERVAL_IN_S) # > 1 minute - -RUNNING_STATUSES = ['SUBMITTED', 'PICKED', 'STARTED'] LOGGER = logging.getLogger(__name__) LOGGER.setLevel(logging.DEBUG) @@ -36,31 +26,8 @@ redshift_data = boto3.client('redshift-data') -class DataWarehouseException(Exception): - """Exception raised when a data warehouse operation fails. - """ - - message: str - - - def __init__(self, message: str): - """Initializes with a given message. 
- """ - self.message = message - - - def __str__(self): - classname = type(self).__name__ - return f'{classname}({self.message})' - - - def __repr__(self): - classname = type(self).__name__ - return f'{classname}({repr(self.message)})' - - def get_create_database_statement() -> str: - """Returns an SQL statement to create the database. + """Returns an SQL statement to create the database for access logs. """ return f'CREATE DATABASE {ACCESS_LOGS_DATABASE_NAME}' @@ -70,11 +37,19 @@ def get_create_tables_script() -> Sequence[str]: """ return [ get_create_referer_table_statement(), + get_grant_public_table_access_statement(tables.REFERER_TABLE_NAME), get_create_page_table_statement(), + get_grant_public_table_access_statement(tables.PAGE_TABLE_NAME), get_create_edge_location_table_statement(), + get_grant_public_table_access_statement( + tables.EDGE_LOCATION_TABLE_NAME, + ), get_create_user_agent_table_statement(), + get_grant_public_table_access_statement(tables.USER_AGENT_TABLE_NAME), get_create_result_type_table_statement(), + get_grant_public_table_access_statement(tables.RESULT_TYPE_TABLE_NAME), get_create_access_log_table_statement(), + get_grant_public_table_access_statement(tables.ACCESS_LOG_TABLE_NAME), ] @@ -82,9 +57,9 @@ def get_create_referer_table_statement() -> str: """Returns an SQL statement to create the table for referers. """ return ''.join([ - f'CREATE TABLE IF NOT EXISTS {REFERER_TABLE_NAME} (', + f'CREATE TABLE IF NOT EXISTS {tables.REFERER_TABLE_NAME} (', ' id BIGINT IDENTITY(1, 1) DISTKEY,', - ' url VARCHAR NOT NULL,', + ' url VARCHAR(2048) NOT NULL SORTKEY UNIQUE,', ' PRIMARY KEY (id)', ')', ]) @@ -94,9 +69,9 @@ def get_create_page_table_statement() -> str: """Returns an SQL statement to create the table for pages. 
""" return ''.join([ - f'CREATE TABLE IF NOT EXISTS {PAGE_TABLE_NAME} (', + f'CREATE TABLE IF NOT EXISTS {tables.PAGE_TABLE_NAME} (', ' id INT IDENTITY(1, 1),', - ' path VARCHAR NOT NULL,' + ' path VARCHAR(2048) NOT NULL SORTKEY UNIQUE,' ' PRIMARY KEY (id)', ')', ]) @@ -106,9 +81,9 @@ def get_create_edge_location_table_statement() -> str: """Returns an SQL statement to create the table for edge locations. """ return ''.join([ - f'CREATE TABLE IF NOT EXISTS {EDGE_LOCATION_TABLE_NAME} (', + f'CREATE TABLE IF NOT EXISTS {tables.EDGE_LOCATION_TABLE_NAME} (', ' id INT IDENTITY(1, 1),', - ' code VARCHAR NOT NULL,', + ' code VARCHAR NOT NULL SORTKEY UNIQUE,', ' PRIMARY KEY (id)', ')' ]) @@ -118,9 +93,9 @@ def get_create_user_agent_table_statement() -> str: """Returns an SQL statement to create the table for user agents. """ return ''.join([ - f'CREATE TABLE IF NOT EXISTS {USER_AGENT_TABLE_NAME} (' + f'CREATE TABLE IF NOT EXISTS {tables.USER_AGENT_TABLE_NAME} (' ' id BIGINT IDENTITY(1, 1),', - ' user_agent VARCHAR NOT NULL,', + ' user_agent VARCHAR(2048) NOT NULL SORTKEY UNIQUE,', ' PRIMARY KEY (id)', ')', ]) @@ -130,9 +105,9 @@ def get_create_result_type_table_statement() -> str: """Returns an SQL statement to create the table for result types. """ return ''.join([ - f'CREATE TABLE IF NOT EXISTS {RESULT_TYPE_TABLE_NAME} (' + f'CREATE TABLE IF NOT EXISTS {tables.RESULT_TYPE_TABLE_NAME} (' ' id INT IDENTITY(1, 1),', - ' result_type VARCHAR NOT NULL,', + ' result_type VARCHAR NOT NULL SORTKEY UNIQUE,', ' PRIMARY KEY (id)', ')', ]) @@ -142,7 +117,7 @@ def get_create_access_log_table_statement() -> str: """Returns an SQL statement to create the table for access logs. 
""" return ''.join([ - f'CREATE TABLE IF NOT EXISTS {ACCESS_LOG_TABLE_NAME} (', + f'CREATE TABLE IF NOT EXISTS {tables.ACCESS_LOG_TABLE_NAME} (', ' datetime TIMESTAMP SORTKEY NOT NULL,', ' edge_location INT NOT NULL,', ' sc_bytes BIGINT NOT NULL,', @@ -156,35 +131,19 @@ def get_create_access_log_table_statement() -> str: ' time_taken FLOAT4 NOT NULL,', ' edge_response_result_type INT NOT NULL,', ' time_to_first_byte FLOAT4 NOT NULL,', - f' FOREIGN KEY (edge_location) REFERENCES {EDGE_LOCATION_TABLE_NAME},' - f' FOREIGN KEY (page) REFERENCES {PAGE_TABLE_NAME},' - f' FOREIGN KEY (referer) REFERENCES {REFERER_TABLE_NAME},' - f' FOREIGN KEY (user_agent) REFERENCES {USER_AGENT_TABLE_NAME},' - f' FOREIGN KEY (edge_response_result_type) REFERENCES {RESULT_TYPE_TABLE_NAME}' + f' FOREIGN KEY (edge_location) REFERENCES {tables.EDGE_LOCATION_TABLE_NAME},' + f' FOREIGN KEY (page) REFERENCES {tables.PAGE_TABLE_NAME},' + f' FOREIGN KEY (referer) REFERENCES {tables.REFERER_TABLE_NAME},' + f' FOREIGN KEY (user_agent) REFERENCES {tables.USER_AGENT_TABLE_NAME},' + f' FOREIGN KEY (edge_response_result_type) REFERENCES {tables.RESULT_TYPE_TABLE_NAME}' ')', ]) -def wait_for_statement(statement_id: str) -> Tuple[Optional[str], Dict]: - """Waits for a given statement to finish. - - :returns: final status of the statement. - ``None`` if polling has timed out. +def get_grant_public_table_access_statement(table_name: str) -> str: + """Returns an SQL statement to grant access on a given table to public. 
""" - counter = 0 - while counter < MAX_POLLING_COUNTER: - res = redshift_data.describe_statement(Id=statement_id) - if counter % 20 == 0: - LOGGER.debug('polling statement status [%d]: %s', counter, str(res)) - if res['Status'] not in RUNNING_STATUSES: - LOGGER.debug( - 'statement done in: %.3f ms', - res.get('Duration', 0) * 0.001 * 0.001, # ns → ms - ) - return res['Status'], res - time.sleep(POLLING_INTERVAL_IN_S) - counter += 1 - return None, res + return f'GRANT SELECT,INSERT,UPDATE,DELETE ON {table_name} TO PUBLIC' def lambda_handler(event, _): @@ -201,11 +160,13 @@ def lambda_handler(event, _): Database=ADMIN_DATABASE_NAME, Sql=get_create_database_statement(), ) - status, res = wait_for_statement(res['Id']) + status, res = data_api.wait_for_results(redshift_data, res['Id']) if status != 'FINISHED': if status == 'FAILED': - # ignores the error if the database already exists - if not res.get('Error', '').lower().endswith('already exists'): + # just warns if the database already exists + if res.get('Error', '').lower().endswith('already exists'): + LOGGER.warning('database already exists') + else: raise DataWarehouseException( f'failed to create the database: {res.get("Error")}', ) @@ -213,6 +174,10 @@ def lambda_handler(event, _): raise DataWarehouseException( f'failed to create the database: {status or "timeout"}', ) + LOGGER.debug( + 'populated database in %.3f ms', + res.get('Duration', 0) * 0.001 * 0.001, # ns → ms + ) # populates the tables res = redshift_data.batch_execute_statement( WorkgroupName=WORKGROUP_NAME, @@ -220,7 +185,7 @@ def lambda_handler(event, _): Database=ACCESS_LOGS_DATABASE_NAME, Sqls=get_create_tables_script(), ) - status, res = wait_for_statement(res['Id']) + status, res = data_api.wait_for_results(redshift_data, res['Id']) if status != 'FINISHED': if status == 'FAILED': raise DataWarehouseException( @@ -229,6 +194,10 @@ def lambda_handler(event, _): raise DataWarehouseException( f'failed to populate tables: {status or "timeout"}', ) 
+ LOGGER.debug( + 'populated tables in %.3f ms', + res.get('Duration', 0) * 0.001 * 0.001, # ns → ms + ) return { 'statusCode': 200, } diff --git a/cdk-ops/lib/cdk-ops-stack.ts b/cdk-ops/lib/cdk-ops-stack.ts index ce2ee7b..c8ee5e4 100644 --- a/cdk-ops/lib/cdk-ops-stack.ts +++ b/cdk-ops/lib/cdk-ops-stack.ts @@ -9,6 +9,7 @@ import { import { ContentsPipeline } from './contents-pipeline'; import { DataWarehouse } from './data-warehouse'; import { LatestBoto3Layer } from './latest-boto3-layer'; +import { LibdatawarehouseLayer } from './libdatawarehouse-layer'; type Props = StackProps & Readonly<{ // names of the main codemonger resources. @@ -25,11 +26,14 @@ export class CdkOpsStack extends Stack { props.codemongerResourceNames, ); const latestBoto3 = new LatestBoto3Layer(this, 'LatestBoto3'); + const libdatawarehouse = + new LibdatawarehouseLayer(this, 'Libdatawarehouse'); const pipeline = new ContentsPipeline(this, 'ContentsPipeline', { codemongerResources, }); const dataWarehouse = new DataWarehouse(this, 'DevelopmentDataWarehouse', { latestBoto3, + libdatawarehouse, deploymentStage: 'development', }); const developmentContentsAccessLogsETL = new AccessLogsETL( diff --git a/cdk-ops/lib/data-warehouse.ts b/cdk-ops/lib/data-warehouse.ts index ab00882..5b4cc95 100644 --- a/cdk-ops/lib/data-warehouse.ts +++ b/cdk-ops/lib/data-warehouse.ts @@ -14,6 +14,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; import type { DeploymentStage } from 'cdk-common'; import { LatestBoto3Layer } from './latest-boto3-layer'; +import { LibdatawarehouseLayer } from './libdatawarehouse-layer'; /** Name of the admin user. */ export const ADMIN_USER_NAME = 'dwadmin'; @@ -24,6 +25,8 @@ export const CLUSTER_SUBNET_GROUP_NAME = 'dw-cluster'; export interface Props { /** Lambda layer containing the latest boto3. */ latestBoto3: LatestBoto3Layer; + /** Lambda layer containing libdatawarehouse. */ + libdatawarehouse: LibdatawarehouseLayer; /** Deployment stage. 
*/ deploymentStage: DeploymentStage; } @@ -40,7 +43,7 @@ export class DataWarehouse extends Construct { constructor(scope: Construct, id: string, props: Props) { super(scope, id); - const { deploymentStage, latestBoto3 } = props; + const { deploymentStage, latestBoto3, libdatawarehouse } = props; this.vpc = new ec2.Vpc(this, `DwVpc`, { cidr: '192.168.0.0/16', @@ -135,18 +138,11 @@ export class DataWarehouse extends Construct { entry: path.join('lambda', 'populate-dw-database'), index: 'index.py', handler: 'lambda_handler', - layers: [latestBoto3.layer], + layers: [latestBoto3.layer, libdatawarehouse.layer], environment: { WORKGROUP_NAME: workgroup.workgroupName, ADMIN_SECRET_ARN: this.adminSecret.secretArn, ADMIN_DATABASE_NAME: 'dev', - ACCESS_LOGS_DATABASE_NAME: 'access_logs', - PAGE_TABLE_NAME: 'page', - REFERER_TABLE_NAME: 'referer', - EDGE_LOCATION_TABLE_NAME: 'edge_location', - USER_AGENT_TABLE_NAME: 'user_agent', - RESULT_TYPE_TABLE_NAME: 'result_type', - ACCESS_LOG_TABLE_NAME: 'access_log', }, timeout: Duration.minutes(15), // a Lambda function does not have to join the VPC