diff --git a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py index 04fb8885701da..8d8efec98c1a8 100644 --- a/posthog/temporal/data_imports/pipelines/sql_database/__init__.py +++ b/posthog/temporal/data_imports/pipelines/sql_database/__init__.py @@ -79,7 +79,7 @@ def snowflake_source( table_names: list[str], role: Optional[str] = None, incremental_field: Optional[str] = None, - incremental_field_type: Optional[str] = None, + incremental_field_type: Optional[IncrementalFieldType] = None, ) -> DltSource: account_id = quote(account_id) user = quote(user) @@ -88,10 +88,17 @@ def snowflake_source( warehouse = quote(warehouse) role = quote(role) if role else None + if incremental_field is not None and incremental_field_type is not None: + incremental: dlt.sources.incremental | None = dlt.sources.incremental( + cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type) + ) + else: + incremental = None + credentials = ConnectionStringCredentials( f"snowflake://{user}:{password}@{account_id}/{database}/{schema}?warehouse={warehouse}{f'&role={role}' if role else ''}" ) - db_source = sql_database(credentials, schema=schema, table_names=table_names) + db_source = sql_database(credentials, schema=schema, table_names=table_names, incremental=incremental) return db_source diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py index 9849339e785c7..9ca8f04b59e2b 100644 --- a/posthog/temporal/data_imports/workflow_activities/import_data.py +++ b/posthog/temporal/data_imports/workflow_activities/import_data.py @@ -206,6 +206,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs): warehouse=warehouse, role=role, table_names=endpoints, + incremental_field=schema.sync_type_config.get("incremental_field") if schema.is_incremental else None, + incremental_field_type=schema.sync_type_config.get("incremental_field_type") + if schema.is_incremental + else None, ) return await _run(