From 4cae4ae91bfee4109fc1f6f75e429bbcfaae17b9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Mon, 30 Oct 2023 21:00:52 +0100
Subject: [PATCH] fix: Address typing issue

---
 .../test_redshift_batch_export_workflow.py | 26 +++++++++++----
 .../workflows/redshift_batch_export.py     | 32 +++++++++++++------
 2 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
index 8414d79317689d..90ddf571572e9d 100644
--- a/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py
@@ -1,13 +1,13 @@
-from uuid import uuid4
-from random import randint
-import json
 import datetime as dt
+import json
 import os
+from random import randint
+from uuid import uuid4
 
 import psycopg2
-from psycopg2 import sql
 import pytest
 from django.conf import settings
+from psycopg2 import sql
 
 from posthog.temporal.tests.batch_exports.base import (
     EventValues,
@@ -33,7 +33,7 @@
 
 
 def assert_events_in_redshift(connection, schema, table_name, events):
-    """Assert provided events written to a given Postgres table."""
+    """Assert provided events written to a given Redshift table."""
 
     inserted_events = []
 
@@ -78,7 +78,10 @@ def assert_events_in_redshift(connection, schema, table_name, events):
 
 @pytest.fixture
 def redshift_config():
-    """Fixture to provide a default configuration for Redshift batch exports."""
+    """Fixture to provide a default configuration for Redshift batch exports.
+
+    Reads required env vars to construct configuration.
+    """
     user = os.environ["REDSHIFT_USER"]
     password = os.environ["REDSHIFT_PASSWORD"]
     host = os.environ["REDSHIFT_HOST"]
@@ -96,7 +99,16 @@ def redshift_config():
 
 @pytest.fixture
 def setup_test_db(redshift_config):
-    """Fixture to manage a database for Redshift exports."""
+    """Fixture to manage a database for Redshift export testing.
+
+    Managing a test database involves the following steps:
+    1. Creating a test database.
+    2. Initializing a connection to that database.
+    3. Creating a test schema.
+    4. Yielding the connection to be used in tests.
+    5. After tests, drop the test schema and any tables in it.
+    6. Drop the test database.
+    """
     connection = psycopg2.connect(
         user=redshift_config["user"],
         password=redshift_config["password"],
diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py
index bd6537930a5242..36721fd9858223 100644
--- a/posthog/temporal/workflows/redshift_batch_export.py
+++ b/posthog/temporal/workflows/redshift_batch_export.py
@@ -1,3 +1,4 @@
+import collections.abc
 import datetime as dt
 import json
 import typing
@@ -31,7 +32,7 @@
 
 
 def insert_records_to_redshift(
-    records: list[dict[str, typing.Any]],
+    records: collections.abc.Iterator[dict[str, typing.Any]],
     redshift_connection: psycopg2.extensions.connection,
     schema: str,
     table: str,
@@ -53,7 +54,8 @@ def insert_records_to_redshift(
         schema: The schema that contains the table where to insert the record.
         table: The name of the table where to insert the record.
         batch_size: Number of records to insert in batch. Setting this too high could
-            make us go OOM or exceed Redshift's SQL statement size limit (16MB).
+            make us go OOM or exceed Redshift's SQL statement size limit (16MB). Setting this too low
+            can significantly affect performance due to Redshift's poor handling of INSERTs.
     """
 
     batch = [next(records)]
@@ -93,7 +95,21 @@ class RedshiftInsertInputs(PostgresInsertInputs):
 
 @activity.defn
 async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
-    """Activity streams data from ClickHouse to Redshift."""
+    """Activity to insert data from ClickHouse to Redshift.
+
+    This activity executes the following steps:
+    1. Check if anything is to be exported.
+    2. Create destination table if not present.
+    3. Query rows to export.
+    4. Insert rows into Redshift.
+
+    Args:
+        inputs: The dataclass holding inputs for this activity. The inputs
+            include: connection configuration (e.g. host, user, port), batch export
+            query parameters (e.g. team_id, data_interval_start, include_events), and
+            the Redshift-specific properties_data_type to indicate the type of JSON-like
+            fields.
+    """
     logger = get_batch_exports_logger(inputs=inputs)
     logger.info(
         "Running Postgres export batch %s - %s",
@@ -169,9 +185,10 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
     ]
     json_columns = ("properties", "set", "set_once")
 
-    def map_to_record(result: dict) -> dict:
+    def map_to_record(row: dict) -> dict:
+        """Map row to a record to insert to Redshift."""
         return {
-            key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
+            key: json.dumps(row[key]) if key in json_columns and row[key] is not None else row[key]
             for key in schema_columns
         }
 
@@ -189,11 +206,6 @@ class RedshiftBatchExportWorkflow(PostHogWorkflow):
     Schedule. When ran by a schedule, `data_interval_end` should be set to `None` so that
     we will fetch the end of the interval from the Temporal search attribute
     `TemporalScheduledStartTime`.
-
-    This Workflow executes the same insert activity as the PostgresBatchExportWorkflow,
-    as Postgres and AWS Redshift are fairly compatible. The only differences are:
-    * Postgres JSONB fields are VARCHAR in Redshift.
-    * Non retryable errors can be different between both.
     """
 
     @staticmethod