Skip to content

Commit

Permalink
feat(cdk-ops): use libdatawarehouse
Browse files Browse the repository at this point in the history
- `lambda/populate-dw-database` uses `libdatawarehouse` to replace
  commonly used types and functions.
- `CdkOpsStack` provisions `LibdatawarehouseLayer` and passes it to
  `DataWarehouse`.

issue codemonger-io#30
  • Loading branch information
kikuomax committed Oct 8, 2022
1 parent 570d151 commit 29b0b25
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 86 deletions.
123 changes: 46 additions & 77 deletions cdk-ops/lambda/populate-dw-database/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,62 +5,29 @@
You have to configure the following environment variables,
- ``WORKGROUP_NAME``: name of the Redshift Serverless workgroup to connect to
- ``ADMIN_SECRET_ARN``: ARN of the admin secret
- ``ADMIN_DATABASE_NAME``: name of the admin database
"""

import logging
import os
import time
from typing import Dict, Optional, Sequence, Tuple
from typing import Sequence
import boto3
from libdatawarehouse import ACCESS_LOGS_DATABASE_NAME, data_api, tables
from libdatawarehouse.exceptions import DataWarehouseException


WORKGROUP_NAME = os.environ['WORKGROUP_NAME']
ADMIN_SECRET_ARN = os.environ['ADMIN_SECRET_ARN']
ADMIN_DATABASE_NAME = os.environ['ADMIN_DATABASE_NAME']
ACCESS_LOGS_DATABASE_NAME = os.environ['ACCESS_LOGS_DATABASE_NAME']
REFERER_TABLE_NAME = os.environ['REFERER_TABLE_NAME']
PAGE_TABLE_NAME = os.environ['PAGE_TABLE_NAME']
EDGE_LOCATION_TABLE_NAME = os.environ['EDGE_LOCATION_TABLE_NAME']
USER_AGENT_TABLE_NAME = os.environ['USER_AGENT_TABLE_NAME']
RESULT_TYPE_TABLE_NAME = os.environ['RESULT_TYPE_TABLE_NAME']
ACCESS_LOG_TABLE_NAME = os.environ['ACCESS_LOG_TABLE_NAME']

POLLING_INTERVAL_IN_S = 0.05
MAX_POLLING_COUNTER = round(60 / POLLING_INTERVAL_IN_S) # > 1 minute

RUNNING_STATUSES = ['SUBMITTED', 'PICKED', 'STARTED']

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

redshift_data = boto3.client('redshift-data')


class DataWarehouseException(Exception):
    """Raised when an operation on the data warehouse fails."""

    # human-readable description of the failure
    message: str

    def __init__(self, message: str):
        """Initializes the exception with a descriptive message."""
        self.message = message

    def __str__(self):
        # renders as e.g. "DataWarehouseException(something went wrong)"
        return f'{type(self).__name__}({self.message})'

    def __repr__(self):
        # same shape as __str__ but with the message quoted via repr()
        return f'{type(self).__name__}({self.message!r})'


def get_create_database_statement() -> str:
    """Returns an SQL statement to create the database for access logs.

    The database name comes from ``ACCESS_LOGS_DATABASE_NAME`` (imported from
    ``libdatawarehouse``).
    """
    return f'CREATE DATABASE {ACCESS_LOGS_DATABASE_NAME}'

def get_create_tables_script() -> Sequence[str]:
    """Returns a sequence of SQL statements that creates the tables for
    access logs and grants public access to each of them.

    Each ``CREATE TABLE`` statement is immediately followed by the ``GRANT``
    statement for the table it creates.
    """
    return [
        get_create_referer_table_statement(),
        get_grant_public_table_access_statement(tables.REFERER_TABLE_NAME),
        get_create_page_table_statement(),
        get_grant_public_table_access_statement(tables.PAGE_TABLE_NAME),
        get_create_edge_location_table_statement(),
        get_grant_public_table_access_statement(
            tables.EDGE_LOCATION_TABLE_NAME,
        ),
        get_create_user_agent_table_statement(),
        get_grant_public_table_access_statement(tables.USER_AGENT_TABLE_NAME),
        get_create_result_type_table_statement(),
        get_grant_public_table_access_statement(tables.RESULT_TYPE_TABLE_NAME),
        get_create_access_log_table_statement(),
        get_grant_public_table_access_statement(tables.ACCESS_LOG_TABLE_NAME),
    ]


def get_create_referer_table_statement() -> str:
    """Returns an SQL statement to create the table for referers.

    The table maps a surrogate ``id`` to a referer ``url``.
    NOTE: Redshift treats UNIQUE/PRIMARY KEY as informational constraints;
    they are not enforced at insert time.
    """
    return ''.join([
        f'CREATE TABLE IF NOT EXISTS {tables.REFERER_TABLE_NAME} (',
        ' id BIGINT IDENTITY(1, 1) DISTKEY,',
        ' url VARCHAR(2048) NOT NULL SORTKEY UNIQUE,',
        ' PRIMARY KEY (id)',
        ')',
    ])
def get_create_page_table_statement() -> str:
    """Returns an SQL statement to create the table for pages.

    The table maps a surrogate ``id`` to a page ``path``.
    """
    return ''.join([
        f'CREATE TABLE IF NOT EXISTS {tables.PAGE_TABLE_NAME} (',
        ' id INT IDENTITY(1, 1),',
        # the comma after this element was previously omitted, relying on
        # implicit string-literal concatenation; made explicit (same output)
        ' path VARCHAR(2048) NOT NULL SORTKEY UNIQUE,',
        ' PRIMARY KEY (id)',
        ')',
    ])
def get_create_edge_location_table_statement() -> str:
    """Returns an SQL statement to create the table for edge locations.

    The table maps a surrogate ``id`` to a CloudFront edge location ``code``.
    """
    return ''.join([
        f'CREATE TABLE IF NOT EXISTS {tables.EDGE_LOCATION_TABLE_NAME} (',
        ' id INT IDENTITY(1, 1),',
        ' code VARCHAR NOT NULL SORTKEY UNIQUE,',
        ' PRIMARY KEY (id)',
        ')',
    ])
def get_create_user_agent_table_statement() -> str:
    """Returns an SQL statement to create the table for user agents.

    The table maps a surrogate ``id`` to a ``user_agent`` string.
    """
    return ''.join([
        # commas after the first two elements were previously omitted,
        # relying on implicit string-literal concatenation; made explicit
        # (the joined SQL text is unchanged)
        f'CREATE TABLE IF NOT EXISTS {tables.USER_AGENT_TABLE_NAME} (',
        ' id BIGINT IDENTITY(1, 1),',
        ' user_agent VARCHAR(2048) NOT NULL SORTKEY UNIQUE,',
        ' PRIMARY KEY (id)',
        ')',
    ])
def get_create_result_type_table_statement() -> str:
    """Returns an SQL statement to create the table for result types.

    The table maps a surrogate ``id`` to a CloudFront ``result_type``.
    """
    return ''.join([
        # the comma after the first element was previously omitted, relying
        # on implicit string-literal concatenation; made explicit (same
        # joined SQL text)
        f'CREATE TABLE IF NOT EXISTS {tables.RESULT_TYPE_TABLE_NAME} (',
        ' id INT IDENTITY(1, 1),',
        ' result_type VARCHAR NOT NULL SORTKEY UNIQUE,',
        ' PRIMARY KEY (id)',
        ')',
    ])
Expand All @@ -142,7 +117,7 @@ def get_create_access_log_table_statement() -> str:
"""Returns an SQL statement to create the table for access logs.
"""
return ''.join([
f'CREATE TABLE IF NOT EXISTS {ACCESS_LOG_TABLE_NAME} (',
f'CREATE TABLE IF NOT EXISTS {tables.ACCESS_LOG_TABLE_NAME} (',
' datetime TIMESTAMP SORTKEY NOT NULL,',
' edge_location INT NOT NULL,',
' sc_bytes BIGINT NOT NULL,',
Expand All @@ -156,35 +131,19 @@ def get_create_access_log_table_statement() -> str:
' time_taken FLOAT4 NOT NULL,',
' edge_response_result_type INT NOT NULL,',
' time_to_first_byte FLOAT4 NOT NULL,',
f' FOREIGN KEY (edge_location) REFERENCES {EDGE_LOCATION_TABLE_NAME},'
f' FOREIGN KEY (page) REFERENCES {PAGE_TABLE_NAME},'
f' FOREIGN KEY (referer) REFERENCES {REFERER_TABLE_NAME},'
f' FOREIGN KEY (user_agent) REFERENCES {USER_AGENT_TABLE_NAME},'
f' FOREIGN KEY (edge_response_result_type) REFERENCES {RESULT_TYPE_TABLE_NAME}'
f' FOREIGN KEY (edge_location) REFERENCES {tables.EDGE_LOCATION_TABLE_NAME},'
f' FOREIGN KEY (page) REFERENCES {tables.PAGE_TABLE_NAME},'
f' FOREIGN KEY (referer) REFERENCES {tables.REFERER_TABLE_NAME},'
f' FOREIGN KEY (user_agent) REFERENCES {tables.USER_AGENT_TABLE_NAME},'
f' FOREIGN KEY (edge_response_result_type) REFERENCES {tables.RESULT_TYPE_TABLE_NAME}'
')',
])


def get_grant_public_table_access_statement(table_name: str) -> str:
    """Returns an SQL statement to grant access on a given table to public.

    :param table_name: name of the table to grant access on.
    :returns: a ``GRANT`` statement giving ``SELECT``, ``INSERT``,
        ``UPDATE``, and ``DELETE`` on ``table_name`` to ``PUBLIC``.
    """
    return f'GRANT SELECT,INSERT,UPDATE,DELETE ON {table_name} TO PUBLIC'


def lambda_handler(event, _):
Expand All @@ -201,26 +160,32 @@ def lambda_handler(event, _):
Database=ADMIN_DATABASE_NAME,
Sql=get_create_database_statement(),
)
status, res = wait_for_statement(res['Id'])
status, res = data_api.wait_for_results(redshift_data, res['Id'])
if status != 'FINISHED':
if status == 'FAILED':
# ignores the error if the database already exists
if not res.get('Error', '').lower().endswith('already exists'):
# just warns if the database already exists
if res.get('Error', '').lower().endswith('already exists'):
LOGGER.warning('database already exists')
else:
raise DataWarehouseException(
f'failed to create the database: {res.get("Error")}',
)
else:
raise DataWarehouseException(
f'failed to create the database: {status or "timeout"}',
)
LOGGER.debug(
'populated database in %.3f ms',
res.get('Duration', 0) * 0.001 * 0.001, # ns → ms
)
# populates the tables
res = redshift_data.batch_execute_statement(
WorkgroupName=WORKGROUP_NAME,
SecretArn=ADMIN_SECRET_ARN,
Database=ACCESS_LOGS_DATABASE_NAME,
Sqls=get_create_tables_script(),
)
status, res = wait_for_statement(res['Id'])
status, res = data_api.wait_for_results(redshift_data, res['Id'])
if status != 'FINISHED':
if status == 'FAILED':
raise DataWarehouseException(
Expand All @@ -229,6 +194,10 @@ def lambda_handler(event, _):
raise DataWarehouseException(
f'failed to populate tables: {status or "timeout"}',
)
LOGGER.debug(
'populated tables in %.3f ms',
res.get('Duration') * 0.001 * 0.001, # ns → ms
)
return {
'statusCode': 200,
}
4 changes: 4 additions & 0 deletions cdk-ops/lib/cdk-ops-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
import { ContentsPipeline } from './contents-pipeline';
import { DataWarehouse } from './data-warehouse';
import { LatestBoto3Layer } from './latest-boto3-layer';
import { LibdatawarehouseLayer } from './libdatawarehouse-layer';

type Props = StackProps & Readonly<{
// names of the main codemonger resources.
Expand All @@ -25,11 +26,14 @@ export class CdkOpsStack extends Stack {
props.codemongerResourceNames,
);
const latestBoto3 = new LatestBoto3Layer(this, 'LatestBoto3');
const libdatawarehouse =
new LibdatawarehouseLayer(this, 'Libdatawarehouse');
const pipeline = new ContentsPipeline(this, 'ContentsPipeline', {
codemongerResources,
});
const dataWarehouse = new DataWarehouse(this, 'DevelopmentDataWarehouse', {
latestBoto3,
libdatawarehouse,
deploymentStage: 'development',
});
const developmentContentsAccessLogsETL = new AccessLogsETL(
Expand Down
14 changes: 5 additions & 9 deletions cdk-ops/lib/data-warehouse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import type { DeploymentStage } from 'cdk-common';

import { LatestBoto3Layer } from './latest-boto3-layer';
import { LibdatawarehouseLayer } from './libdatawarehouse-layer';

/** Name of the admin user. */
export const ADMIN_USER_NAME = 'dwadmin';
Expand All @@ -24,6 +25,8 @@ export const CLUSTER_SUBNET_GROUP_NAME = 'dw-cluster';
export interface Props {
/** Lambda layer containing the latest boto3. */
latestBoto3: LatestBoto3Layer;
/** Lambda layer containing libdatawarehouse. */
libdatawarehouse: LibdatawarehouseLayer;
/** Deployment stage. */
deploymentStage: DeploymentStage;
}
Expand All @@ -40,7 +43,7 @@ export class DataWarehouse extends Construct {
constructor(scope: Construct, id: string, props: Props) {
super(scope, id);

const { deploymentStage, latestBoto3 } = props;
const { deploymentStage, latestBoto3, libdatawarehouse } = props;

this.vpc = new ec2.Vpc(this, `DwVpc`, {
cidr: '192.168.0.0/16',
Expand Down Expand Up @@ -135,18 +138,11 @@ export class DataWarehouse extends Construct {
entry: path.join('lambda', 'populate-dw-database'),
index: 'index.py',
handler: 'lambda_handler',
layers: [latestBoto3.layer],
layers: [latestBoto3.layer, libdatawarehouse.layer],
environment: {
WORKGROUP_NAME: workgroup.workgroupName,
ADMIN_SECRET_ARN: this.adminSecret.secretArn,
ADMIN_DATABASE_NAME: 'dev',
ACCESS_LOGS_DATABASE_NAME: 'access_logs',
PAGE_TABLE_NAME: 'page',
REFERER_TABLE_NAME: 'referer',
EDGE_LOCATION_TABLE_NAME: 'edge_location',
USER_AGENT_TABLE_NAME: 'user_agent',
RESULT_TYPE_TABLE_NAME: 'result_type',
ACCESS_LOG_TABLE_NAME: 'access_log',
},
timeout: Duration.minutes(15),
// a Lambda function does not have to join the VPC
Expand Down

0 comments on commit 29b0b25

Please sign in to comment.