diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index dbf02e2eece48..606c2b89303b7 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -276,6 +276,10 @@
     *path_spec_common,
 }
 
+threading_timeout_common = {
+    "stopit==1.1.2",
+}
+
 abs_base = {
     "azure-core==1.29.4",
     "azure-identity>=1.17.1",
@@ -493,9 +497,12 @@
     "starburst-trino-usage": sql_common | usage_common | trino,
     "nifi": {"requests", "packaging", "requests-gssapi"},
     "powerbi": (
-        microsoft_common
-        | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
-        | sqlglot_lib
+        (
+            microsoft_common
+            | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
+            | sqlglot_lib
+            | threading_timeout_common
+        )
     ),
     "powerbi-report-server": powerbi_report_server,
     "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
index daf0aa5a4667d..086ce2c263b0c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/parser.py
@@ -1,6 +1,7 @@
 import functools
 import importlib.resources as pkg_resource
 import logging
+import os
 from typing import Dict, List
 
 import lark
@@ -19,9 +20,12 @@
     TRACE_POWERBI_MQUERY_PARSER,
 )
 from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
+from datahub.utilities.threading_timeout import TimeoutException, threading_timeout
 
 logger = logging.getLogger(__name__)
 
+_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))
+
 
 @functools.lru_cache(maxsize=1)
 def get_lark_parser() -> Lark:
@@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree:
     expression = expression.replace("\u00a0", " ")
     logger.debug(f"Parsing expression = {expression}")
 
-    parse_tree: Tree = lark_parser.parse(expression)
+    with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
+        parse_tree: Tree = lark_parser.parse(expression)
 
     if TRACE_POWERBI_MQUERY_PARSER:
         logger.debug(parse_tree.pretty())
@@ -85,6 +90,13 @@ def get_upstream_tables(
         return []
     except KeyboardInterrupt:
         raise
+    except TimeoutException:
+        reporter.warning(
+            title="M-Query Parsing Timeout",
+            message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
+            context=f"table-full-name={table.full_name}, expression={table.expression}",
+        )
+        return []
     except (
         BaseException
     ) as e:  # TODO: Debug why BaseException is needed here and below.
diff --git a/metadata-ingestion/src/datahub/utilities/threading_timeout.py b/metadata-ingestion/src/datahub/utilities/threading_timeout.py
new file mode 100644
index 0000000000000..e2caf57ad2116
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/threading_timeout.py
@@ -0,0 +1,42 @@
+import contextlib
+import functools
+import platform
+from typing import ContextManager
+
+from stopit import ThreadingTimeout as _ThreadingTimeout, TimeoutException
+
+__all__ = ["threading_timeout", "TimeoutException"]
+
+
+@functools.lru_cache(maxsize=1)
+def _is_cpython() -> bool:
+    """Check if we're running on CPython."""
+    return platform.python_implementation() == "CPython"
+
+
+def threading_timeout(timeout: float) -> ContextManager[None]:
+    """A timeout context manager that uses stopit's ThreadingTimeout underneath.
+
+    This is only supported on CPython.
+    That's because stopit.ThreadingTimeout uses a CPython-internal method to raise
+    an exception (the timeout error) in another thread. See stopit.threadstop.async_raise.
+
+    Reference: https://github.com/glenfant/stopit
+
+    Args:
+        timeout: The timeout in seconds. If <= 0, no timeout is applied.
+
+    Raises:
+        RuntimeError: If the timeout is not supported on the current Python implementation.
+        TimeoutException: If the timeout is exceeded.
+    """
+
+    if timeout <= 0:
+        return contextlib.nullcontext()
+
+    if not _is_cpython():
+        raise RuntimeError(
+            f"Timeout is only supported on CPython, not {platform.python_implementation()}"
+        )
+
+    return _ThreadingTimeout(timeout, swallow_exc=False)
diff --git a/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py
new file mode 100644
index 0000000000000..c52d18bdd55c2
--- /dev/null
+++ b/metadata-ingestion/tests/unit/utilities/test_threading_timeout.py
@@ -0,0 +1,31 @@
+import time
+
+import pytest
+
+from datahub.utilities.threading_timeout import TimeoutException, threading_timeout
+
+
+def test_timeout_no_timeout():
+    # Should complete without raising an exception
+    with threading_timeout(1.0):
+        time.sleep(0.1)
+
+
+def test_timeout_raises():
+    # Should raise TimeoutException
+    with pytest.raises(TimeoutException):
+        with threading_timeout(0.1):
+            time.sleep(0.5)
+
+
+def test_timeout_early_exit():
+    # Test that context manager handles other exceptions properly
+    with pytest.raises(ValueError):
+        with threading_timeout(1.0):
+            raise ValueError("Early exit")
+
+
+def test_timeout_zero():
+    # Should not raise an exception
+    with threading_timeout(0.0):
+        pass