Skip to content

Commit

Permalink
chore(data-warehouse): Don't sync our schema table interval column (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Jul 25, 2024
1 parent af4ea7f commit e2c83e3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 2 deletions.
16 changes: 14 additions & 2 deletions posthog/temporal/data_imports/pipelines/sql_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def sql_source_for_type(
sslmode: str,
schema: str,
table_names: list[str],
team_id: Optional[int] = None,
incremental_field: Optional[str] = None,
incremental_field_type: Optional[IncrementalFieldType] = None,
) -> DltSource:
Expand All @@ -71,7 +72,9 @@ def sql_source_for_type(
else:
raise Exception("Unsupported source_type")

db_source = sql_database(credentials, schema=schema, table_names=table_names, incremental=incremental)
db_source = sql_database(
credentials, schema=schema, table_names=table_names, incremental=incremental, team_id=team_id
)

return db_source

Expand Down Expand Up @@ -110,13 +113,22 @@ def snowflake_source(
return db_source


# Temp while DLT doesn't support `interval` columns
def remove_interval(doc: dict, team_id: Optional[int]) -> dict:
    """Strip the `sync_frequency_interval` key from a single row dict.

    Used as a dlt resource map step because DLT cannot yet load Postgres
    `interval` columns. Gated to teams 1 and 2 while the workaround is
    trialled — all other teams' rows pass through untouched.

    Args:
        doc: One row emitted by the sql_database resource.
        team_id: Team the sync is running for, or None (no-op).

    Returns:
        The same dict, mutated in place when the key was removed.
    """
    if team_id in (1, 2):
        # pop with a default removes the key if present, without a
        # separate membership check.
        doc.pop("sync_frequency_interval", None)
    return doc


@dlt.source(max_table_nesting=0)
def sql_database(
credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
schema: Optional[str] = dlt.config.value,
metadata: Optional[MetaData] = None,
table_names: Optional[List[str]] = dlt.config.value, # noqa: UP006
incremental: Optional[dlt.sources.incremental] = None,
team_id: Optional[int] = None,
) -> Iterable[DltResource]:
"""
A DLT source which loads data from an SQL database using SQLAlchemy.
Expand Down Expand Up @@ -161,7 +173,7 @@ def sql_database(
spec=SqlDatabaseTableConfiguration,
table_format="delta",
columns=get_column_hints(engine, schema or "", table.name),
)(
).add_map(lambda x: remove_interval(x, team_id))(
engine=engine,
table=table,
incremental=incremental,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
incremental_field_type=schema.sync_type_config.get("incremental_field_type")
if schema.is_incremental
else None,
team_id=inputs.team_id,
)

return await _run(
Expand All @@ -178,6 +179,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
incremental_field_type=schema.sync_type_config.get("incremental_field_type")
if schema.is_incremental
else None,
team_id=inputs.team_id,
)

return await _run(
Expand Down
3 changes: 3 additions & 0 deletions posthog/temporal/tests/batch_exports/test_import_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ async def test_postgres_source_without_ssh_tunnel(activity_environment, team, **
table_names=["table_1"],
incremental_field=None,
incremental_field_type=None,
team_id=team.id,
)


Expand Down Expand Up @@ -125,6 +126,7 @@ async def test_postgres_source_with_ssh_tunnel_disabled(activity_environment, te
table_names=["table_1"],
incremental_field=None,
incremental_field_type=None,
team_id=team.id,
)


Expand Down Expand Up @@ -180,4 +182,5 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
table_names=["table_1"],
incremental_field=None,
incremental_field_type=None,
team_id=team.id,
)

0 comments on commit e2c83e3

Please sign in to comment.