From f5ccc6bc4c79d798a7fced4bfbda0ffa090e23b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 27 Oct 2023 18:54:41 +0200 Subject: [PATCH] fix(postgres-batch-exports): Account for schema being empty string --- posthog/temporal/workflows/postgres_batch_export.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index 1844742880876..3c19c8eae8998 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -57,7 +57,8 @@ def copy_tsv_to_postgres(tsv_file, postgres_connection, schema: str, table_name: tsv_file.seek(0) with postgres_connection.cursor() as cursor: - cursor.execute(sql.SQL("SET search_path TO {schema}").format(schema=sql.Identifier(schema))) + if schema: + cursor.execute(sql.SQL("SET search_path TO {schema}").format(schema=sql.Identifier(schema))) cursor.copy_from( tsv_file, table_name, @@ -128,6 +129,11 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): ) with postgres_connection(inputs) as connection: with connection.cursor() as cursor: + if inputs.schema: + table_identifier = sql.Identifier(inputs.schema, inputs.table_name) + else: + table_identifier = sql.Identifier(inputs.table_name) + result = cursor.execute( sql.SQL( """ @@ -145,7 +151,7 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs): "timestamp" TIMESTAMP WITH TIME ZONE ) """ - ).format(sql.Identifier(inputs.schema, inputs.table_name)) + ).format(table_identifier) ) schema_columns = [