fix: Address typing issue
tomasfarias committed Oct 30, 2023
1 parent ffc8b9f commit 4cae4ae
Showing 2 changed files with 41 additions and 17 deletions.
@@ -1,13 +1,13 @@
-from uuid import uuid4
-from random import randint
-import json
import datetime as dt
+import json
import os
+from random import randint
+from uuid import uuid4

import psycopg2
-from psycopg2 import sql
import pytest
from django.conf import settings
+from psycopg2 import sql

from posthog.temporal.tests.batch_exports.base import (
EventValues,
@@ -33,7 +33,7 @@


def assert_events_in_redshift(connection, schema, table_name, events):
"""Assert provided events written to a given Postgres table."""
"""Assert provided events written to a given Redshift table."""

inserted_events = []

@@ -78,7 +78,10 @@ def assert_events_in_redshift(connection, schema, table_name, events):

@pytest.fixture
def redshift_config():
"""Fixture to provide a default configuration for Redshift batch exports."""
"""Fixture to provide a default configuration for Redshift batch exports.
Reads required env vars to construct configuration.
"""
user = os.environ["REDSHIFT_USER"]
password = os.environ["REDSHIFT_PASSWORD"]
host = os.environ["REDSHIFT_HOST"]
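
A minimal sketch (not part of this commit) of a configuration fixture like the one above. The REDSHIFT_USER, REDSHIFT_PASSWORD and REDSHIFT_HOST environment variables appear in the diff; the remaining keys, variable names and defaults are assumptions for illustration.

import os

import pytest


@pytest.fixture
def redshift_config():
    """Read required environment variables into a Redshift connection configuration."""
    return {
        "user": os.environ["REDSHIFT_USER"],
        "password": os.environ["REDSHIFT_PASSWORD"],
        "host": os.environ["REDSHIFT_HOST"],
        # Keys and defaults below are assumed for this sketch, not taken from the commit.
        "port": os.environ.get("REDSHIFT_PORT", "5439"),
        "database": os.environ.get("REDSHIFT_DATABASE", "exports_test_db"),
        "schema": os.environ.get("REDSHIFT_SCHEMA", "exports_test_schema"),
    }
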
@@ -96,7 +99,16 @@ def redshift_config():

@pytest.fixture
def setup_test_db(redshift_config):
"""Fixture to manage a database for Redshift exports."""
"""Fixture to manage a database for Redshift export testing.
Managing a test database involves the following steps:
1. Creating a test database.
2. Initializing a connection to that database.
3. Creating a test schema.
4. Yielding the connection to be used in tests.
5. After tests, drop the test schema and any tables in it.
6. Drop the test database.
"""
connection = psycopg2.connect(
user=redshift_config["user"],
password=redshift_config["password"],
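
A minimal sketch (not part of this commit) of the create/yield/drop lifecycle that the setup_test_db docstring describes, reusing the redshift_config fixture sketched above. The bootstrap database name and the exact SQL are assumptions; CREATE DATABASE and DROP DATABASE run on a separate autocommit connection because they cannot run inside a transaction.

import psycopg2
import pytest
from psycopg2 import sql


@pytest.fixture
def setup_test_db(redshift_config):
    """Create a test database and schema, yield a connection, then drop both."""
    # Autocommit connection to an existing database (name assumed) for CREATE/DROP DATABASE.
    admin = psycopg2.connect(
        user=redshift_config["user"],
        password=redshift_config["password"],
        host=redshift_config["host"],
        port=redshift_config["port"],
        dbname="dev",
    )
    admin.set_session(autocommit=True)
    with admin.cursor() as cursor:
        cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(redshift_config["database"])))

    # Connection used by the tests, pointed at the freshly created database.
    connection = psycopg2.connect(
        user=redshift_config["user"],
        password=redshift_config["password"],
        host=redshift_config["host"],
        port=redshift_config["port"],
        dbname=redshift_config["database"],
    )
    with connection.cursor() as cursor:
        cursor.execute(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(redshift_config["schema"])))
    connection.commit()

    yield connection

    # Teardown: drop the schema (and any tables in it), then the database itself.
    with connection.cursor() as cursor:
        cursor.execute(sql.SQL("DROP SCHEMA {} CASCADE").format(sql.Identifier(redshift_config["schema"])))
    connection.commit()
    connection.close()

    with admin.cursor() as cursor:
        cursor.execute(sql.SQL("DROP DATABASE {}").format(sql.Identifier(redshift_config["database"])))
    admin.close()
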
32 changes: 22 additions & 10 deletions posthog/temporal/workflows/redshift_batch_export.py
@@ -1,3 +1,4 @@
+import collections.abc
import datetime as dt
import json
import typing
@@ -31,7 +32,7 @@


def insert_records_to_redshift(
-records: list[dict[str, typing.Any]],
+records: collections.abc.Iterator[dict[str, typing.Any]],
redshift_connection: psycopg2.extensions.connection,
schema: str,
table: str,
@@ -53,7 +54,8 @@ def insert_records_to_redshift(
schema: The schema that contains the table where to insert the record.
table: The name of the table where to insert the record.
batch_size: Number of records to insert in batch. Setting this too high could
-make us go OOM or exceed Redshift's SQL statement size limit (16MB).
+make us go OOM or exceed Redshift's SQL statement size limit (16MB). Setting this too low
+can significantly affect performance due to Redshift's poor handling of INSERTs.
"""
batch = [next(records)]

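A self-contained sketch (not part of this commit) of the batching pattern the new Iterator signature implies: consume records lazily and issue one multi-row INSERT per batch. The helper name and the use of psycopg2.extras.execute_values are assumptions; only the start of the committed implementation is visible in this hunk.

import collections.abc
import typing

import psycopg2
import psycopg2.extras
from psycopg2 import sql


def insert_records_in_batches(
    records: collections.abc.Iterator[dict[str, typing.Any]],
    connection: psycopg2.extensions.connection,
    schema: str,
    table: str,
    batch_size: int = 100,
) -> None:
    """Insert records from an iterator into schema.table in batches of batch_size rows."""
    first = next(records, None)
    if first is None:
        return  # Nothing to insert.

    columns = list(first.keys())
    query = sql.SQL("INSERT INTO {}.{} ({}) VALUES %s").format(
        sql.Identifier(schema),
        sql.Identifier(table),
        sql.SQL(", ").join(sql.Identifier(column) for column in columns),
    )

    def flush(cursor, batch):
        # One multi-row INSERT per batch: large enough to amortize Redshift's poor
        # per-INSERT performance, small enough to stay under the 16MB statement limit.
        values = [tuple(record[column] for column in columns) for record in batch]
        psycopg2.extras.execute_values(cursor, query, values)

    batch = [first]
    with connection.cursor() as cursor:
        for record in records:
            batch.append(record)
            if len(batch) >= batch_size:
                flush(cursor, batch)
                batch = []
        if batch:
            flush(cursor, batch)
    connection.commit()
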
@@ -93,7 +95,21 @@ class RedshiftInsertInputs(PostgresInsertInputs):

@activity.defn
async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
"""Activity streams data from ClickHouse to Redshift."""
"""Activity to insert data from ClickHouse to Redshift.
This activity executes the following steps:
1. Check if anything is to be exported.
2. Create destination table if not present.
3. Query rows to export.
4. Insert rows into Redshift.
Args:
inputs: The dataclass holding inputs for this activity. The inputs
include: connection configuration (e.g. host, user, port), batch export
query parameters (e.g. team_id, data_interval_start, include_events), and
the Redshift-specific properties_data_type to indicate the type of JSON-like
fields.
"""
logger = get_batch_exports_logger(inputs=inputs)
logger.info(
"Running Postgres export batch %s - %s",
@@ -169,9 +185,10 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
]
json_columns = ("properties", "set", "set_once")

-def map_to_record(result: dict) -> dict:
+def map_to_record(row: dict) -> dict:
"""Map row to a record to insert to Redshift."""
return {
-key: json.dumps(result[key]) if key in json_columns and result[key] is not None else result[key]
+key: json.dumps(row[key]) if key in json_columns and row[key] is not None else row[key]
for key in schema_columns
}

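A small usage example (not part of this commit) of the renamed map_to_record helper above: JSON-like columns are serialized with json.dumps, everything else passes through unchanged. The schema_columns list is shortened here for illustration.

import json

schema_columns = ["uuid", "event", "properties", "set", "set_once"]  # shortened for the example
json_columns = ("properties", "set", "set_once")


def map_to_record(row: dict) -> dict:
    """Map row to a record to insert to Redshift."""
    return {
        key: json.dumps(row[key]) if key in json_columns and row[key] is not None else row[key]
        for key in schema_columns
    }


row = {"uuid": "a1b2c3", "event": "$pageview", "properties": {"$current_url": "/"}, "set": None, "set_once": None}
print(map_to_record(row))
# {'uuid': 'a1b2c3', 'event': '$pageview', 'properties': '{"$current_url": "/"}', 'set': None, 'set_once': None}
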
@@ -189,11 +206,6 @@ class RedshiftBatchExportWorkflow(PostHogWorkflow):
Schedule. When ran by a schedule, `data_interval_end` should be set to
`None` so that we will fetch the end of the interval from the Temporal
search attribute `TemporalScheduledStartTime`.
-This Workflow executes the same insert activity as the PostgresBatchExportWorkflow,
-as Postgres and AWS Redshift are fairly compatible. The only differences are:
-* Postgres JSONB fields are VARCHAR in Redshift.
-* Non retryable errors can be different between both.
"""

@staticmethod
