From bb5293bae1d4c8903b31d064719691c211de7f2f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Thu, 16 Nov 2023 14:10:32 +0100
Subject: [PATCH] fix: Compatibility with Redshift

---
 .../workflows/redshift_batch_export.py        | 39 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git a/posthog/temporal/workflows/redshift_batch_export.py b/posthog/temporal/workflows/redshift_batch_export.py
index a853c91e79e1f..7b008d4f5b674 100644
--- a/posthog/temporal/workflows/redshift_batch_export.py
+++ b/posthog/temporal/workflows/redshift_batch_export.py
@@ -3,6 +3,7 @@
 import datetime as dt
 import itertools
 import json
+import os
 import typing
 from dataclasses import dataclass
 
@@ -32,6 +33,42 @@
 )
 
 
+@contextlib.asynccontextmanager
+async def redshift_connection(inputs) -> typing.AsyncIterator[psycopg.AsyncConnection]:
+    """Manage a Redshift connection.
+
+    This just yields a Postgres connection but we adjust a couple of things required for
+    psycopg to work with Redshift:
+    1. Set PGCLIENTENCODING to utf-8 as Redshift reports back UNICODE.
+    2. Set prepare_threshold to None on the connection as psycopg attempts to run DEALLOCATE ALL otherwise
+    which is not supported on Redshift.
+
+    PGCLIENTENCODING is a process-wide environment variable: it is overridden while the
+    connection is open and restored to its previous value (or removed) on exit.
+
+    Args:
+        inputs: Passed through unchanged to postgres_connection.
+
+    Yields:
+        The connection yielded by postgres_connection, with prepare_threshold set to None.
+    """
+    old_value = os.environ.get("PGCLIENTENCODING")
+    os.environ["PGCLIENTENCODING"] = "utf-8"
+
+    try:
+        async with postgres_connection(inputs) as connection:
+            connection.prepare_threshold = None
+            yield connection
+
+    finally:
+        if old_value is None:
+            # pop() rather than del: if the variable is already gone, cleanup must not
+            # raise KeyError and mask an exception propagating from the body.
+            os.environ.pop("PGCLIENTENCODING", None)
+        else:
+            os.environ["PGCLIENTENCODING"] = old_value
+
+
 async def insert_records_to_redshift(
     records: collections.abc.Iterator[dict[str, typing.Any]],
     redshift_connection: psycopg.AsyncConnection,
@@ -186,7 +223,7 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs):
     )
     properties_type = "VARCHAR(65535)" if inputs.properties_data_type == "varchar" else "SUPER"
 
-    async with postgres_connection(inputs) as connection:
+    async with redshift_connection(inputs) as connection:
         await create_table_in_postgres(
             connection,
             schema=inputs.schema,