feat(ingest/powerbi): add timeouts for m-query parsing #11753

Merged · 3 commits · Oct 30, 2024

Changes from all commits
19 changes: 17 additions & 2 deletions metadata-ingestion/setup.py
@@ -276,6 +276,10 @@
*path_spec_common,
}

threading_timeout_common = {
"stopit==1.1.2",
}

abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.17.1",
@@ -484,11 +488,22 @@
"teradata": sql_common
| usage_common
| sqlglot_lib
| {"teradatasqlalchemy>=17.20.0.0"},
| {
# On 2024-10-30, teradatasqlalchemy 20.0.0.2 was released. This version seemed to cause issues
# in our CI, so we're pinning the version for now.
"teradatasqlalchemy>=17.20.0.0,<=20.0.0.2",
},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi": (
(
microsoft_common
| {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
| sqlglot_lib
| threading_timeout_common
)
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
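The new `threading_timeout_common` extra pins `stopit==1.1.2`, which backs the `datahub.utilities.threading_timeout` helper imported by the M-Query parser further down; that helper module itself is not part of this diff. A minimal sketch of what such a wrapper could look like, assuming it is a thin layer over `stopit.ThreadingTimeout`:

```python
# Hypothetical sketch of datahub.utilities.threading_timeout (not shown in this
# diff), assuming it simply wraps stopit.ThreadingTimeout.
from contextlib import contextmanager
from typing import Iterator

from stopit import ThreadingTimeout, TimeoutException  # noqa: F401  (TimeoutException re-exported)


@contextmanager
def threading_timeout(timeout: float) -> Iterator[None]:
    if timeout <= 0:
        # A non-positive timeout disables the guard entirely.
        yield
        return
    # swallow_exc=False makes stopit raise TimeoutException in the guarded
    # thread when the timer expires, instead of silently continuing.
    with ThreadingTimeout(timeout, swallow_exc=False):
        yield
```

The parser below only relies on the helper raising `TimeoutException` once the budget is exceeded; the real implementation may differ in detail.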
Original file line number Diff line number Diff line change
@@ -175,7 +175,7 @@ def __init__(

def get_query_result(self, query: str) -> RowIterator:
def _should_retry(exc: BaseException) -> bool:
logger.debug(f"Exception occured for job query. Reason: {exc}")
logger.debug(f"Exception occurred for job query. Reason: {exc}")
# Jobs sometimes fail with transient errors.
# This is not currently handled by the python-bigquery client.
# https://github.com/googleapis/python-bigquery/issues/23
@@ -197,7 +197,7 @@ def _should_retry(exc: BaseException) -> bool:
def get_projects(self, max_results_per_page: int = 100) -> List[BigqueryProject]:
def _should_retry(exc: BaseException) -> bool:
logger.debug(
f"Exception occured for project.list api. Reason: {exc}. Retrying api request..."
f"Exception occurred for project.list api. Reason: {exc}. Retrying api request..."
)
self.report.num_list_projects_retry_request += 1
return True
19 changes: 12 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
@@ -19,6 +19,7 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList

logger = logging.getLogger(__name__)

@@ -176,11 +177,18 @@ class SupportedDataPlatform(Enum):

@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
all_workspace_count: int = 0
filtered_workspace_names: LossyList[str] = dataclass_field(
default_factory=LossyList
)
filtered_workspace_types: LossyList[str] = dataclass_field(
default_factory=LossyList
)

dashboards_scanned: int = 0
charts_scanned: int = 0
filtered_dashboards: List[str] = dataclass_field(default_factory=list)
filtered_charts: List[str] = dataclass_field(default_factory=list)
number_of_workspaces: int = 0

def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count
@@ -194,9 +202,6 @@ def report_dashboards_dropped(self, model: str) -> None:
def report_charts_dropped(self, view: str) -> None:
self.filtered_charts.append(view)

def report_number_of_workspaces(self, number_of_workspaces: int) -> None:
self.number_of_workspaces = number_of_workspaces


def default_for_dataset_type_mapping() -> Dict[str, str]:
dict_: dict = {}
@@ -331,7 +336,7 @@ class PowerBiDashboardSourceConfig(
)
workspace_id_as_urn_part: bool = pydantic.Field(
default=False,
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names."
description="It is recommended to set this to True only if you have legacy workspaces based on Office 365 groups, as those workspaces can have identical names. "
"To maintain backward compatibility, this is set to False which uses workspace name",
)
# Enable/Disable extracting ownership information of Dashboard
@@ -371,8 +376,8 @@ class PowerBiDashboardSourceConfig(
# any existing tags defined to those entities
extract_endorsements_to_tags: bool = pydantic.Field(
default=False,
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. Admin API "
"access is required is this setting is enabled",
description="Whether to extract endorsements to tags, note that this may overwrite existing tags. "
"Admin API access is required if this setting is enabled.",
)
filter_dataset_endorsements: AllowDenyPattern = pydantic.Field(
default=AllowDenyPattern.allow_all(),
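The report now records why workspaces were skipped using `LossyList` fields instead of a single `number_of_workspaces` counter. A short usage sketch, assuming `LossyList` behaves like a list that only keeps a bounded sample of appended entries when the ingestion report is rendered (the exact cap is an implementation detail of `datahub.utilities.lossy_collections`):

```python
from datahub.utilities.lossy_collections import LossyList

filtered_workspace_names: LossyList[str] = LossyList()

# Hypothetical workspace ids/names, mirroring how powerbi.py populates the field.
for ws_id, ws_name in [("1111", "Finance"), ("2222", "Sales"), ("3333", "Legacy")]:
    filtered_workspace_names.append(f"{ws_id} - {ws_name}")

# When the report is rendered, only a truncated sample is shown rather than an
# unbounded list of every filtered workspace.
print(filtered_workspace_names)
```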
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import importlib.resources as pkg_resource
import logging
import os
from typing import Dict, List

import lark
@@ -19,9 +20,12 @@
TRACE_POWERBI_MQUERY_PARSER,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout

logger = logging.getLogger(__name__)

_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))


@functools.lru_cache(maxsize=1)
def get_lark_parser() -> Lark:
@@ -41,7 +45,8 @@ def _parse_expression(expression: str) -> Tree:
expression = expression.replace("\u00a0", " ")

logger.debug(f"Parsing expression = {expression}")
parse_tree: Tree = lark_parser.parse(expression)
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
parse_tree: Tree = lark_parser.parse(expression)

if TRACE_POWERBI_MQUERY_PARSER:
logger.debug(parse_tree.pretty())
@@ -83,17 +88,26 @@ def get_upstream_tables(
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
return []
except KeyboardInterrupt:
raise
except TimeoutException:
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
except (
BaseException
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
title = "Unexpected Character Found"
error_type = "Unexpected Character Error"
else:
title = "Unknown Parsing Error"
error_type = "Unknown Parsing Error"

reporter.warning(
title=title,
message="Unknown parsing error",
title="Unable to extract lineage from M-Query expression",
message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
)
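Two details from this file are worth noting: the timeout is read once at import time from `DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT` (default 60 seconds), and the new `TimeoutException` branch must sit above the broad `BaseException` safety net or it would never be reached. A simplified, self-contained sketch of that control flow, using `stopit` directly in place of the internal `threading_timeout` helper:

```python
import os

from lark import Lark
from stopit import ThreadingTimeout, TimeoutException

# Read once at import time, so the override must be exported before ingestion starts.
_M_QUERY_PARSE_TIMEOUT = int(os.getenv("DATAHUB_POWERBI_M_QUERY_PARSE_TIMEOUT", 60))


def parse_expression(parser: Lark, expression: str) -> None:
    try:
        with ThreadingTimeout(_M_QUERY_PARSE_TIMEOUT, swallow_exc=False):
            parser.parse(expression)
    except KeyboardInterrupt:
        raise  # never let the catch-all below swallow Ctrl-C
    except TimeoutException:
        # Reported via reporter.warning() in the real source; lineage is skipped.
        print(f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds")
    except BaseException as e:
        # Mirrors the broad BaseException handler in the diff (see the TODO there).
        print(f"Unknown parsing error: {e}")
```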
Original file line number Diff line number Diff line change
@@ -473,8 +473,9 @@ def internal(
)
if v_statement is None:
self.reporter.report_warning(
f"{self.table.full_name}-variable-statement",
f"output variable ({current_identifier}) statement not found in table expression",
title="Unable to extract lineage from M-Query expression",
message="Lineage will be incomplete.",
context=f"table-full-name={self.table.full_name}: output-variable={current_identifier} not found in table expression",
)
return None

56 changes: 35 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -7,6 +7,8 @@
from datetime import datetime
from typing import Iterable, List, Optional, Tuple, Union

import more_itertools

import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -795,6 +797,11 @@ def generate_container_for_workspace(
container_key=self.workspace_key,
name=workspace.name,
sub_types=[workspace.type],
extra_properties={
"workspace_id": workspace.id,
"workspace_name": workspace.name,
"workspace_type": workspace.type,
},
)
return container_work_units

@@ -1256,20 +1263,33 @@ def create(cls, config_dict, ctx):

def get_allowed_workspaces(self) -> List[powerbi_data_classes.Workspace]:
all_workspaces = self.powerbi_client.get_workspaces()
logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.all_workspace_count = len(all_workspaces)
logger.debug(
f"All workspaces: {[workspace.format_name_for_logger() for workspace in all_workspaces]}"
)

allowed_wrk = [
workspace
for workspace in all_workspaces
if self.source_config.workspace_id_pattern.allowed(workspace.id)
and workspace.type in self.source_config.workspace_type_filter
]
allowed_workspaces = []
for workspace in all_workspaces:
if not self.source_config.workspace_id_pattern.allowed(workspace.id):
self.reporter.filtered_workspace_names.append(
f"{workspace.id} - {workspace.name}"
)
continue
elif workspace.type not in self.source_config.workspace_type_filter:
self.reporter.filtered_workspace_types.append(
f"{workspace.id} - {workspace.name} (type = {workspace.type})"
)
continue
else:
allowed_workspaces.append(workspace)

logger.info(f"Number of workspaces = {len(all_workspaces)}")
self.reporter.report_number_of_workspaces(len(all_workspaces))
logger.info(f"Number of allowed workspaces = {len(allowed_wrk)}")
logger.debug(f"Workspaces = {all_workspaces}")
logger.info(f"Number of allowed workspaces = {len(allowed_workspaces)}")
logger.debug(
f"Allowed workspaces: {[workspace.format_name_for_logger() for workspace in allowed_workspaces]}"
)

return allowed_wrk
return allowed_workspaces

def validate_dataset_type_mapping(self):
powerbi_data_platforms: List[str] = [
@@ -1480,16 +1500,10 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# Fetch PowerBi workspace for given workspace identifier

allowed_workspaces = self.get_allowed_workspaces()
workspaces_len = len(allowed_workspaces)

batch_size = (
self.source_config.scan_batch_size
) # 100 is the maximum allowed for powerbi scan
num_batches = (workspaces_len + batch_size - 1) // batch_size
batches = [
allowed_workspaces[i * batch_size : (i + 1) * batch_size]
for i in range(num_batches)
]

batches = more_itertools.chunked(
allowed_workspaces, self.source_config.scan_batch_size
)
for batch_workspaces in batches:
for workspace in self.powerbi_client.fill_workspaces(
batch_workspaces, self.reporter
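The manual batch-index arithmetic is replaced with `more_itertools.chunked`, which lazily yields lists of at most `scan_batch_size` workspaces. A small illustration with hypothetical workspace names:

```python
import more_itertools

workspaces = [f"workspace-{i}" for i in range(7)]  # hypothetical workspace list

# chunked() yields ['workspace-0'..'workspace-2'], ['workspace-3'..'workspace-5'],
# and finally the short tail ['workspace-6'], matching the old batching behaviour.
for batch in more_itertools.chunked(workspaces, 3):
    print(batch)
```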
Original file line number Diff line number Diff line change
@@ -96,6 +96,9 @@ def get_workspace_key(
instance=platform_instance,
)

def format_name_for_logger(self) -> str:
return f"{self.name} ({self.id})"


@dataclass
class DataSource:
Original file line number Diff line number Diff line change
@@ -811,7 +811,7 @@ def _is_scan_result_ready(
res.raise_for_status()

if res.json()[Constant.STATUS].upper() == Constant.SUCCEEDED:
logger.info(f"Scan result is available for scan id({scan_id})")
logger.debug(f"Scan result is available for scan id({scan_id})")
return True

if retry == max_retry:
@@ -898,8 +898,8 @@ def get_users(self, workspace_id: str, entity: str, entity_id: str) -> List[User
return users

def get_scan_result(self, scan_id: str) -> Optional[dict]:
logger.info("Fetching scan result")
logger.info(f"{Constant.SCAN_ID}={scan_id}")
logger.debug("Fetching scan result")
logger.debug(f"{Constant.SCAN_ID}={scan_id}")
scan_result_get_endpoint = AdminAPIResolver.API_ENDPOINTS[
Constant.SCAN_RESULT_GET
]
Original file line number Diff line number Diff line change
@@ -303,7 +303,7 @@ def _get_scan_result(self, workspace_ids: List[str]) -> Any:
)
return None

logger.info("Waiting for scan to complete")
logger.debug("Waiting for scan to complete")
if (
self.__admin_api_resolver.wait_for_scan_to_complete(
scan_id=scan_id, timeout=self.__config.scan_timeout
@@ -355,22 +355,32 @@ def _get_workspace_datasets(self, workspace: Workspace) -> dict:
logger.debug("Processing scan result for datasets")

for dataset_dict in datasets:
dataset_instance: PowerBIDataset = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_dict[Constant.ID],
)
dataset_id = dataset_dict[Constant.ID]
try:
dataset_instance = self._get_resolver().get_dataset(
workspace=workspace,
dataset_id=dataset_id,
)
if dataset_instance is None:
continue
except Exception as e:
self.reporter.warning(
title="Unable to fetch dataset details",
message="Skipping this dataset due to the error. Metadata will be incomplete.",
context=f"workspace={workspace.name}, dataset-id={dataset_id}",
exc=e,
)
continue

# fetch + set dataset parameters
try:
dataset_parameters = self._get_resolver().get_dataset_parameters(
workspace_id=workspace.id,
dataset_id=dataset_dict[Constant.ID],
dataset_id=dataset_id,
)
dataset_instance.parameters = dataset_parameters
except Exception as e:
logger.info(
f"Unable to fetch dataset parameters for {dataset_dict[Constant.ID]}: {e}"
)
logger.info(f"Unable to fetch dataset parameters for {dataset_id}: {e}")

if self.__config.extract_endorsements_to_tags:
dataset_instance.tags = self._parse_endorsement(
@@ -564,8 +574,7 @@ def _fill_metadata_from_scan_result(
)
else:
logger.info(
"Skipping endorsements tag as extract_endorsements_to_tags is set to "
"false "
"Skipping endorsements tag as extract_endorsements_to_tags is not enabled"
)

self._populate_app_details(
@@ -641,6 +650,9 @@ def fill_dashboard_tags() -> None:
def fill_workspaces(
self, workspaces: List[Workspace], reporter: PowerBiDashboardSourceReport
) -> Iterable[Workspace]:
logger.info(
f"Fetching initial metadata for workspaces: {[workspace.format_name_for_logger() for workspace in workspaces]}"
)

workspaces = self._fill_metadata_from_scan_result(workspaces=workspaces)
# First try to fill the admin detail as some regular metadata contains lineage to admin metadata