diff --git a/mypy-baseline.txt b/mypy-baseline.txt
index 57a7e095f6298..301b9f58e0a78 100644
--- a/mypy-baseline.txt
+++ b/mypy-baseline.txt
@@ -3,7 +3,23 @@ posthog/temporal/common/utils.py:0: note: This is likely because "from_activity"
 posthog/temporal/common/utils.py:0: error: Argument 2 to "__get__" of "classmethod" has incompatible type "type[HeartbeatType]"; expected "type[Never]" [arg-type]
 posthog/tasks/exports/ordered_csv_renderer.py:0: error: No return value expected [return-value]
 posthog/warehouse/models/ssh_tunnel.py:0: error: Incompatible types in assignment (expression has type "NoEncryption", variable has type "BestAvailableEncryption") [assignment]
+posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py:0: error: Statement is unreachable [unreachable]
+posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py:0: error: Non-overlapping equality check (left operand type: "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'json', 'decimal', 'wei', 'date', 'time'] | None", right operand type: "Literal['interval']") [comparison-overlap]
+posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index]
+posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, ndarray[Any, dtype[Any]]]"; expected type "str" [index]
+posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py:0: error: Invalid index type "str | None" for "dict[str, TColumnSchema]"; expected type "str" [index]
 posthog/temporal/data_imports/pipelines/sql_database/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Item "None" of "Incremental[Any] | None" has no attribute "row_order" [union-attr]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "Literal['asc', 'desc'] | Any | None", variable has type "Literal['asc', 'desc']") [assignment]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Column[Any]") [assignment]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "Literal['asc', 'desc']") [assignment]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Item "None" of "dict[str, Any] | None" has no attribute "get" [union-attr]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Argument "primary_key" to "make_hints" has incompatible type "list[str] | None"; expected "str | Sequence[str] | Callable[[Any], str | Sequence[str]]" [arg-type]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Dict entry 2 has incompatible type "Literal['auto']": "None"; expected "Literal['json_response', 'header_link', 'auto', 'single_page', 'cursor', 'offset', 'page_number']": "type[BasePaginator]" [dict-item]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Incompatible types in assignment (expression has type "None", variable has type "AuthConfigBase") [assignment]
 posthog/temporal/data_imports/pipelines/rest_source/config_setup.py:0: error: Argument 1 to "get_auth_class" has incompatible type "Literal['bearer', 'api_key', 'http_basic'] | None"; expected "Literal['bearer', 'api_key', 'http_basic']" [arg-type]
@@ -41,7 +57,6 @@ posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argume
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument 1 to "exclude_keys" has incompatible type "dict[str, ResolveParamConfig | IncrementalParamConfig | Any] | None"; expected "Mapping[str, Any]" [arg-type]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Incompatible default for argument "resolved_param" (default has type "ResolvedParam | None", argument has type "ResolvedParam") [assignment]
 posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
-posthog/temporal/data_imports/pipelines/rest_source/__init__.py:0: error: Argument "module" to "SourceInfo" has incompatible type Module | None; expected Module [arg-type]
 posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/vitally/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
@@ -445,6 +460,23 @@ posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "typ
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/data_imports/pipelines/stripe/__init__.py:0: error: Unused "type: ignore" comment [unused-ignore]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: error: No overload variant of "with_only_columns" of "Select" matches argument type "ReadOnlyColumnCollection[str, Column[Any]]" [call-overload]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: Possible overload variants:
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], /) -> Select[tuple[_T0]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], /) -> Select[tuple[_T0, _T1]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], /) -> Select[tuple[_T0, _T1, _T2]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], /) -> Select[tuple[_T0, _T1, _T2, _T3]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7] with_only_columns(self, TypedColumnsClauseRole[_T0] | SQLCoreOperations[_T0] | type[_T0], TypedColumnsClauseRole[_T1] | SQLCoreOperations[_T1] | type[_T1], TypedColumnsClauseRole[_T2] | SQLCoreOperations[_T2] | type[_T2], TypedColumnsClauseRole[_T3] | SQLCoreOperations[_T3] | type[_T3], TypedColumnsClauseRole[_T4] | SQLCoreOperations[_T4] | type[_T4], TypedColumnsClauseRole[_T5] | SQLCoreOperations[_T5] | type[_T5], TypedColumnsClauseRole[_T6] | SQLCoreOperations[_T6] | type[_T6], TypedColumnsClauseRole[_T7] | SQLCoreOperations[_T7] | type[_T7], /) -> Select[tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def with_only_columns(self, *entities: TypedColumnsClauseRole[Any] | ColumnsClauseRole | SQLCoreOperations[Any] | Literal['*', 1] | type[Any] | Inspectable[_HasClauseElement[Any]] | _HasClauseElement[Any], maintain_column_froms: bool = ..., **Any) -> Select[Any]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: error: No overload variant of "resource" matches argument types "Callable[[Engine, Table, int, Literal['sqlalchemy', 'pyarrow', 'pandas', 'connectorx'], Incremental[Any] | None, bool, Callable[[Table], None] | None, Literal['minimal', 'full', 'full_with_precision'], dict[str, Any] | None, Callable[[TypeEngine[Any]], TypeEngine[Any] | type[TypeEngine[Any]] | None] | None, list[str] | None, Callable[[Select[Any], Table], Select[Any]] | None, list[str] | None], Iterator[Any]]", "str", "list[str] | None", "list[str] | None", "dict[str, TColumnSchema]", "Collection[str]", "str" [call-overload]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: Possible overload variants:
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TResourceFunParams`-1, TDltResourceImpl: DltResource] resource(Callable[TResourceFunParams, Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> TDltResourceImpl
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> Callable[[Callable[TResourceFunParams, Any]], TDltResourceImpl]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(None = ..., /, name: str | Callable[[Any], str] = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ..., standalone: Literal[True] = ...) -> Callable[[Callable[TResourceFunParams, Any]], Callable[TResourceFunParams, TDltResourceImpl]]
+posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py:0: note: def [TDltResourceImpl: DltResource] resource(list[Any] | tuple[Any] | Iterator[Any], /, name: str = ..., table_name: str | Callable[[Any], str] = ..., max_table_nesting: int = ..., write_disposition: Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict | Callable[[Any], Literal['skip', 'append', 'replace', 'merge'] | TWriteDispositionDict | TMergeDispositionDict | TScd2StrategyDict] = ..., columns: dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel] | Callable[[Any], dict[str, TColumnSchema] | Sequence[TColumnSchema] | BaseModel | type[BaseModel]] = ..., primary_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., merge_key: str | Sequence[str] | Callable[[Any], str | Sequence[str]] = ..., schema_contract: Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict | Callable[[Any], Literal['evolve', 'discard_value', 'freeze', 'discard_row'] | TSchemaContractDict] = ..., table_format: Literal['iceberg', 'delta', 'hive'] | Callable[[Any], Literal['iceberg', 'delta', 'hive']] = ..., file_format: Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference'] | Callable[[Any], Literal['preferred', 'jsonl', 'typed-jsonl', 'insert_values', 'parquet', 'csv', 'reference']] = ..., references: Sequence[TTableReference] | Callable[[Any], Sequence[TTableReference]] = ..., selected: bool = ..., spec: type[BaseConfiguration] = ..., parallelized: bool = ..., _impl_cls: type[TDltResourceImpl] = ...) -> TDltResourceImpl
 posthog/tasks/test/test_update_survey_iteration.py:0: error: Item "None" of "FeatureFlag | None" has no attribute "filters" [union-attr]
 posthog/tasks/test/test_stop_surveys_reached_target.py:0: error: No overload variant of "__sub__" of "datetime" matches argument type "None" [operator]
 posthog/tasks/test/test_stop_surveys_reached_target.py:0: note: Possible overload variants:
@@ -763,6 +795,9 @@ posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type:
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_run_updates.py:0: error: Unused "type: ignore" comment [unused-ignore]
 posthog/temporal/tests/batch_exports/test_batch_exports.py:0: error: TypedDict key must be a string literal; expected one of ("_timestamp", "created_at", "distinct_id", "elements", "elements_chain", ...) [literal-required]
+posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 20 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
+posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 21 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
+posthog/temporal/data_modeling/run_workflow.py:0: error: Dict entry 22 has incompatible type "str": "Literal['complex']"; expected "str": "Literal['text', 'double', 'bool', 'timestamp', 'bigint', 'binary', 'json', 'decimal', 'wei', 'date', 'time']" [dict-item]
 posthog/session_recordings/session_recording_api.py:0: error: Argument "team_id" to "get_realtime_snapshots" has incompatible type "int"; expected "str" [arg-type]
 posthog/session_recordings/session_recording_api.py:0: error: Value of type variable "SupportsRichComparisonT" of "sorted" cannot be "str | None" [type-var]
 posthog/session_recordings/session_recording_api.py:0: error: Argument 1 to "get" of "dict" has incompatible type "str | None"; expected "str" [arg-type]
@@ -794,8 +829,6 @@ posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py:0:
 posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "last_uploaded_part_timestamp" [attr-defined]
 posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py:0: error: "tuple[Any, ...]" has no attribute "upload_state" [attr-defined]
 posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Argument "job_type" to "PipelineInputs" has incompatible type "str"; expected "Type" [arg-type]
-posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Argument "source_type" to "sql_source_for_type" has incompatible type "str"; expected "Type" [arg-type]
-posthog/temporal/data_imports/workflow_activities/import_data.py:0: error: Argument "source_type" to "sql_source_for_type" has incompatible type "str"; expected "Type" [arg-type]
 posthog/migrations/0237_remove_timezone_from_teams.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
 posthog/migrations/0228_fix_tile_layouts.py:0: error: Argument 2 to "RunPython" has incompatible type "Callable[[Migration, Any], None]"; expected "_CodeCallable | None" [arg-type]
 posthog/api/plugin_log_entry.py:0: error: Name "timezone.datetime" is not defined [name-defined]
diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py
index 45aeb1860b29c..17e313fc065fb 100644
--- a/posthog/temporal/data_imports/pipelines/pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/pipeline.py
@@ -95,6 +95,9 @@ def _create_pipeline(self):
         pipeline_name = self._get_pipeline_name()
         destination = self._get_destination()
 
+        dlt.config["normalize.parquet_normalizer.add_dlt_load_id"] = True
+        dlt.config["normalize.parquet_normalizer.add_dlt_id"] = True
+
         return dlt.pipeline(
             pipeline_name=pipeline_name, destination=destination, dataset_name=self.inputs.dataset_name, progress="log"
         )
diff --git a/posthog/temporal/data_imports/pipelines/rest_source/__init__.py b/posthog/temporal/data_imports/pipelines/rest_source/__init__.py
index 5dceafd1d2aec..d17e124dfb12f 100644
--- a/posthog/temporal/data_imports/pipelines/rest_source/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/rest_source/__init__.py
@@ -376,18 +376,18 @@ def identity_func(x: Any) -> Any:
 
 # XXX: This is a workaround pass test_dlt_init.py
 # since the source uses dlt.source as a function
-def _register_source(source_func: Callable[..., DltSource]) -> None:
-    import inspect
-    from dlt.common.configuration import get_fun_spec
-    from dlt.common.source import _SOURCES, SourceInfo
-
-    spec = get_fun_spec(source_func)
-    func_module = inspect.getmodule(source_func)
-    _SOURCES[source_func.__name__] = SourceInfo(
-        SPEC=spec,
-        f=source_func,
-        module=func_module,
-    )
+# def _register_source(source_func: Callable[..., DltSource]) -> None:
+#     import inspect
+#     from dlt.common.configuration import get_fun_spec
+#     from dlt.common.source import _SOURCES, SourceInfo
+
+#     spec = get_fun_spec(source_func)
+#     func_module = inspect.getmodule(source_func)
+#     _SOURCES[source_func.__name__] = SourceInfo(
+#         SPEC=spec,
+#         f=source_func,
+#         module=func_module,
+#     )
 
 
-_register_source(rest_api_source)
+# _register_source(rest_api_source)
diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py
new file mode 100644
index 0000000000000..39e353b8603aa
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/__init__.py
@@ -0,0 +1,376 @@
+"""Source that loads tables form any SQLAlchemy supported database, supports batching requests and incremental loads."""
+
+from datetime import datetime, date
+from typing import Optional, Union, Any
+from collections.abc import Callable, Iterable
+
+from sqlalchemy import MetaData, Table, create_engine
+from sqlalchemy.engine import Engine
+from zoneinfo import ZoneInfo
+
+import dlt
+from dlt.sources import DltResource, DltSource
+from urllib.parse import quote
+from dlt.common.libs.pyarrow import pyarrow as pa
+from dlt.sources.credentials import ConnectionStringCredentials
+
+from posthog.settings.utils import get_from_env
+from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON
+from posthog.utils import str_to_bool
+from posthog.warehouse.models import ExternalDataSource
+from posthog.warehouse.types import IncrementalFieldType
+
+from .helpers import (
+    SelectAny,
+    table_rows,
+    engine_from_credentials,
+    TableBackend,
+    SqlTableResourceConfiguration,
+    _detect_precision_hints_deprecated,
+)
+from .schema_types import (
+    default_table_adapter,
+    table_to_columns,
+    get_primary_key,
+    ReflectionLevel,
+    TTypeAdapter,
+)
+
+from sqlalchemy_bigquery import BigQueryDialect, __all__
+from sqlalchemy_bigquery._types import _type_map
+
+# Workaround to get JSON support in the BigQuery Dialect
+BigQueryDialect.JSON = BigQueryJSON
+_type_map["JSON"] = BigQueryJSON
+__all__.append("JSON")
+
+
+def incremental_type_to_initial_value(field_type: IncrementalFieldType) -> Any:
+    if field_type == IncrementalFieldType.Integer or field_type == IncrementalFieldType.Numeric:
+        return 0
+    if field_type == IncrementalFieldType.DateTime or field_type == IncrementalFieldType.Timestamp:
+        return datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=ZoneInfo("UTC"))
+    if field_type == IncrementalFieldType.Date:
+        return date(1970, 1, 1)
+
+
+def sql_source_for_type(
+    source_type: ExternalDataSource.Type,
+    host: str,
+    port: int,
+    user: str,
+    password: str,
+    database: str,
+    sslmode: str,
+    schema: str,
+    table_names: list[str],
+    team_id: Optional[int] = None,
+    incremental_field: Optional[str] = None,
+    incremental_field_type: Optional[IncrementalFieldType] = None,
+) -> DltSource:
+    host = quote(host)
+    user = quote(user)
+    password = quote(password)
+    database = quote(database)
+    sslmode = quote(sslmode)
+
+    if incremental_field is not None and incremental_field_type is not None:
+        incremental: dlt.sources.incremental | None = dlt.sources.incremental(
+            cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type)
+        )
+    else:
+        incremental = None
+
+    connect_args = []
+
+    if source_type == ExternalDataSource.Type.POSTGRES:
+        credentials = ConnectionStringCredentials(
+            f"postgresql://{user}:{password}@{host}:{port}/{database}?sslmode={sslmode}"
+        )
+    elif source_type == ExternalDataSource.Type.MYSQL:
+        # We have to get DEBUG in temporal workers cos we're not loading Django in the same way as the app
+        is_debug = get_from_env("DEBUG", False, type_cast=str_to_bool)
+        ssl_ca = "/etc/ssl/cert.pem" if is_debug else "/etc/ssl/certs/ca-certificates.crt"
+        credentials = ConnectionStringCredentials(
+            f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?ssl_ca={ssl_ca}&ssl_verify_cert=false"
+        )
+
+        # PlanetScale needs this to be set
+        if host.endswith("psdb.cloud"):
+            connect_args = ["SET workload = 'OLAP';"]
+    elif source_type == ExternalDataSource.Type.MSSQL:
+        credentials = ConnectionStringCredentials(
+            f"mssql+pyodbc://{user}:{password}@{host}:{port}/{database}?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
+        )
+    else:
+        raise Exception("Unsupported source_type")
+
+    db_source = sql_database(
+        credentials,
+        schema=schema,
+        table_names=table_names,
+        incremental=incremental,
+        team_id=team_id,
+        connect_args=connect_args,
+    )
+
+    return db_source
+
+
+def bigquery_source(
+    dataset_id: str,
+    project_id: str,
+    private_key: str,
+    private_key_id: str,
+    client_email: str,
+    token_uri: str,
+    table_name: str,
+    bq_destination_table_id: str,
+    incremental_field: Optional[str] = None,
+    incremental_field_type: Optional[IncrementalFieldType] = None,
+) -> DltSource:
+    if incremental_field is not None and incremental_field_type is not None:
+        incremental: dlt.sources.incremental | None = dlt.sources.incremental(
+            cursor_path=incremental_field, initial_value=incremental_type_to_initial_value(incremental_field_type)
+        )
+    else:
+        incremental = None
+
+    credentials_info = {
+        "type": "service_account",
+        "project_id": project_id,
+        "private_key": private_key,
+        "private_key_id": private_key_id,
+        "client_email": client_email,
+        "token_uri": token_uri,
+    }
+
+    engine = create_engine(
+        f"bigquery://{project_id}/{dataset_id}?create_disposition=CREATE_IF_NEEDED&allowLargeResults=true&destination={bq_destination_table_id}",
+        credentials_info=credentials_info,
+    )
+
+    return sql_database(engine, schema=None, table_names=[table_name], incremental=incremental)
+
+
+@dlt.source(max_table_nesting=0)
+def sql_database(
+    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
+    schema: Optional[str] = dlt.config.value,
+    metadata: Optional[MetaData] = None,
+    table_names: Optional[list[str]] = dlt.config.value,
+    chunk_size: int = 50000,
+    backend: TableBackend = "pyarrow",
+    detect_precision_hints: Optional[bool] = False,
+    reflection_level: Optional[ReflectionLevel] = "full",
+    defer_table_reflect: Optional[bool] = None,
+    table_adapter_callback: Optional[Callable[[Table], None]] = None,
+    backend_kwargs: Optional[dict[str, Any]] = None,
+    include_views: bool = False,
+    type_adapter_callback: Optional[TTypeAdapter] = None,
+    incremental: Optional[dlt.sources.incremental] = None,
+    team_id: Optional[int] = None,
+    connect_args: Optional[list[str]] = None,
+) -> Iterable[DltResource]:
+    """
+    A dlt source which loads data from an SQL database using SQLAlchemy.
+    Resources are automatically created for each table in the schema or from the given list of tables.
+
+    Args:
+        credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `sqlalchemy.Engine` instance.
+        schema (Optional[str]): Name of the database schema to load (if different from default).
+        metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. `schema` argument is ignored when this is used.
+        table_names (Optional[List[str]]): A list of table names to load. By default, all tables in the schema are loaded.
+        chunk_size (int): Number of rows yielded in one batch. SQL Alchemy will create additional internal rows buffer twice the chunk size.
+        backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx".
+            "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields panda frames.
+            "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types,
+            "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself.
+        detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
+            This is disabled by default.
+        reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
+            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
+            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
+            "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
+        defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Requires table_names to be explicitly passed.
+            Enable this option when running on Airflow. Available on dlt 0.4.4 and later.
+        table_adapter_callback: (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected.
+        backend_kwargs (**kwargs): kwargs passed to table backend ie. "conn" is used to pass specialized connection string to connectorx.
+        include_views (bool): Reflect views as well as tables. Note view names included in `table_names` are always included regardless of this setting.
+        type_adapter_callback(Optional[Callable]): Callable to override type inference when reflecting columns.
+            Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data)
+        query_adapter_callback(Optional[Callable[Select, Table], Select]): Callable to override the SELECT query used to fetch data from the table.
+            The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`.
+
+    Returns:
+        Iterable[DltResource]: A list of DLT resources for each table to be loaded.
+    """
+    # detect precision hints is deprecated
+    _detect_precision_hints_deprecated(detect_precision_hints)
+
+    if detect_precision_hints:
+        reflection_level = "full_with_precision"
+    else:
+        reflection_level = reflection_level or "minimal"
+
+    # set up alchemy engine
+    engine = engine_from_credentials(credentials)
+    engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
+    metadata = metadata or MetaData(schema=schema)
+
+    # use provided tables or all tables
+    if table_names:
+        tables = [
+            Table(name, metadata, inherit_cache=False, autoload_with=None if defer_table_reflect else engine)
+            for name in table_names
+        ]
+    else:
+        if defer_table_reflect:
+            raise ValueError("You must pass table names to defer table reflection")
+        metadata.reflect(bind=engine, views=include_views)
+        tables = list(metadata.tables.values())
+
+    for table in tables:
+        yield sql_table(
+            credentials=credentials,
+            table=table.name,
+            schema=table.schema,
+            metadata=metadata,
+            chunk_size=chunk_size,
+            backend=backend,
+            reflection_level=reflection_level,
+            defer_table_reflect=defer_table_reflect,
+            table_adapter_callback=table_adapter_callback,
+            backend_kwargs=backend_kwargs,
+            type_adapter_callback=type_adapter_callback,
+            incremental=incremental,
+            team_id=team_id,
+            connect_args=connect_args,
+        )
+
+
+# Temp while we dont support binary columns in HogQL
+def remove_columns(columns_to_drop: list[str], team_id: Optional[int]):
+    col_len = len(columns_to_drop)
+
+    def internal_remove(table: pa.Table) -> pa.Table:
+        if col_len == 0:
+            return table
+
+        table_cols = [n for n in columns_to_drop if n in table.column_names]
+        if len(table_cols) > 0:
+            return table.drop(columns_to_drop)
+
+        return table
+
+    return internal_remove
+
+
+@dlt.resource(name=lambda args: args["table"], standalone=True, spec=SqlTableResourceConfiguration)
+def sql_table(
+    credentials: Union[ConnectionStringCredentials, Engine, str] = dlt.secrets.value,
+    table: str = dlt.config.value,
+    schema: Optional[str] = dlt.config.value,
+    metadata: Optional[MetaData] = None,
+    incremental: Optional[dlt.sources.incremental[Any]] = None,
+    chunk_size: int = 50000,
+    backend: TableBackend = "sqlalchemy",
+    detect_precision_hints: Optional[bool] = None,
+    reflection_level: Optional[ReflectionLevel] = "full",
+    defer_table_reflect: Optional[bool] = None,
+    table_adapter_callback: Optional[Callable[[Table], None]] = None,
+    backend_kwargs: Optional[dict[str, Any]] = None,
+    type_adapter_callback: Optional[TTypeAdapter] = None,
+    included_columns: Optional[list[str]] = None,
+    team_id: Optional[int] = None,
+    connect_args: Optional[list[str]] = None,
+) -> DltResource:
+    """
+    A dlt resource which loads data from an SQL database table using SQLAlchemy.
+
+    Args:
+        credentials (Union[ConnectionStringCredentials, Engine, str]): Database credentials or an `Engine` instance representing the database connection.
+        table (str): Name of the table or view to load.
+        schema (Optional[str]): Optional name of the schema the table belongs to.
+        metadata (Optional[MetaData]): Optional `sqlalchemy.MetaData` instance. If provided, the `schema` argument is ignored.
+        incremental (Optional[dlt.sources.incremental[Any]]): Option to enable incremental loading for the table.
+            E.g., `incremental=dlt.sources.incremental('updated_at', pendulum.parse('2022-01-01T00:00:00Z'))`
+        chunk_size (int): Number of rows yielded in one batch. SQL Alchemy will create additional internal rows buffer twice the chunk size.
+        backend (TableBackend): Type of backend to generate table data. One of: "sqlalchemy", "pyarrow", "pandas" and "connectorx".
+            "sqlalchemy" yields batches as lists of Python dictionaries, "pyarrow" and "connectorx" yield batches as arrow tables, "pandas" yields panda frames.
+            "sqlalchemy" is the default and does not require additional dependencies, "pyarrow" creates stable destination schemas with correct data types,
+            "connectorx" is typically the fastest but ignores the "chunk_size" so you must deal with large tables yourself.
+        reflection_level: (ReflectionLevel): Specifies how much information should be reflected from the source database schema.
+            "minimal": Only table names, nullability and primary keys are reflected. Data types are inferred from the data.
+            "full": Data types will be reflected on top of "minimal". `dlt` will coerce the data into reflected types if necessary. This is the default option.
+            "full_with_precision": Sets precision and scale on supported data types (ie. decimal, text, binary). Creates big and regular integer types.
+        detect_precision_hints (bool): Deprecated. Use `reflection_level`. Set column precision and scale hints for supported data types in the target schema based on the columns in the source tables.
+            This is disabled by default.
+        defer_table_reflect (bool): Will connect and reflect table schema only when yielding data. Enable this option when running on Airflow. Available
+            on dlt 0.4.4 and later
+        table_adapter_callback: (Callable): Receives each reflected table. May be used to modify the list of columns that will be selected.
+        backend_kwargs (**kwargs): kwargs passed to table backend ie. "conn" is used to pass specialized connection string to connectorx.
+        type_adapter_callback(Optional[Callable]): Callable to override type inference when reflecting columns.
+            Argument is a single sqlalchemy data type (`TypeEngine` instance) and it should return another sqlalchemy data type, or `None` (type will be inferred from data)
+        included_columns (Optional[List[str]): List of column names to select from the table. If not provided, all columns are loaded.
+        query_adapter_callback(Optional[Callable[Select, Table], Select]): Callable to override the SELECT query used to fetch data from the table.
+            The callback receives the sqlalchemy `Select` and corresponding `Table` objects and should return the modified `Select`.
+
+    Returns:
+        DltResource: The dlt resource for loading data from the SQL database table.
+    """
+    _detect_precision_hints_deprecated(detect_precision_hints)
+
+    if detect_precision_hints:
+        reflection_level = "full_with_precision"
+    else:
+        reflection_level = reflection_level or "minimal"
+
+    engine = engine_from_credentials(credentials, may_dispose_after_use=True)
+    engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
+    metadata = metadata or MetaData(schema=schema)
+
+    table_obj: Table | None = metadata.tables.get("table")
+    if table_obj is None:
+        table_obj = Table(table, metadata, autoload_with=None if defer_table_reflect else engine)
+
+    if not defer_table_reflect:
+        default_table_adapter(table_obj, included_columns)
+        if table_adapter_callback:
+            table_adapter_callback(table_obj)
+
+    columns = table_to_columns(table_obj, reflection_level, type_adapter_callback)
+
+    def query_adapter_callback(query: SelectAny, table: Table):
+        cols_to_select = list(columns.keys())
+
+        return query.with_only_columns(table.c[*cols_to_select])
+
+    return dlt.resource(
+        table_rows,
+        name=table_obj.name,
+        primary_key=get_primary_key(table_obj),
+        merge_key=get_primary_key(table_obj),
+        columns=columns,
+        write_disposition={
+            "disposition": "merge",
+            "strategy": "upsert",
+        }
+        if incremental
+        else "replace",
+        table_format="delta",
+    )(
+        engine=engine,
+        table=table_obj,
+        chunk_size=chunk_size,
+        backend=backend,
+        incremental=incremental,
+        reflection_level=reflection_level,
+        defer_table_reflect=defer_table_reflect,
+        table_adapter_callback=table_adapter_callback,
+        backend_kwargs=backend_kwargs,
+        type_adapter_callback=type_adapter_callback,
+        included_columns=included_columns,
+        query_adapter_callback=query_adapter_callback,
+        connect_args=connect_args,
+    )
diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py
new file mode 100644
index 0000000000000..8d8e7ad478dac
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/_json.py
@@ -0,0 +1,32 @@
+import json
+from sqlalchemy import String, TypeDecorator
+
+
+class BigQueryJSON(TypeDecorator):
+    """
+    SQLAlchemy 2.0 compatible JSON type for BigQuery
+
+    This implementation uses STRING as the underlying type since
+    that's how BigQuery stores JSON.
+    """
+
+    impl = String
+    cache_ok = True
+
+    def __init__(self, none_as_null: bool = False) -> None:
+        super().__init__()
+        self.none_as_null = none_as_null
+
+        # Add these for BigQuery dialect compatibility
+        self._json_serializer = json.dumps
+        self._json_deserializer = json.loads
+
+    def process_bind_param(self, value, dialect):
+        if value is None:
+            return None if self.none_as_null else "null"
+        return self._json_deserializer(value)
+
+    def process_result_value(self, value, dialect):
+        if value is None:
+            return None
+        return self._json_serializer(value)
diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py
new file mode 100644
index 0000000000000..2510614e76908
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/arrow_helpers.py
@@ -0,0 +1,136 @@
+from typing import Any, Optional
+from collections.abc import Sequence
+
+from dlt.common.schema.typing import TTableSchemaColumns
+from dlt.common import logger, json
+from dlt.common.configuration import with_config
+from dlt.common.destination import DestinationCapabilitiesContext
+from dlt.common.json import custom_encode, map_nested_in_place
+
+from .schema_types import RowAny
+
+
+@with_config
+def columns_to_arrow(
+    columns_schema: TTableSchemaColumns,
+    caps: Optional[DestinationCapabilitiesContext] = None,
+    tz: str = "UTC",
+) -> Any:
+    """Converts `column_schema` to arrow schema using `caps` and `tz`. `caps` are injected from the container - which
+    is always the case if run within the pipeline. This will generate arrow schema compatible with the destination.
+    Otherwise generic capabilities are used
+    """
+    from dlt.common.libs.pyarrow import pyarrow as pa, get_py_arrow_datatype
+    from dlt.common.destination.capabilities import DestinationCapabilitiesContext
+
+    return pa.schema(
+        [
+            pa.field(
+                name,
+                get_py_arrow_datatype(
+                    schema_item,
+                    caps or DestinationCapabilitiesContext.generic_capabilities(),
+                    tz,
+                ),
+                nullable=schema_item.get("nullable", True),
+            )
+            for name, schema_item in columns_schema.items()
+            if schema_item.get("data_type") is not None
+        ]
+    )
+
+
+def row_tuples_to_arrow(rows: Sequence[RowAny], columns: TTableSchemaColumns, tz: str) -> Any:
+    """Converts the rows to an arrow table using the columns schema.
+    Columns missing `data_type` will be inferred from the row data.
+    Columns with object types not supported by arrow are excluded from the resulting table.
+    """
+    from dlt.common.libs.pyarrow import pyarrow as pa
+    import numpy as np
+
+    try:
+        from pandas._libs import lib
+
+        pivoted_rows = lib.to_object_array_tuples(rows).T  # type: ignore[attr-defined]
+    except ImportError:
+        logger.info("Pandas not installed, reverting to numpy.asarray to create a table which is slower")
+        pivoted_rows = np.asarray(rows, dtype="object", order="k").T  # type: ignore[call-overload]
+
+    columnar = {col: dat.ravel() for col, dat in zip(columns, np.vsplit(pivoted_rows, len(columns)))}
+    columnar_known_types = {
+        col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is not None
+    }
+    columnar_unknown_types = {
+        col["name"]: columnar[col["name"]] for col in columns.values() if col.get("data_type") is None
+    }
+
+    arrow_schema = columns_to_arrow(columns, tz=tz)
+
+    for idx in range(0, len(arrow_schema.names)):
+        field = arrow_schema.field(idx)
+        py_type: type = type(None)
+        for row in rows:
+            val = row[idx]
+            if val is not None:
+                py_type = type(val)
+                break
+
+        # cast double / float ndarrays to decimals if type mismatch, looks like decimals and floats are often mixed up in dialects
+        if pa.types.is_decimal(field.type) and issubclass(py_type, str | float):
+            logger.warning(
+                f"Field {field.name} was reflected as decimal type, but rows contains {py_type.__name__}. Additional cast is required which may slow down arrow table generation."
+            )
+            float_array = pa.array(columnar_known_types[field.name], type=pa.float64())
+            columnar_known_types[field.name] = float_array.cast(field.type, safe=False)
+        if issubclass(py_type, dict | list):
+            logger.warning(
+                f"Field {field.name} was reflected as JSON type and needs to be serialized back to string to be placed in arrow table. This will slow data extraction down. You should cast JSON field to STRING in your database system ie. by creating and extracting an SQL VIEW that selects with cast."
+            )
+            json_str_array = pa.array([None if s is None else json.dumps(s) for s in columnar_known_types[field.name]])
+            columnar_known_types[field.name] = json_str_array
+
+    # If there are unknown type columns, first create a table to infer their types
+    if columnar_unknown_types:
+        new_schema_fields = []
+        for key in list(columnar_unknown_types):
+            arrow_col: Optional[pa.Array] = None
+            try:
+                arrow_col = pa.array(columnar_unknown_types[key])
+                if pa.types.is_null(arrow_col.type):
+                    logger.warning(
+                        f"Column {key} contains only NULL values and data type could not be inferred. This column is removed from a arrow table"
+                    )
+                    continue
+
+            except pa.ArrowInvalid as e:
+                # Try coercing types not supported by arrow to a json friendly format
+                # E.g. dataclasses -> dict, UUID -> str
+                try:
+                    arrow_col = pa.array(map_nested_in_place(custom_encode, list(columnar_unknown_types[key])))
+                    logger.warning(
+                        f"Column {key} contains a data type which is not supported by pyarrow and got converted into {arrow_col.type}. This slows down arrow table generation."
+                    )
+                except (pa.ArrowInvalid, TypeError):
+                    logger.warning(
+                        f"Column {key} contains a data type which is not supported by pyarrow. This column will be ignored. Error: {e}"
+                    )
+            if arrow_col is not None:
+                columnar_known_types[key] = arrow_col
+                new_schema_fields.append(
+                    pa.field(
+                        key,
+                        arrow_col.type,
+                        nullable=columns[key]["nullable"],
+                    )
+                )
+
+        # New schema
+        column_order = {name: idx for idx, name in enumerate(columns)}
+        arrow_schema = pa.schema(
+            sorted(
+                list(arrow_schema) + new_schema_fields,
+                key=lambda x: column_order[x.name],
+            )
+        )
+
+    return pa.Table.from_pydict(columnar_known_types, schema=arrow_schema)
diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py
new file mode 100644
index 0000000000000..46f59929beb47
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/helpers.py
@@ -0,0 +1,303 @@
+"""SQL database source helpers"""
+
+import warnings
+from typing import (
+    Any,
+    Literal,
+    Optional,
+    Union,
+)
+from collections.abc import Callable, Iterator
+import operator
+
+import dlt
+from dlt.common.configuration.specs import BaseConfiguration, configspec
+from dlt.common.exceptions import MissingDependencyException
+from dlt.common.schema import TTableSchemaColumns
+from dlt.common.typing import TDataItem, TSortOrder
+
+from dlt.sources.credentials import ConnectionStringCredentials
+
+from .arrow_helpers import row_tuples_to_arrow
+from .schema_types import (
+    default_table_adapter,
+    table_to_columns,
+    get_primary_key,
+    SelectAny,
+    ReflectionLevel,
+    TTypeAdapter,
+)
+
+from sqlalchemy import Table, create_engine, text
+from sqlalchemy.engine import Engine
+from sqlalchemy.exc import CompileError
+
+
+TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"]
+TQueryAdapter = Callable[[SelectAny, Table], SelectAny]
+
+
+class TableLoader:
+    def __init__(
+        self,
+        engine: Engine,
+        backend: TableBackend,
+        table: Table,
+        columns: TTableSchemaColumns,
+        chunk_size: int = 1000,
+        incremental: Optional[dlt.sources.incremental[Any]] = None,
+        query_adapter_callback: Optional[TQueryAdapter] = None,
+        connect_args: Optional[list[str]] = None,
+    ) -> None:
+        self.engine = engine
+        self.backend = backend
+        self.table = table
+        self.columns = columns
+        self.chunk_size = chunk_size
+        self.query_adapter_callback = query_adapter_callback
+        self.incremental = incremental
+        self.connect_args = connect_args
+        if incremental:
+            try:
+                self.cursor_column = table.c[incremental.cursor_path]
+            except KeyError as e:
+                raise KeyError(
+                    f"Cursor column '{incremental.cursor_path}' does not exist in table '{table.name}'"
+                ) from e
+            self.last_value = incremental.last_value
+            self.end_value = incremental.end_value
+            self.row_order: TSortOrder = self.incremental.row_order
+        else:
+            self.cursor_column = None
+            self.last_value = None
+            self.end_value = None
+            self.row_order = None
+
+    def _make_query(self) -> SelectAny:
+        table = self.table
+        query = table.select()
+        if not self.incremental:
+            return query
+        last_value_func = self.incremental.last_value_func
+
+        # generate where
+        if last_value_func is max:  # Query ordered and filtered according to last_value function
+            filter_op = operator.ge
+            filter_op_end = operator.lt
+        elif last_value_func is min:
+            filter_op = operator.le
+            filter_op_end = operator.gt
+        else:  # Custom last_value, load everything and let incremental handle filtering
+            return query
+
+        if self.last_value is not None:
+            query = query.where(filter_op(self.cursor_column, self.last_value))
+            if self.end_value is not None:
+                query = query.where(filter_op_end(self.cursor_column, self.end_value))
+
+        # generate order by from declared row order
+        order_by = None
+        if (self.row_order == "asc" and last_value_func is max) or (
+            self.row_order == "desc" and last_value_func is min
+        ):
+            order_by = self.cursor_column.asc()
+        elif (self.row_order == "asc" and last_value_func is min) or (
+            self.row_order == "desc" and last_value_func is max
+        ):
+            order_by = self.cursor_column.desc()
+        if order_by is not None:
+            query = query.order_by(order_by)
+
+        return query
+
+    def make_query(self) -> SelectAny:
+        if self.query_adapter_callback:
+            return self.query_adapter_callback(self._make_query(), self.table)
+        return self._make_query()
+
+    def load_rows(self, backend_kwargs: Optional[dict[str, Any]] = None) -> Iterator[TDataItem]:
+        # make copy of kwargs
+        backend_kwargs = dict(backend_kwargs or {})
+        query = self.make_query()
+        if self.backend == "connectorx":
+            yield from self._load_rows_connectorx(query, backend_kwargs)
+        else:
+            yield from self._load_rows(query, backend_kwargs)
+
+    def _load_rows(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> TDataItem:
+        with self.engine.connect() as conn:
+            if self.connect_args:
+                for stmt in self.connect_args:
+                    conn.execute(text(stmt))
+            result = conn.execution_options(yield_per=self.chunk_size).execute(query)
+            # NOTE: cursor returns not normalized column names! may be quite useful in case of Oracle dialect
+            # that normalizes columns
+            # columns = [c[0] for c in result.cursor.description]
+            columns = list(result.keys())
+            for partition in result.partitions(size=self.chunk_size):
+                if self.backend == "sqlalchemy":
+                    yield [dict(row._mapping) for row in partition]
+                elif self.backend == "pandas":
+                    from dlt.common.libs.pandas_sql import _wrap_result
+
+                    df = _wrap_result(
+                        partition,
+                        columns,
+                        **{"dtype_backend": "pyarrow", **(backend_kwargs or {})},
+                    )
+                    yield df
+                elif self.backend == "pyarrow":
+                    yield row_tuples_to_arrow(partition, self.columns, tz=backend_kwargs.get("tz", "UTC"))
+
+    def _load_rows_connectorx(self, query: SelectAny, backend_kwargs: Optional[dict[str, Any]]) -> Iterator[TDataItem]:
+        try:
+            import connectorx as cx  # type: ignore
+        except ImportError:
+            raise MissingDependencyException("Connector X table backend", ["connectorx"])
+
+        # default settings
+        backend_kwargs = {
+            "return_type": "arrow2",
+            "protocol": "binary",
+            **(backend_kwargs or {}),
+        }
+        conn = backend_kwargs.pop(
+            "conn",
+            self.engine.url._replace(drivername=self.engine.url.get_backend_name()).render_as_string(
+                hide_password=False
+            ),
+        )
+        try:
+            query_str = str(query.compile(self.engine, compile_kwargs={"literal_binds": True}))
+        except CompileError as ex:
+            raise NotImplementedError(
+                f"Query for table {self.table.name} could not be compiled to string to execute it on ConnectorX. If you are on SQLAlchemy 1.4.x the causing exception is due to literals that cannot be rendered, upgrade to 2.x: {str(ex)}"
+            ) from ex
+        df = cx.read_sql(conn, query_str, **backend_kwargs)
+        yield df
+
+
+def table_rows(
+    engine: Engine,
+    table: Table,
+    chunk_size: int,
+    backend: TableBackend,
+    incremental: Optional[dlt.sources.incremental[Any]] = None,
+    defer_table_reflect: bool = False,
+    table_adapter_callback: Optional[Callable[[Table], None]] = None,
+    reflection_level: ReflectionLevel = "minimal",
+    backend_kwargs: Optional[dict[str, Any]] = None,
+    type_adapter_callback: Optional[TTypeAdapter] = None,
+    included_columns: Optional[list[str]] = None,
+    query_adapter_callback: Optional[TQueryAdapter] = None,
+    connect_args: Optional[list[str]] = None,
+) -> Iterator[TDataItem]:
+    columns: TTableSchemaColumns | None = None
+    if defer_table_reflect:
+        table = Table(table.name, table.metadata, autoload_with=engine, extend_existing=True)
+        default_table_adapter(table, included_columns)
+        if table_adapter_callback:
+            table_adapter_callback(table)
+        columns = table_to_columns(table, reflection_level, type_adapter_callback)
+
+        # set the primary_key in the incremental
+        if incremental and incremental.primary_key is None:
+            primary_key = get_primary_key(table)
+            if primary_key is not None:
+                incremental.primary_key = primary_key
+
+        # yield empty record to set hints
+        yield dlt.mark.with_hints(
+            [],
+            dlt.mark.make_hints(
+                primary_key=get_primary_key(table),
+                columns=columns,
+            ),
+        )
+    else:
+        # table was already reflected
+        columns = table_to_columns(table, reflection_level, type_adapter_callback)
+
+    yield dlt.mark.materialize_table_schema()  # type: ignore
+
+    loader = TableLoader(
+        engine,
+        backend,
+        table,
+        columns,
+        incremental=incremental,
+        chunk_size=chunk_size,
+        query_adapter_callback=query_adapter_callback,
+        connect_args=connect_args,
+    )
+
+    yield from loader.load_rows(backend_kwargs)
+
+    engine.dispose()
+
+
+def engine_from_credentials(
+    credentials: Union[ConnectionStringCredentials, Engine, str],
+    may_dispose_after_use: bool = False,
+    **backend_kwargs: Any,
+) -> Engine:
+    if isinstance(credentials, Engine):
+        return credentials
+    if isinstance(credentials, ConnectionStringCredentials):
+        credentials = credentials.to_native_representation()
+    engine = create_engine(credentials, **backend_kwargs)
+    setattr(engine, "may_dispose_after_use", may_dispose_after_use)  # noqa
+    return engine
+
+
+def unwrap_json_connector_x(field: str) -> TDataItem:
+    """Creates a transform function to be added with `add_map` that will unwrap JSON columns
+    ingested via connectorx. Such columns are additionally quoted and translate SQL NULL to json "null"
+    """
+    import pyarrow.compute as pc
+    import pyarrow as pa
+
+    def _unwrap(table: TDataItem) -> TDataItem:
+        col_index = table.column_names.index(field)
+        # remove quotes
+        column = pc.replace_substring_regex(table[field], '"(.*)"', "\\1")
+        # convert json null to null
+        column = pc.replace_with_mask(
+            column,
+            pc.equal(column, "null").combine_chunks(),
+            pa.scalar(None, pa.large_string()),
+        )
+        return table.set_column(col_index, table.schema.field(col_index), column)
+
+    return _unwrap
+
+
+def _detect_precision_hints_deprecated(value: Optional[bool]) -> None:
+    if value is None:
+        return
+
+    msg = "`detect_precision_hints` argument is deprecated and will be removed in a future release. "
+    if value:
+        msg += "Use `reflection_level='full_with_precision'` which has the same effect instead."
+
+    warnings.warn(msg, DeprecationWarning, stacklevel=1)
+
+
+@configspec
+class SqlDatabaseTableConfiguration(BaseConfiguration):
+    incremental: Optional[dlt.sources.incremental] = None  # type: ignore[type-arg]
+    included_columns: Optional[list[str]] = None
+
+
+@configspec
+class SqlTableResourceConfiguration(BaseConfiguration):
+    credentials: Optional[Union[ConnectionStringCredentials, Engine, str]] = None
+    table: Optional[str] = None
+    schema: Optional[str] = None
+    incremental: Optional[dlt.sources.incremental] = None  # type: ignore[type-arg]
+    chunk_size: int = 50000
+    backend: TableBackend = "sqlalchemy"
+    detect_precision_hints: Optional[bool] = None
+    defer_table_reflect: Optional[bool] = False
+    reflection_level: Optional[ReflectionLevel] = "full"
+    included_columns: Optional[list[str]] = None
diff --git a/posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py b/posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py
new file mode 100644
index 0000000000000..cc744eed0c9f6
--- /dev/null
+++ b/posthog/temporal/data_imports/pipelines/sql_database_v2/schema_types.py
@@ -0,0 +1,170 @@
+from typing import (
+    Optional,
+    Any,
+    TYPE_CHECKING,
+    Literal,
+    Union,
+)
+from collections.abc import Callable
+from typing import TypeAlias
+from sqlalchemy import Table, Column
+from sqlalchemy.engine import Row
+from sqlalchemy.dialects.postgresql.types import INTERVAL
+from sqlalchemy.types import ARRAY
+from sqlalchemy.sql import sqltypes, Select
+from sqlalchemy.sql.sqltypes import TypeEngine
+
+from dlt.common import logger
+from dlt.common.schema.typing import TColumnSchema, TTableSchemaColumns
+
+from posthog.temporal.data_imports.pipelines.sql_database_v2._json import BigQueryJSON
+from sqlalchemy_bigquery import STRUCT as BigQueryStruct
+
+ReflectionLevel = Literal["minimal", "full", "full_with_precision"]
+
+
+# optionally create generics with any so they can be imported by dlt importer
+if TYPE_CHECKING:
+    SelectAny: TypeAlias = Select[Any]
+    ColumnAny: TypeAlias = Column[Any]
+    RowAny: TypeAlias = Row[Any]
+    TypeEngineAny = TypeEngine[Any]
+else:
+    SelectAny: TypeAlias = type[Any]
+    ColumnAny: TypeAlias = type[Any]
+    RowAny: TypeAlias = type[Any]
+    TypeEngineAny = type[Any]
+
+
+TTypeAdapter = Callable[[TypeEngineAny], Optional[Union[TypeEngineAny, type[TypeEngineAny]]]]
+
+
+def default_table_adapter(table: Table, included_columns: Optional[list[str]]) -> None:
+    """Default table adapter being always called before custom one"""
+    if included_columns is not None:
+        # Delete columns not included in the load
+        for col in list(table._columns):
+            if col.name not in included_columns:
+                table._columns.remove(col)
+    for col in table._columns:
+        sql_t = col.type
+        if isinstance(sql_t, sqltypes.Uuid):
+            # emit uuids as string by default
+            sql_t.as_uuid = False
+
+
+def sqla_col_to_column_schema(
+    sql_col: ColumnAny,
+    reflection_level: ReflectionLevel,
+    type_adapter_callback: Optional[TTypeAdapter] = None,
+) -> Optional[TColumnSchema]:
+    """Infer dlt schema column type from an sqlalchemy type.
+
+    If `add_precision` is set, precision and scale is inferred from that types that support it,
+    such as numeric, varchar, int, bigint. Numeric (decimal) types have always precision added.
+    """
+    col: TColumnSchema = {
+        "name": sql_col.name,
+        "nullable": sql_col.nullable,
+    }
+    if reflection_level == "minimal":
+        return col
+
+    sql_t = sql_col.type
+
+    if type_adapter_callback:
+        sql_t = type_adapter_callback(sql_t)  # type: ignore[assignment]
+        # Check if sqla type class rather than instance is returned
+        if sql_t is not None and isinstance(sql_t, type):
+            sql_t = sql_t()
+
+    if sql_t is None:
+        # Column ignored by callback
+        return col
+
+    add_precision = reflection_level == "full_with_precision"
+
+    if isinstance(sql_t, sqltypes.Uuid):
+        # we represent UUID as text by default, see default_table_adapter
+        col["data_type"] = "text"
+    elif isinstance(sql_t, sqltypes.Numeric):
+        # check for Numeric type first and integer later, some numeric types (ie. Oracle)
+        # derive from both
+        # all Numeric types that are returned as floats will assume "double" type
+        # and returned as decimals will assume "decimal" type
+        if sql_t.asdecimal is False:
+            col["data_type"] = "double"
+        else:
+            col["data_type"] = "decimal"
+            if sql_t.precision is not None:
+                col["precision"] = sql_t.precision
+                # must have a precision for any meaningful scale
+                if sql_t.scale is not None:
+                    col["scale"] = sql_t.scale
+                elif sql_t.decimal_return_scale is not None:
+                    col["scale"] = sql_t.decimal_return_scale
+    elif isinstance(sql_t, sqltypes.SmallInteger):
+        col["data_type"] = "bigint"
+        if add_precision:
+            col["precision"] = 32
+    elif isinstance(sql_t, sqltypes.Integer):
+        col["data_type"] = "bigint"
+    elif isinstance(sql_t, sqltypes.String):
+        col["data_type"] = "text"
+        if add_precision and sql_t.length:
+            col["precision"] = sql_t.length
+    elif isinstance(sql_t, sqltypes._Binary):
+        col["data_type"] = "binary"
+        if add_precision and sql_t.length:
+            col["precision"] = sql_t.length
+    elif isinstance(sql_t, sqltypes.DateTime):
+        col["data_type"] = "timestamp"
+    elif isinstance(sql_t, sqltypes.Date):
+        col["data_type"] = "date"
+    elif isinstance(sql_t, sqltypes.Time):
+        col["data_type"] = "time"
+    elif isinstance(sql_t, sqltypes.JSON) or isinstance(sql_t, BigQueryJSON) or isinstance(sql_t, BigQueryStruct):
+        col["data_type"] = "json"
+    elif isinstance(sql_t, sqltypes.Boolean):
+        col["data_type"] = "bool"
+    elif isinstance(sql_t, sqltypes.Interval) or isinstance(sql_t, INTERVAL) or isinstance(sql_t, ARRAY):
+        # No support for interval columns yet - we filter out binary columns, so reusing the DLT type for interval columns to be removed
+        col["data_type"] = "interval"  # type: ignore
+    else:
+        logger.warning(
+            f"A column with name {sql_col.name} contains unknown data type {sql_t} which cannot be mapped to `dlt` data type. When using sqlalchemy backend such data will be passed to the normalizer. In case of `pyarrow` and `pandas` backend, data types are detected from numpy ndarrays. In case of other backends, the behavior is backend-specific."
+        )
+
+    return {key: value for key, value in col.items() if value is not None}  # type: ignore[return-value]
+
+
+def get_primary_key(table: Table) -> Optional[list[str]]:
+    """Create primary key or return None if no key defined"""
+    primary_key = [c.name for c in table.primary_key]
+    if len(primary_key) > 0:
+        return primary_key
+
+    column_names = [c.name for c in table.columns]
+    if "id" in column_names:
+        return ["id"]
+
+    return None
+
+
+def table_to_columns(
+    table: Table,
+    reflection_level: ReflectionLevel = "full",
+    type_conversion_fallback: Optional[TTypeAdapter] = None,
+) -> TTableSchemaColumns:
+    """Convert an sqlalchemy table to a dlt table schema."""
+    cols = {
+        col["name"]: col
+        for col in (sqla_col_to_column_schema(c, reflection_level, type_conversion_fallback) for c in table.columns)
+        if col is not None and "name" in col and col["name"] is not None
+    }
+
+    for name, col in list(cols.items()):
+        if "data_type" in col and (col["data_type"] == "binary" or col["data_type"] == "interval"):
+            cols.pop(name)
+
+    return cols
diff --git a/posthog/temporal/data_imports/workflow_activities/import_data.py b/posthog/temporal/data_imports/workflow_activities/import_data.py
index 9691fb4b02f4a..9af5e2d2b6db3 100644
--- a/posthog/temporal/data_imports/workflow_activities/import_data.py
+++ b/posthog/temporal/data_imports/workflow_activities/import_data.py
@@ -5,6 +5,7 @@
 
 from temporalio import activity
 
+from posthog.settings.utils import get_from_env
 from posthog.temporal.common.heartbeat import Heartbeater
 from posthog.temporal.data_imports.pipelines.bigquery import delete_table
 from posthog.temporal.data_imports.pipelines.helpers import aremove_reset_pipeline, aupdate_job_count
@@ -29,6 +30,11 @@ class ImportDataActivityInputs:
     run_id: str
 
 
+def is_posthog_team(team_id: int) -> bool:
+    region = get_from_env("CLOUD_DEPLOYMENT", optional=True)
+    return (region == "EU" and team_id == 1) or (region == "US" and team_id == 2)
+
+
 @activity.defn
 async def import_data_activity(inputs: ImportDataActivityInputs):
     async with Heartbeater(factor=30):  # Every 10 secs
@@ -110,7 +116,10 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             ExternalDataSource.Type.MYSQL,
             ExternalDataSource.Type.MSSQL,
         ]:
-            from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
+            if is_posthog_team(inputs.team_id):
+                from posthog.temporal.data_imports.pipelines.sql_database_v2 import sql_source_for_type
+            else:
+                from posthog.temporal.data_imports.pipelines.sql_database import sql_source_for_type
 
             host = model.pipeline.job_inputs.get("host")
             port = model.pipeline.job_inputs.get("port")
@@ -145,7 +154,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
                     raise Exception("Can't open tunnel to SSH server")
 
                 source = sql_source_for_type(
-                    source_type=model.pipeline.source_type,
+                    source_type=ExternalDataSource.Type(model.pipeline.source_type),
                     host=tunnel.local_bind_host,
                     port=tunnel.local_bind_port,
                     user=user,
@@ -173,7 +182,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
             )
 
             source = sql_source_for_type(
-                source_type=model.pipeline.source_type,
+                source_type=ExternalDataSource.Type(model.pipeline.source_type),
                 host=host,
                 port=port,
                 user=user,
@@ -316,7 +325,7 @@ async def import_data_activity(inputs: ImportDataActivityInputs):
                 reset_pipeline=reset_pipeline,
             )
         elif model.pipeline.source_type == ExternalDataSource.Type.BIGQUERY:
-            from posthog.temporal.data_imports.pipelines.sql_database import bigquery_source
+            from posthog.temporal.data_imports.pipelines.sql_database_v2 import bigquery_source
 
             dataset_id = model.pipeline.job_inputs.get("dataset_id")
             project_id = model.pipeline.job_inputs.get("project_id")
diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py
index c4c483a5f1fd9..17ed1c53190c6 100644
--- a/posthog/warehouse/data_load/validate_schema.py
+++ b/posthog/warehouse/data_load/validate_schema.py
@@ -50,7 +50,7 @@ def dlt_to_hogql_type(dlt_type: TDataType | None) -> str:
         hogql_type = IntegerDatabaseField
     elif dlt_type == "binary":
         raise Exception("DLT type 'binary' is not a supported column type")
-    elif dlt_type == "complex":
+    elif dlt_type == "json":
         hogql_type = StringJSONDatabaseField
     elif dlt_type == "decimal":
         hogql_type = IntegerDatabaseField
diff --git a/posthog/warehouse/models/external_table_definitions.py b/posthog/warehouse/models/external_table_definitions.py
index 9d76d4d993969..00704ec6c3994 100644
--- a/posthog/warehouse/models/external_table_definitions.py
+++ b/posthog/warehouse/models/external_table_definitions.py
@@ -707,8 +707,8 @@
     IntegerDatabaseField: "bigint",
     BooleanDatabaseField: "bool",
     DateTimeDatabaseField: "timestamp",
-    StringJSONDatabaseField: "complex",
-    StringArrayDatabaseField: "complex",
+    StringJSONDatabaseField: "json",
+    StringArrayDatabaseField: "json",
     FloatDatabaseField: "double",
     DateDatabaseField: "date",
 }
diff --git a/requirements.in b/requirements.in
index 619c6a06cdaaf..b5752d1267274 100644
--- a/requirements.in
+++ b/requirements.in
@@ -32,8 +32,8 @@ django-revproxy==0.12.0
 djangorestframework==3.15.1
 djangorestframework-csv==2.1.1
 djangorestframework-dataclasses==1.2.0
-dlt==0.5.4
-dlt[deltalake]==0.5.4
+dlt==1.3.0
+dlt[deltalake]==1.3.0
 dnspython==2.2.1
 drf-exceptions-hog==0.4.0
 drf-extensions==0.7.0
diff --git a/requirements.txt b/requirements.txt
index 82798318498ef..8f5bc5d3c4275 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -206,7 +206,7 @@ djangorestframework-csv==2.1.1
     # via -r requirements.in
 djangorestframework-dataclasses==1.2.0
     # via -r requirements.in
-dlt==0.5.4
+dlt==1.3.0
     # via -r requirements.in
 dnspython==2.2.1
     # via -r requirements.in
@@ -452,6 +452,8 @@ platformdirs==3.11.0
     # via
     #   snowflake-connector-python
     #   zeep
+pluggy==1.5.0
+    # via dlt
 ply==3.11
     # via jsonpath-ng
 posthoganalytics==3.6.6