Skip to content

Commit

Permalink
fix: Compatibility with Redshift (#18677)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Nov 16, 2023
1 parent b5cad0a commit af6b23c
Showing 1 changed file with 27 additions and 1 deletion.
28 changes: 27 additions & 1 deletion posthog/temporal/workflows/redshift_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import datetime as dt
import itertools
import json
import os
import typing
from dataclasses import dataclass

Expand Down Expand Up @@ -32,6 +33,31 @@
)


@contextlib.asynccontextmanager
async def redshift_connection(inputs) -> typing.AsyncIterator[psycopg.AsyncConnection]:
"""Manage a Redshift connection.
This just yields a Postgres connection but we adjust a couple of things required for
psycopg to work with Redshift:
1. Set PGCLIENTENCODING to utf-8 as Redshift reports back UNICODE.
2. Set prepare_threshold to None on the connection as psycopg attempts to run DEALLOCATE ALL otherwise
which is not supported on Redshift.
"""
old_value = os.environ.get("PGCLIENTENCODING", None)
os.environ["PGCLIENTENCODING"] = "utf-8"

try:
async with postgres_connection(inputs) as connection:
connection.prepare_threshold = None
yield connection

finally:
if old_value is None:
del os.environ["PGCLIENTENCODING"]
else:
os.environ["PGCLIENTENCODING"] = old_value


async def insert_records_to_redshift(
records: collections.abc.Iterator[dict[str, typing.Any]],
redshift_connection: psycopg.AsyncConnection,
Expand Down Expand Up @@ -186,7 +212,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
)
properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER"

async with postgres_connection(inputs) as connection:
async with redshift_connection(inputs) as connection:
await create_table_in_postgres(
connection,
schema=inputs.schema,
Expand Down

0 comments on commit af6b23c

Please sign in to comment.