From 8f84eb1140cbedeca9945c7e8c662e858397fb8b Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 29 Oct 2024 21:53:09 -0700 Subject: [PATCH 1/3] feat(ingest/powerbi): add additional reporting for workspace filters (#11711) fix some logging improve error handling --- metadata-ingestion/setup.py | 6 +- .../source/bigquery_v2/bigquery_schema.py | 4 +- .../ingestion/source/powerbi/config.py | 19 ++++--- .../source/powerbi/m_query/parser.py | 10 ++-- .../source/powerbi/m_query/resolver.py | 5 +- .../ingestion/source/powerbi/powerbi.py | 56 ++++++++++++------- .../powerbi/rest_api_wrapper/data_classes.py | 3 + .../powerbi/rest_api_wrapper/data_resolver.py | 6 +- .../powerbi/rest_api_wrapper/powerbi_api.py | 34 +++++++---- .../powerbi/golden_test_container.json | 10 +++- ..._config_and_modified_since_admin_only.json | 5 +- .../powerbi/golden_test_personal_ingest.json | 5 +- 12 files changed, 108 insertions(+), 55 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index aa6dcaeeff039..0062060a95d0d 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -488,7 +488,11 @@ "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, - "powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib, + "powerbi": ( + microsoft_common + | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"} + | sqlglot_lib + ), "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"}, "unity-catalog": databricks | sql_common | sqllineage_lib, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py index 4f18c22c108a6..58317b108bef4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema.py @@ -175,7 +175,7 @@ def __init__( def get_query_result(self, query: str) -> RowIterator: def _should_retry(exc: BaseException) -> bool: - logger.debug(f"Exception occured for job query. Reason: {exc}") + logger.debug(f"Exception occurred for job query. Reason: {exc}") # Jobs sometimes fail with transient errors. # This is not currently handled by the python-bigquery client. # https://github.com/googleapis/python-bigquery/issues/23 @@ -197,7 +197,7 @@ def _should_retry(exc: BaseException) -> bool: def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]: def _should_retry(exc: BaseException) -> bool: logger.debug( - f"Exception occured for project.list api. Reason: {exc}. Retrying api request..." + f"Exception occurred for project.list api. Reason: {exc}. Retrying api request..." ) self.report.num_list_projects_retry_request += 1 return True diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py index 8a3f8ed6131a2..0aec9a589cf27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py @@ -19,6 +19,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, ) +from datahub.utilities.lossy_collections import LossyList logger = logging.getLogger(__name__) @@ -176,11 +177,18 @@ class SupportedDataPlatform(Enum): @dataclass class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport): + all_workspace_count: int = 0 + filtered_workspace_names: LossyList[str] = dataclass_field( + default_factory=LossyList + ) + filtered_workspace_types: LossyList[str] = dataclass_field( + default_factory=LossyList + ) + dashboards_scanned: int = 0 charts_scanned: int = 0 filtered_dashboards: List[str] = dataclass_field(default_factory=list) filtered_charts: List[str] = dataclass_field(default_factory=list) - number_of_workspaces: int = 0 def report_dashboards_scanned(self, count: int = 1) -> None: self.dashboards_scanned += count @@ -194,9 +202,6 @@ def report_dashboards_dropped(self, model: str) -> None: def report_charts_dropped(self, view: str) -> None: self.filtered_charts.append(view) - def report_number_of_workspaces(self, number_of_workspaces: int) -> None: - self.number_of_workspaces = number_of_workspaces - def default_for_dataset_type_mapping() -> Dict[str, str]: dict_: dict = {} @@ -331,7 +336,7 @@ class PowerBiDashboardSourceConfig( ) workspace_id_as_urn_part: bool = pydantic.Field( default=False, - description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names." + description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names. " "To maintain backward compatibility, this is set to False which uses workspace name", ) # Enable/Disable extracting ownership information of Dashboard @@ -371,8 +376,8 @@ class PowerBiDashboardSourceConfig( # any existing tags defined to those entities extract_endorsements_to_tags: bool = pydantic.Field( default=False, - description="Whether to extract endorsements to tags, note that this may overwrite existing tags. Admin API " - "access is required is this setting is enabled", + description="Whether to extract endorsements to tags, note that this may overwrite existing tags. " + "Admin API access is required if this setting is enabled.", ) filter_dataset_endorsements: AllowDenyPattern = pydantic.Field( default=AllowDenyPattern.allow_all(), diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index 3edaaed2ff814..daf0aa5a4667d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -83,17 +83,19 @@ def get_upstream_tables( context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}", ) return [] + except KeyboardInterrupt: + raise except ( BaseException ) as e: # TODO: Debug why BaseException is needed here and below. if isinstance(e, lark.exceptions.UnexpectedCharacters): - title = "Unexpected Character Found" + error_type = "Unexpected Character Error" else: - title = "Unknown Parsing Error" + error_type = "Unknown Parsing Error" reporter.warning( - title=title, - message="Unknown parsing error", + title="Unable to extract lineage from M-Query expression", + message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.", context=f"table-full-name={table.full_name}, expression={table.expression}", exc=e, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py index 20fb0b5facbbc..a5fb6fd2673ac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py @@ -473,8 +473,9 @@ def internal( ) if v_statement is None: self.reporter.report_warning( - f"{self.table.full_name}-variable-statement", - f"output variable ({current_identifier}) statement not found in table expression", + title="Unable to extract lineage from M-Query expression", + message="Lineage will be incomplete.", + context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression", ) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 72336afbaacd0..b2afdc3e40931 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -7,6 +7,8 @@ from datetime import datetime from typing import Iterable, List, Optional, Tuple, Union +import more_itertools + import datahub.emitter.mce_builder as builder import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes from datahub.emitter.mcp import MetadataChangeProposalWrapper @@ -795,6 +797,11 @@ def generate_container_for_workspace( container_key=self.workspace_key, name=workspace.name, sub_types=[workspace.type], + extra_properties={ + "workspace_id": workspace.id, + "workspace_name": workspace.name, + "workspace_type": workspace.type, + }, ) return container_work_units @@ -1256,20 +1263,33 @@ def create(cls, config_dict, ctx): def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]: all_workspaces = self.powerbi_client.get_workspaces() + logger.info(f"Number of workspaces = {len(all_workspaces)}") + self.reporter.all_workspace_count = len(all_workspaces) + logger.debug( + f"All workspaces: {[workspace.format_name_for_logger() for workspace in all_workspaces]}" + ) - allowed_wrk = [ - workspace - for workspace in all_workspaces - if self.source_config.workspace_id_pattern.allowed(workspace.id) - and workspace.type in self.source_config.workspace_type_filter - ] + allowed_workspaces = [] + for workspace in all_workspaces: + if not self.source_config.workspace_id_pattern.allowed(workspace.id): + self.reporter.filtered_workspace_names.append( + f"{workspace.id} - {workspace.name}" + ) + continue + elif workspace.type not in self.source_config.workspace_type_filter: + self.reporter.filtered_workspace_types.append( + f"{workspace.id} - {workspace.name} (type = {workspace.type})" + ) + continue + else: + allowed_workspaces.append(workspace) - logger.info(f"Number of workspaces = {len(all_workspaces)}") - self.reporter.report_number_of_workspaces(len(all_workspaces)) - logger.info(f"Number of allowed workspaces = {len(allowed_wrk)}") - logger.debug(f"Workspaces = {all_workspaces}") + logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}") + logger.debug( + f"Allowed workspaces: {[workspace.format_name_for_logger() for workspace in allowed_workspaces]}" + ) - return allowed_wrk + return allowed_workspaces def validate_dataset_type_mapping(self): powerbi_data_platforms: List[str] = [ @@ -1480,16 +1500,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: # Fetch PowerBi workspace for given workspace identifier allowed_workspaces = self.get_allowed_workspaces() - workspaces_len = len(allowed_workspaces) - - batch_size = ( - self.source_config.scan_batch_size - ) # 100 is the maximum allowed for powerbi scan - num_batches = (workspaces_len + batch_size - 1) // batch_size - batches = [ - allowed_workspaces[i * batch_size : (i + 1) * batch_size] - for i in range(num_batches) - ] + + batches = more_itertools.chunked( + allowed_workspaces, self.source_config.scan_batch_size + ) for batch_workspaces in batches: for workspace in self.powerbi_client.fill_workspaces( batch_workspaces, self.reporter diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py index 9407ef7a51b58..fc5cd76458a51 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_classes.py @@ -96,6 +96,9 @@ def get_workspace_key( instance=platform_instance, ) + def format_name_for_logger(self) -> str: + return f"{self.name} ({self.id})" + @dataclass class DataSource: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py index f8fff2391d10b..7a47c40976bec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/data_resolver.py @@ -811,7 +811,7 @@ def _is_scan_result_ready( res.raise_for_status() if res.json()[Constant.STATUS].upper() == Constant.SUCCEEDED: - logger.info(f"Scan result is available for scan id({scan_id})") + logger.debug(f"Scan result is available for scan id({scan_id})") return True if retry == max_retry: @@ -898,8 +898,8 @@ def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User return users def get_scan_result(self, scan_id: str) -> Optional[dict]: - logger.info("Fetching scan result") - logger.info(f"{Constant.SCAN_ID}={scan_id}") + logger.debug("Fetching scan result") + logger.debug(f"{Constant.SCAN_ID}={scan_id}") scan_result_get_endpoint = AdminAPIResolver.API_ENDPOINTS[ Constant.SCAN_RESULT_GET ] diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py index b67f257d9eb5b..e137f175c15ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/rest_api_wrapper/powerbi_api.py @@ -303,7 +303,7 @@ def _get_scan_result(self, workspace_ids: List[str]) -> Any: ) return None - logger.info("Waiting for scan to complete") + logger.debug("Waiting for scan to complete") if ( self.__admin_api_resolver.wait_for_scan_to_complete( scan_id=scan_id, timeout=self.__config.scan_timeout @@ -355,22 +355,32 @@ def _get_workspace_datasets(self, workspace: Workspace) -> dict: logger.debug("Processing scan result for datasets") for dataset_dict in datasets: - dataset_instance: PowerBIDataset = self._get_resolver().get_dataset( - workspace=workspace, - dataset_id=dataset_dict[Constant.ID], - ) + dataset_id = dataset_dict[Constant.ID] + try: + dataset_instance = self._get_resolver().get_dataset( + workspace=workspace, + dataset_id=dataset_id, + ) + if dataset_instance is None: + continue + except Exception as e: + self.reporter.warning( + title="Unable to fetch dataset details", + message="Skipping this dataset due to the error. Metadata will be incomplete.", + context=f"workspace={workspace.name}, dataset-id={dataset_id}", + exc=e, + ) + continue # fetch + set dataset parameters try: dataset_parameters = self._get_resolver().get_dataset_parameters( workspace_id=workspace.id, - dataset_id=dataset_dict[Constant.ID], + dataset_id=dataset_id, ) dataset_instance.parameters = dataset_parameters except Exception as e: - logger.info( - f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}" - ) + logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}") if self.__config.extract_endorsements_to_tags: dataset_instance.tags = self._parse_endorsement( @@ -564,8 +574,7 @@ def _fill_metadata_from_scan_result( ) else: logger.info( - "Skipping endorsements tag as extract_endorsements_to_tags is set to " - "false " + "Skipping endorsements tag as extract_endorsements_to_tags is not enabled" ) self._populate_app_details( @@ -641,6 +650,9 @@ def fill_dashboard_tags() -> None: def fill_workspaces( self, workspaces: List[Workspace], reporter: PowerBiDashboardSourceReport ) -> Iterable[Workspace]: + logger.info( + f"Fetching initial metadata for workspaces: {[workspace.format_name_for_logger() for workspace in workspaces]}" + ) workspaces = self._fill_metadata_from_scan_result(workspaces=workspaces) # First try to fill the admin detail as some regular metadata contains lineage to admin metadata diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_container.json b/metadata-ingestion/tests/integration/powerbi/golden_test_container.json index e8be3aa9c0ac7..1039240942a5e 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_container.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_container.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "demo-workspace" + "workspace": "demo-workspace", + "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_name": "demo-workspace", + "workspace_type": "Workspace" }, "name": "demo-workspace" } @@ -3957,7 +3960,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "second-demo-workspace" + "workspace": "second-demo-workspace", + "workspace_id": "64ED5CAD-7C22-4684-8180-826122881108", + "workspace_name": "second-demo-workspace", + "workspace_type": "Workspace" }, "name": "second-demo-workspace" } diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json b/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json index e134d795af9ef..0d3a0c0cc6f97 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_most_config_and_modified_since_admin_only.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "64ED5CAD-7C10-4684-8180-826122881108" + "workspace": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", + "workspace_name": "demo-workspace", + "workspace_type": "Workspace" }, "name": "demo-workspace" } diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json b/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json index f8c0fdc17c880..c605f939235cd 100644 --- a/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json +++ b/metadata-ingestion/tests/integration/powerbi/golden_test_personal_ingest.json @@ -8,7 +8,10 @@ "json": { "customProperties": { "platform": "powerbi", - "workspace": "Jane Smith Workspace" + "workspace": "Jane Smith Workspace", + "workspace_id": "90E9E256-3D6D-4D38-86C8-6CCCBD8C170C", + "workspace_name": "Jane Smith Workspace", + "workspace_type": "PersonalGroup" }, "name": "Jane Smith Workspace" } From 51467d4cbc4c79a2b9165ac8d0b2e62fb76d92fb Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 30 Oct 2024 10:13:07 -0700 Subject: [PATCH 2/3] fix(ingest): pin teradata dep --- metadata-ingestion/setup.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 0062060a95d0d..dbf02e2eece48 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -484,7 +484,11 @@ "teradata": sql_common | usage_common | sqlglot_lib - | {"teradatasqlalchemy>=17.20.0.0"}, + | { + # On 2024-10-30, teradatasqlalchemy 20.0.0.2 was released. This version seemed to cause issues + # in our CI, so we're pinning the version for now. + "teradatasqlalchemy>=17.20.0.0,<=20.0.0.2", + }, "trino": sql_common | trino, "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, From f9133d79222408116e51c7f690fd5263dd03e626 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 29 Oct 2024 21:48:02 -0700 Subject: [PATCH 3/3] feat(ingest/powerbi): add timeouts for m-query parsing --- metadata-ingestion/setup.py | 13 ++++-- .../source/powerbi/m_query/parser.py | 14 ++++++- .../datahub/utilities/threading_timeout.py | 42 +++++++++++++++++++ .../unit/utilities/test_threading_timeout.py | 31 ++++++++++++++ 4 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 metadata-ingestion/src/datahub/utilities/threading_timeout.py create mode 100644 metadata-ingestion/tests/unit/utilities/test_threading_timeout.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index dbf02e2eece48..606c2b89303b7 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -276,6 +276,10 @@ *path_spec_common, } +threading_timeout_common = { + "stopit==1.1.2", +} + abs_base = { "azure-core==1.29.4", "azure-identity>=1.17.1", @@ -493,9 +497,12 @@ "starburst-trino-usage": sql_common | usage_common | trino, "nifi": {"requests", "packaging", "requests-gssapi"}, "powerbi": ( - microsoft_common - | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"} - | sqlglot_lib + ( + microsoft_common + | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"} + | sqlglot_lib + | threading_timeout_common + ) ), "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py index daf0aa5a4667d..086ce2c263b0c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py @@ -1,6 +1,7 @@ import functools import importlib.resources as pkg_resource import logging +import os from typing import Dict, List import lark @@ -19,9 +20,12 @@ TRACE_POWERBI_MQUERY_PARSER, ) from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table +from datahub.utilities.threading_timeout import TimeoutException, threading_timeout logger = logging.getLogger(__name__) +_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60)) + @functools.lru_cache(maxsize=1) def get_lark_parser() -> Lark: @@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree: expression = expression.replace("\u00a0", " ") logger.debug(f"Parsing expression = {expression}") - parse_tree: Tree = lark_parser.parse(expression) + with threading_timeout(_M_QUERY_PARSE_TIMEOUT): + parse_tree: Tree = lark_parser.parse(expression) if TRACE_POWERBI_MQUERY_PARSER: logger.debug(parse_tree.pretty()) @@ -85,6 +90,13 @@ def get_upstream_tables( return [] except KeyboardInterrupt: raise + except TimeoutException: + reporter.warning( + title="M-Query Parsing Timeout", + message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.", + context=f"table-full-name={table.full_name}, expression={table.expression}", + ) + return [] except ( BaseException ) as e: # TODO: Debug why BaseException is needed here and below. diff --git a/metadata-ingestion/src/datahub/utilities/threading_timeout.py b/metadata-ingestion/src/datahub/utilities/threading_timeout.py new file mode 100644 index 0000000000000..e2caf57ad2116 --- /dev/null +++ b/metadata-ingestion/src/datahub/utilities/threading_timeout.py @@ -0,0 +1,42 @@ +import contextlib +import functools +import platform +from typing import ContextManager + +from stopit import ThreadingTimeout as _ThreadingTimeout, TimeoutException + +__all__ = ["threading_timeout", "TimeoutException"] + + +@functools.lru_cache(maxsize=1) +def _is_cpython() -> bool: + """Check if we're running on CPython.""" + return platform.python_implementation() == "CPython" + + +def threading_timeout(timeout: float) -> ContextManager[None]: + """A timeout context manager that uses stopit's ThreadingTimeout underneath. + + This is only supported on CPython. + That's because stopit.ThreadingTimeout uses a CPython-internal method to raise + an exception (the timeout error) in another thread. See stopit.threadstop.async_raise. + + Reference: https://github.com/glenfant/stopit + + Args: + timeout: The timeout in seconds. If <= 0, no timeout is applied. + + Raises: + RuntimeError: If the timeout is not supported on the current Python implementation. + TimeoutException: If the timeout is exceeded. + """ + + if timeout <= 0: + return contextlib.nullcontext() + + if not _is_cpython(): + raise RuntimeError( + f"Timeout is only supported on CPython, not {platform.python_implementation()}" + ) + + return _ThreadingTimeout(timeout, swallow_exc=False) diff --git a/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py new file mode 100644 index 0000000000000..c52d18bdd55c2 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py @@ -0,0 +1,31 @@ +import time + +import pytest + +from datahub.utilities.threading_timeout import TimeoutException, threading_timeout + + +def test_timeout_no_timeout(): + # Should complete without raising an exception + with threading_timeout(1.0): + time.sleep(0.1) + + +def test_timeout_raises(): + # Should raise TimeoutException + with pytest.raises(TimeoutException): + with threading_timeout(0.1): + time.sleep(0.5) + + +def test_timeout_early_exit(): + # Test that context manager handles other exceptions properly + with pytest.raises(ValueError): + with threading_timeout(1.0): + raise ValueError("Early exit") + + +def test_timeout_zero(): + # Should not raise an exception + with threading_timeout(0.0): + pass