diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index 458b4cb1a6f1b7..1a2cf640554528 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -1,3 +1,4 @@ +import asyncio import collections.abc import contextlib import csv @@ -387,7 +388,10 @@ async def copy_tsv_to_postgres( fields=sql.SQL(",").join(sql.Identifier(column) for column in schema_columns), ) ) as copy: - while data := tsv_file.read(): + while data := await asyncio.to_thread(tsv_file.read): + # \u0000 cannot be present in PostgreSQL's jsonb type, and will cause an error. + # See: https://www.postgresql.org/docs/17/datatype-json.html + data = data.replace(b"\\u0000", b"") await copy.write(data) diff --git a/posthog/temporal/tests/batch_exports/conftest.py b/posthog/temporal/tests/batch_exports/conftest.py index 7044d8fe968680..8acd322390f975 100644 --- a/posthog/temporal/tests/batch_exports/conftest.py +++ b/posthog/temporal/tests/batch_exports/conftest.py @@ -239,7 +239,7 @@ async def generate_test_data( count_outside_range=10, count_other_team=10, duplicate=True, - properties={"$browser": "Chrome", "$os": "Mac OS X"}, + properties={"$browser": "Chrome", "$os": "Mac OS X", "unicode": "\u0000"}, person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, table=table, ) diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 56e4c914640329..ec4a345fba2b6b 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -136,6 +136,11 @@ async def assert_clickhouse_records_in_postgres( # bq_ingested_timestamp cannot be compared as it comes from an unstable function. continue + if isinstance(v, str): + v = v.replace("\\u0000", "") + elif isinstance(v, bytes): + v = v.replace(b"\\u0000", b"") + if k in {"properties", "set", "set_once", "person_properties", "elements"} and v is not None: expected_record[k] = json.loads(v) elif isinstance(v, dt.datetime):