From 83a07c1692dec7e9cbeed48d7875aac8e2e90208 Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Thu, 2 Jan 2025 17:53:13 +0100 Subject: [PATCH] fix(data-warehouse): Dont throw when updating incremental value with an empty table (#27218) --- mypy-baseline.txt | 2 -- .../data_imports/pipelines/pipeline_sync.py | 6 ++++++ posthog/warehouse/models/external_data_schema.py | 14 +++++++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 58ea7ed8a90c2..a4444d8b67031 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -146,8 +146,6 @@ posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict ent posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "PathsFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "LifecycleFilter"; expected "str": "TrendsFilter" [dict-item] posthog/hogql_queries/legacy_compatibility/filter_to_query.py:0: error: Dict entry 0 has incompatible type "str": "StickinessFilter"; expected "str": "TrendsFilter" [dict-item] -posthog/warehouse/models/external_data_schema.py:0: error: Incompatible types in assignment (expression has type "str", variable has type "int | float") [assignment] -posthog/warehouse/models/external_data_schema.py:0: error: Incompatible types in assignment (expression has type "str", variable has type "int | float") [assignment] posthog/session_recordings/models/session_recording.py:0: error: Argument "distinct_id" to "MissingPerson" has incompatible type "str | None"; expected "str" [arg-type] posthog/session_recordings/models/session_recording.py:0: error: Incompatible type for lookup 'persondistinctid__team_id': (got "Team", expected "str | int") [misc] posthog/models/hog_functions/hog_function.py:0: error: Argument 1 to "get" of "dict" has incompatible type "str | None"; expected "str" [arg-type] diff --git a/posthog/temporal/data_imports/pipelines/pipeline_sync.py b/posthog/temporal/data_imports/pipelines/pipeline_sync.py index 3fca1a7a49c82..314dad8a436e1 100644 --- a/posthog/temporal/data_imports/pipelines/pipeline_sync.py +++ b/posthog/temporal/data_imports/pipelines/pipeline_sync.py @@ -445,6 +445,12 @@ def save_last_incremental_value(schema_id: str, team_id: str, source: DltSource, logger.debug(f"Updating incremental_field_last_value with {last_value}") + if last_value is None: + logger.debug( + f"Incremental value is None. This could mean the table has zero rows. Full incremental object: {incremental_object}" + ) + return + schema.update_incremental_field_last_value(last_value) diff --git a/posthog/warehouse/models/external_data_schema.py b/posthog/warehouse/models/external_data_schema.py index ba07884346912..d90b699a7f4e0 100644 --- a/posthog/warehouse/models/external_data_schema.py +++ b/posthog/warehouse/models/external_data_schema.py @@ -77,6 +77,10 @@ def update_incremental_field_last_value(self, last_value: Any) -> None: incremental_field_type = self.sync_type_config.get("incremental_field_type") last_value_py = last_value.item() if isinstance(last_value, numpy.generic) else last_value + last_value_json: Any + + if last_value_py is None: + return if ( incremental_field_type == IncrementalFieldType.Integer @@ -85,9 +89,17 @@ def update_incremental_field_last_value(self, last_value: Any) -> None: if isinstance(last_value_py, int | float): last_value_json = last_value_py elif isinstance(last_value_py, datetime): - last_value_json = str(last_value_py) + last_value_json = last_value_py.isoformat() else: last_value_json = int(last_value_py) + elif ( + incremental_field_type == IncrementalFieldType.DateTime + or incremental_field_type == IncrementalFieldType.Timestamp + ): + if isinstance(last_value_py, datetime): + last_value_json = last_value_py.isoformat() + else: + last_value_json = str(last_value_py) else: last_value_json = str(last_value_py)