feat: Redshift supports custom schemas (#20177)
* feat: Redshift supports custom schemas

* feat(test): Update redshift batch export tests

* fix: Ignore type in test

* fix: Update mypy-baseline

* fix: Sort items being tested to allow comparison
tomasfarias authored Feb 13, 2024
1 parent 4791803 commit 92b3b10
Showing 4 changed files with 263 additions and 95 deletions.
2 changes: 0 additions & 2 deletions mypy-baseline.txt
@@ -829,8 +829,6 @@ posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error:
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "name" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "destination_data" to "acreate_batch_export" has incompatible type "object"; expected "dict[Any, Any]" [arg-type]
posthog/temporal/tests/batch_exports/test_backfill_batch_export.py:0: error: Argument "interval" to "acreate_batch_export" has incompatible type "object"; expected "str" [arg-type]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Unsupported left operand type for + ("None") [operator]
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: note: Left operand is of type "Any | None"
posthog/temporal/tests/batch_exports/test_redshift_batch_export_workflow.py:0: error: Incompatible types in assignment (expression has type "str | int", variable has type "int") [assignment]
posthog/api/test/batch_exports/conftest.py:0: error: Argument "activities" to "ThreadedWorker" has incompatible type "list[function]"; expected "Sequence[Callable[..., Any]]" [arg-type]
posthog/management/commands/test/test_create_batch_export_from_app.py:0: error: Incompatible return value type (got "dict[str, Collection[str]]", expected "dict[str, str]") [return-value]
4 changes: 2 additions & 2 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -284,8 +284,6 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
extra_query_parameters=query_parameters,
)

first_record, record_iterator = peek_first_and_rewind(record_iterator)

if inputs.batch_export_schema is None:
table_fields = [
("uuid", "VARCHAR(200)"),
@@ -302,6 +300,8 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs):
]

else:
first_record, record_iterator = peek_first_and_rewind(record_iterator)

column_names = [column for column in first_record.schema.names if column != "_inserted_at"]
record_schema = first_record.select(column_names).schema
table_fields = get_postgres_fields_from_record_schema(
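
This change defers the peek_first_and_rewind call until a custom schema is actually in use, so the default-schema path no longer needs to inspect the first record just to build the table definition. For readers unfamiliar with the helper, below is a minimal sketch of what a peek-and-rewind utility of this shape could look like; the real implementation lives in posthog/temporal/batch_exports/utils.py and may differ in detail.

import collections.abc
import typing

T = typing.TypeVar("T")


def peek_first_and_rewind(
    iterator: collections.abc.Iterator[T],
) -> tuple[T, collections.abc.Iterator[T]]:
    """Return the first item and an iterator that still yields that item first."""
    first = next(iterator)

    def rewound() -> collections.abc.Iterator[T]:
        # Re-attach the peeked item in front of the remaining items.
        yield first
        yield from iterator

    return first, rewound()
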
155 changes: 121 additions & 34 deletions posthog/temporal/batch_exports/redshift_batch_export.py
@@ -7,11 +7,12 @@
from dataclasses import dataclass

import psycopg
import pyarrow as pa
from psycopg import sql
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.service import RedshiftBatchExportInputs
from posthog.batch_exports.service import BatchExportField, RedshiftBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
CreateBatchExportRunInputs,
@@ -30,6 +31,7 @@
create_table_in_postgres,
postgres_connection,
)
from posthog.temporal.batch_exports.utils import peek_first_and_rewind
from posthog.temporal.common.logger import bind_temporal_worker_logger


@@ -88,6 +90,80 @@ async def redshift_connection(inputs) -> typing.AsyncIterator[psycopg.AsyncConne
yield connection


def redshift_default_fields() -> list[BatchExportField]:
batch_export_fields = default_fields()
batch_export_fields.append(
{
"expression": "nullIf(JSONExtractString(properties, '$ip'), '')",
"alias": "ip",
}
)
# Fields kept or removed for backwards compatibility with legacy apps schema.
batch_export_fields.append({"expression": "''", "alias": "elements"})
batch_export_fields.append({"expression": "''", "alias": "site_url"})
batch_export_fields.pop(batch_export_fields.index({"expression": "created_at", "alias": "created_at"}))
# Team ID is (for historical reasons) an INTEGER (4 bytes) in PostgreSQL, but in ClickHouse it is stored as Int64.
# We can't encode it as an Int64, as that would include 4 extra bytes and PostgreSQL would reject the data with an
# 'incorrect binary data format' error on the column, so we cast it to Int32.
team_id_field = batch_export_fields.pop(
batch_export_fields.index(BatchExportField(expression="team_id", alias="team_id"))
)
team_id_field["expression"] = "toInt32(team_id)"
batch_export_fields.append(team_id_field)
return batch_export_fields
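
A usage sketch for the helper above (assuming, as its use here suggests, that BatchExportField is a simple mapping of a ClickHouse expression to an output alias; import paths are taken from this file's imports):

from posthog.batch_exports.service import BatchExportField
from posthog.temporal.batch_exports.redshift_batch_export import redshift_default_fields

fields = redshift_default_fields()

# created_at is dropped for backwards compatibility with the legacy apps schema.
assert all(field["alias"] != "created_at" for field in fields)

# team_id is re-appended last, cast to Int32 so the binary encoding matches the INTEGER column.
assert fields[-1] == BatchExportField(expression="toInt32(team_id)", alias="team_id")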


RedshiftField = tuple[str, str]


def get_redshift_fields_from_record_schema(
record_schema: pa.Schema, known_super_columns: list[str]
) -> list[RedshiftField]:
"""Generate a list of supported PostgreSQL fields from PyArrow schema.
This function is used to map custom schemas to Redshift-supported types. Some loss of precision is
expected.
"""
pg_schema: list[RedshiftField] = []

for name in record_schema.names:
pa_field = record_schema.field(name)

if pa.types.is_string(pa_field.type):
if pa_field.name in known_super_columns:
pg_type = "SUPER"
else:
pg_type = "TEXT"

elif pa.types.is_signed_integer(pa_field.type):
if pa.types.is_int64(pa_field.type):
pg_type = "BIGINT"
else:
pg_type = "INTEGER"

elif pa.types.is_floating(pa_field.type):
if pa.types.is_float64(pa_field.type):
pg_type = "DOUBLE PRECISION"
else:
pg_type = "REAL"

elif pa.types.is_boolean(pa_field.type):
pg_type = "BOOLEAN"

elif pa.types.is_timestamp(pa_field.type):
if pa_field.type.tz is not None:
pg_type = "TIMESTAMPTZ"
else:
pg_type = "TIMESTAMP"

else:
raise TypeError(f"Unsupported type: {pa_field.type}")

pg_schema.append((name, pg_type))

return pg_schema
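
As a usage illustration for the mapping above (the schema below is made up), a PyArrow schema is turned into (name, Redshift type) pairs, with known JSON columns promoted to SUPER:

import pyarrow as pa

schema = pa.schema(
    [
        pa.field("event", pa.string()),
        pa.field("properties", pa.string()),
        pa.field("team_id", pa.int32()),
        pa.field("timestamp", pa.timestamp("us", tz="UTC")),
    ]
)

fields = get_redshift_fields_from_record_schema(schema, known_super_columns=["properties"])
# [('event', 'TEXT'), ('properties', 'SUPER'), ('team_id', 'INTEGER'), ('timestamp', 'TIMESTAMPTZ')]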


async def insert_records_to_redshift(
records: collections.abc.Iterator[dict[str, typing.Any]],
redshift_connection: psycopg.AsyncConnection,
@@ -233,10 +309,13 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):

logger.info("BatchExporting %s rows", count)

fields = default_fields()
# Fields kept for backwards compatibility with legacy apps schema.
fields.append({"expression": "nullIf(JSONExtractString(properties, '$ip'), '')", "alias": "ip"})
fields.append({"expression": "''", "alias": "site_url"})
if inputs.batch_export_schema is None:
fields = redshift_default_fields()
query_parameters = None

else:
fields = inputs.batch_export_schema["fields"]
query_parameters = inputs.batch_export_schema["values"]

record_iterator = iter_records(
client=client,
@@ -246,52 +325,59 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
fields=fields,
extra_query_parameters=query_parameters,
)

known_super_columns = ["properties", "set", "set_once", "person_properties"]

if inputs.properties_data_type != "varchar":
properties_type = "SUPER"
else:
properties_type = "VARCHAR(65535)"

if inputs.batch_export_schema is None:
table_fields = [
("uuid", "VARCHAR(200)"),
("event", "VARCHAR(200)"),
("properties", properties_type),
("elements", "VARCHAR(65535)"),
("set", properties_type),
("set_once", properties_type),
("distinct_id", "VARCHAR(200)"),
("team_id", "INTEGER"),
("ip", "VARCHAR(200)"),
("site_url", "VARCHAR(200)"),
("timestamp", "TIMESTAMP WITH TIME ZONE"),
]
else:
first_record, record_iterator = peek_first_and_rewind(record_iterator)

column_names = [column for column in first_record.schema.names if column != "_inserted_at"]
record_schema = first_record.select(column_names).schema
table_fields = get_redshift_fields_from_record_schema(
record_schema, known_super_columns=known_super_columns
)

async with redshift_connection(inputs) as connection:
await create_table_in_postgres(
connection,
schema=inputs.schema,
table_name=inputs.table_name,
fields=[
("uuid", "VARCHAR(200)"),
("event", "VARCHAR(200)"),
("properties", properties_type),
("elements", "VARCHAR(65535)"),
("set", properties_type),
("set_once", properties_type),
("distinct_id", "VARCHAR(200)"),
("team_id", "INTEGER"),
("ip", "VARCHAR(200)"),
("site_url", "VARCHAR(200)"),
("timestamp", "TIMESTAMP WITH TIME ZONE"),
],
fields=table_fields,
)

schema_columns = set((field[0] for field in table_fields))

def map_to_record(row: dict) -> dict:
"""Map row to a record to insert to Redshift."""
record = {
"distinct_id": row["distinct_id"],
"elements": "",
"event": row["event"],
"ip": row["ip"],
"properties": json.loads(row["properties"]) if row["properties"] is not None else None,
"set": json.loads(row["set"]) if row["set"] is not None else None,
"set_once": json.loads(row["set_once"]) if row["set_once"] is not None else None,
"site_url": row["site_url"],
"team_id": row["team_id"],
"timestamp": row["timestamp"],
"uuid": row["uuid"],
}

for column in ("properties", "set", "set_once"):
record = {k: v for k, v in row.items() if k in schema_columns}

for column in known_super_columns:
if record.get(column, None) is not None:
record[column] = json.dumps(remove_escaped_whitespace_recursive(record[column]), ensure_ascii=False)
# TODO: We should be able to save a json.loads here.
record[column] = json.dumps(
remove_escaped_whitespace_recursive(json.loads(record[column])), ensure_ascii=False
)

return record
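
For illustration, a standalone sketch of the new mapping (sample row and values are hypothetical; the remove_escaped_whitespace_recursive step is omitted): rows are filtered down to the destination table's columns, and known SUPER columns are re-serialized as JSON.

import json


def map_to_record_sketch(row: dict, schema_columns: set[str], known_super_columns: list[str]) -> dict:
    """Keep only destination columns and re-encode SUPER columns as JSON strings."""
    record = {key: value for key, value in row.items() if key in schema_columns}

    for column in known_super_columns:
        if record.get(column) is not None:
            # The real code also runs remove_escaped_whitespace_recursive on the parsed value.
            record[column] = json.dumps(json.loads(record[column]), ensure_ascii=False)

    return record


row = {"event": "pageview", "properties": '{"$browser": "Firefox"}', "_inserted_at": "2024-02-13"}
print(map_to_record_sketch(row, {"event", "properties"}, ["properties"]))
# {'event': 'pageview', 'properties': '{"$browser": "Firefox"}'}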

@@ -364,6 +450,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):
exclude_events=inputs.exclude_events,
include_events=inputs.include_events,
properties_data_type=inputs.properties_data_type,
batch_export_schema=inputs.batch_export_schema,
)

await execute_batch_export_insert_activity(