From e2c83e3209bc7b6e747bbd3e330bd5f648037240 Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Thu, 25 Jul 2024 22:28:09 +0100
Subject: [PATCH] chore(data-warehouse): Dont sync our schema table interval column (#23988)

---
 .../pipelines/sql_database/__init__.py      | 16 ++++++++++++++--
 .../workflow_activities/import_data.py      |  2 ++
 .../tests/batch_exports/test_import_data.py |  3 +++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
index 6f9ec4c1162b7..a3524f378e1a7 100644
--- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py
@@ -46,6 +46,7 @@ def sql_source_for_type(
     sslmode: str,
     schema: str,
     table_names: list[str],
+    team_id: Optional[int] = None,
     incremental_field: Optional[str] = None,
     incremental_field_type: Optional[IncrementalFieldType] = None,
 ) -> DltSource:
@@ -71,7 +72,9 @@ def sql_source_for_type(
     else:
         raise Exception("Unsupported source_type")
 
-    db_source = sql_database(credentials, schema=schema, table_names=table_names, incremental=incremental)
+    db_source = sql_database(
+        credentials, schema=schema, table_names=table_names, incremental=incremental, team_id=team_id
+    )
 
     return db_source
 
@@ -110,6 +113,14 @@ def snowflake_source(
     return db_source
 
 
+# Temp while DLT doesn't support `interval` columns
+def remove_interval(doc: dict, team_id: Optional[int]) -> dict:
+    if team_id == 1 or team_id == 2:
+        if "sync_frequency_interval" in doc:
+            del doc["sync_frequency_interval"]
+    return doc
+
+
 @dlt.source(max_table_nesting=0)
 def sql_database(
     credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
     schema: Optional[str] = dlt.config.value,
     metadata: Optional[MetaData] = None,
     table_names: Optional[List[str]] = dlt.config.value,  # noqa: UP006
     incremental: Optional[dlt.sources.incremental] = None,
+    team_id: Optional[int] = None,
 ) -> Iterable[DltResource]:
     """
     A DLT source which loads data from an SQL database using SQLAlchemy.
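For context on the hunks above: `remove_interval` is a plain row-level map, gated to teams 1 and 2 (presumably a limited initial rollout while DLT lacks `interval` support). A quick standalone sketch of its behaviour follows; the function body is copied from the hunk, while the sample row and its values are invented for illustration.

# Illustration only, not part of the patch: the map added above, exercised
# standalone on invented rows.
from typing import Optional


def remove_interval(doc: dict, team_id: Optional[int]) -> dict:
    # Drop the `interval`-typed column, but only for teams 1 and 2.
    if team_id == 1 or team_id == 2:
        if "sync_frequency_interval" in doc:
            del doc["sync_frequency_interval"]
    return doc


row = {"id": 42, "sync_frequency_interval": "6:00:00"}  # hypothetical row
assert remove_interval(dict(row), team_id=1) == {"id": 42}  # column dropped
assert remove_interval(dict(row), team_id=99) == row  # other teams untouched
assert remove_interval(dict(row), team_id=None) == row  # default: no-op

Note that `return doc` sits at function level, so rows for non-gated teams pass through unchanged rather than becoming None. The next hunk wires this map into the resource.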
@@ -161,7 +173,7 @@ def sql_database(
             spec=SqlDatabaseTableConfiguration,
             table_format="delta",
             columns=get_column_hints(engine, schema or "", table.name),
-        )(
+        ).add_map(lambda x: remove_interval(x, team_id))(
             engine=engine,
             table=table,
             incremental=incremental,
diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py
index 103408db92bd4..db6e0aedd4bcf 100644
--- a/posthog/temporal/data_imports/workflow_activities/import_data.py
+++ b/posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -153,6 +153,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             incremental_field_type=schema.sync_type_config.get("incremental_field_type")
             if schema.is_incremental
             else None,
+            team_id=inputs.team_id,
         )
 
         return await _run(
@@ -178,6 +179,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             incremental_field_type=schema.sync_type_config.get("incremental_field_type")
             if schema.is_incremental
             else None,
+            team_id=inputs.team_id,
         )
 
         return await _run(
diff --git a/posthog/temporal/tests/batch_exports/test_import_data.py b/posthog/temporal/tests/batch_exports/test_import_data.py
index 935781c3bdf34..58b2e66e9f26b 100644
--- a/posthog/temporal/tests/batch_exports/test_import_data.py
+++ b/posthog/temporal/tests/batch_exports/test_import_data.py
@@ -87,6 +87,7 @@ async def test_postgres_source_without_ssh_tunnel(activity_environment, team, **
         table_names=["table_1"],
         incremental_field=None,
         incremental_field_type=None,
+        team_id=team.id,
     )
 
 
@@ -125,6 +126,7 @@ async def test_postgres_source_with_ssh_tunnel_disabled(activity_environment, te
         table_names=["table_1"],
         incremental_field=None,
         incremental_field_type=None,
+        team_id=team.id,
     )
 
 
@@ -180,4 +182,5 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
         table_names=["table_1"],
         incremental_field=None,
         incremental_field_type=None,
+        team_id=team.id,
     )
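The key wiring is the `.add_map(...)` call in the first hunk above: dlt's `DltResource.add_map()` registers a per-item transform that runs as items stream through the pipeline, before normalisation and load. Below is a minimal, self-contained sketch of that pattern, assuming a standalone `dlt` install; the resource name and rows are hypothetical, `remove_interval` is restated from the patch, and only `add_map()` itself is dlt's real API.

# Minimal sketch of the add_map pattern the patch relies on.
from typing import Optional

import dlt


def remove_interval(doc: dict, team_id: Optional[int]) -> dict:
    # Restated from the patch: drop the interval column for teams 1 and 2.
    if team_id == 1 or team_id == 2:
        if "sync_frequency_interval" in doc:
            del doc["sync_frequency_interval"]
    return doc


@dlt.resource(name="external_data_schemas")  # hypothetical resource name
def schema_rows():
    # Invented rows standing in for what the SQL source would yield.
    yield {"id": 1, "sync_frequency_interval": "24:00:00"}
    yield {"id": 2}


team_id = 1  # in the activity this arrives via inputs.team_id

# add_map attaches the transform to the resource; iterating the resource
# evaluates the generator with the transform applied to every item.
rows = list(schema_rows().add_map(lambda doc: remove_interval(doc, team_id)))
print(rows)  # [{'id': 1}, {'id': 2}]

In the patch the map is attached before the resource is invoked with its `engine`/`table` arguments; that chaining works because `add_map()` returns the same resource, so the transform rides along into the pipeline run.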