Skip to content

Commit

Permalink
fix: Redshift tests (#18679)
Browse files Browse the repository at this point in the history
* fix: Redshift tests

* fix: Update psycopg codecs instead of setting env var

* fix: Actually fix tests
  • Loading branch information
tomasfarias authored Nov 16, 2023
1 parent e83de60 commit b81cdeb
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def redshift_config():
return {
"user": user,
"password": password,
"database": "dev",
"database": "posthog_batch_exports_test",
"schema": "exports_test_schema",
"host": host,
"port": int(port),
Expand All @@ -124,7 +124,10 @@ def redshift_config():
@pytest.fixture
def postgres_config(redshift_config):
"""We shadow this name so that setup_postgres_test_db works with Redshift."""
return redshift_config
psycopg._encodings._py_codecs["UNICODE"] = "utf-8"
psycopg._encodings.py_codecs.update((k.encode(), v) for k, v in psycopg._encodings._py_codecs.items())

yield redshift_config


@pytest_asyncio.fixture
Expand All @@ -137,6 +140,7 @@ async def psycopg_connection(redshift_config, setup_postgres_test_db):
host=redshift_config["host"],
port=redshift_config["port"],
)
connection.prepare_threshold = None

yield connection

Expand Down
20 changes: 6 additions & 14 deletions posthog/temporal/workflows/redshift_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime as dt
import itertools
import json
import os
import typing
from dataclasses import dataclass

Expand Down Expand Up @@ -39,23 +38,16 @@ async def redshift_connection(inputs) -> typing.AsyncIterator[psycopg.AsyncConne
This just yields a Postgres connection but we adjust a couple of things required for
psycopg to work with Redshift:
1. Set PGCLIENTENCODING to utf-8 as Redshift reports back UNICODE.
1. Set UNICODE encoding to utf-8 as Redshift reports back UNICODE.
2. Set prepare_threshold to None on the connection as psycopg attempts to run DEALLOCATE ALL otherwise
which is not supported on Redshift.
"""
old_value = os.environ.get("PGCLIENTENCODING", None)
os.environ["PGCLIENTENCODING"] = "utf-8"
psycopg._encodings._py_codecs["UNICODE"] = "utf-8"
psycopg._encodings.py_codecs.update((k.encode(), v) for k, v in psycopg._encodings._py_codecs.items())

try:
async with postgres_connection(inputs) as connection:
connection.prepare_threshold = None
yield connection

finally:
if old_value is None:
del os.environ["PGCLIENTENCODING"]
else:
os.environ["PGCLIENTENCODING"] = old_value
async with postgres_connection(inputs) as connection:
connection.prepare_threshold = None
yield connection


async def insert_records_to_redshift(
Expand Down

0 comments on commit b81cdeb

Please sign in to comment.