Commit 652fed1
fix(ingest/tableau): graceful handling of get all datasources failure, add fallback
mayurinehate committed Jul 12, 2023
1 parent b97e9b5 commit 652fed1
Showing 2 changed files with 146 additions and 38 deletions.
133 changes: 102 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -411,6 +411,15 @@ def update_table(
self.num_cols = num_tbl_cols


class TableauSourceReport(StaleEntityRemovalSourceReport):
get_all_datasources_query_failed: bool = False
num_get_datasource_query_failures: int = 0
num_datasource_field_skipped_no_name: int = 0
num_csql_field_skipped_no_name: int = 0
num_table_field_skipped_no_name: int = 0
num_upstream_table_skipped_no_name: int = 0


@platform_name("Tableau")
@config_class(TableauConfig)
@support_status(SupportStatus.INCUBATING)
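The new report type just layers a flag and a few counters on top of the stale-entity report. A minimal self-contained sketch of how it behaves (the dataclass stub for StaleEntityRemovalSourceReport and the field subset are illustrative assumptions, not the real base class):

from dataclasses import dataclass


@dataclass
class StubStaleEntityRemovalSourceReport:
    # Illustrative stand-in for the real StaleEntityRemovalSourceReport.
    events_produced: int = 0


@dataclass
class TableauSourceReport(StubStaleEntityRemovalSourceReport):
    # True when the bulk "get all datasources" call failed and the
    # per-datasource fallback is in effect.
    get_all_datasources_query_failed: bool = False
    # Failures of the per-datasource fallback queries.
    num_get_datasource_query_failures: int = 0
    # Fields skipped because the metadata API returned no name.
    num_datasource_field_skipped_no_name: int = 0


report = TableauSourceReport()
report.get_all_datasources_query_failed = True
report.num_get_datasource_query_failures += 1
print(report)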
@@ -442,7 +451,7 @@ def __init__(
super().__init__(config, ctx)

self.config: TableauConfig = config
self.report: TableauSourceReport = TableauSourceReport()
self.server: Optional[Server] = None
self.database_tables: Dict[str, DatabaseTable] = {}
self.tableau_stat_registry: Dict[str, UsageStat] = {}
@@ -595,14 +604,21 @@ def _init_datasource_registry(self) -> None:
if self.server is None:
return

try:
for ds in TSC.Pager(self.server.datasources):
if ds.project_id not in self.tableau_project_registry:
logger.debug(
f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
f"registry"
)
continue
self.datasource_project_map[ds.id] = ds.project_id
self.report.get_all_datasources_query_failed = False
except Exception as e:
self.report.get_all_datasources_query_failed = True
logger.info(f"Get all datasources query failed due to error {e}")
logger.debug("Error stack trace", exc_info=True)
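The heart of the change: the bulk datasource listing is wrapped in try/except so a server-side failure (such as tableau/server-client-python#1210) flips a report flag and enables the fallback instead of aborting ingestion. A minimal sketch of the pattern, with plain dicts and a fake pager standing in for TSC.Pager and the report object:

import logging

logger = logging.getLogger(__name__)


def init_datasource_registry(pager, project_registry: set, report: dict) -> dict:
    # Sketch: try the bulk query; on any failure, record it on the report
    # and fall through with an empty map so the per-datasource fallback
    # (see _get_published_datasource_project_luid) can fill the gaps later.
    datasource_project_map: dict = {}
    try:
        for ds in pager():  # stands in for TSC.Pager(self.server.datasources)
            if ds["project_id"] in project_registry:
                datasource_project_map[ds["id"]] = ds["project_id"]
        report["get_all_datasources_query_failed"] = False
    except Exception as e:
        report["get_all_datasources_query_failed"] = True
        logger.info("Get all datasources query failed due to error %s", e)
        logger.debug("Error stack trace", exc_info=True)
    return datasource_project_map


def failing_pager():
    raise ValueError("project_id must be defined.")


report: dict = {}
print(init_datasource_registry(failing_pager, {"p1"}, report), report)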

def _init_workbook_registry(self) -> None:
if self.server is None:
@@ -956,6 +972,7 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sq
)
continue
elif table[tableau_constant.NAME] is None:
self.report.num_upstream_table_skipped_no_name += 1
logger.warning(
f"Skipping upstream table {table[tableau_constant.ID]} from lineage since its name is none: {table}"
)
@@ -1263,6 +1280,7 @@ def get_schema_metadata_for_custom_sql(
# Datasource fields

if field.get(tableau_constant.NAME) is None:
self.report.num_csql_field_skipped_no_name += 1
logger.warning(
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
)
@@ -1290,6 +1308,16 @@ def get_schema_metadata_for_custom_sql(
return schema_metadata

def _get_published_datasource_project_luid(self, ds):
# This is a fallback in case the "get all datasources" query fails for some reason.
# It is possible due to https://github.com/tableau/server-client-python/issues/1210
if (
ds.get(tableau_constant.LUID)
and ds[tableau_constant.LUID] not in self.datasource_project_map.keys()
and self.report.get_all_datasources_query_failed
):
# Query and update self.datasource_project_map with luid
self._query_published_datasource_for_project_luid(ds)

if (
ds.get(tableau_constant.LUID)
and ds[tableau_constant.LUID] in self.datasource_project_map.keys()
@@ -1304,6 +1332,30 @@ def _get_published_datasource_project_luid(ds):

return None

def _query_published_datasource_for_project_luid(self, ds: dict) -> None:
if self.server is None:
return

try:
logger.debug(
f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
f" Running get datasource query for {ds[tableau_constant.LUID]}"
)
ds_result = self.server.datasources.get_by_id(ds[tableau_constant.LUID])
if ds_result.project_id not in self.tableau_project_registry:
logger.debug(
f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project "
f"registry"
)
else:
self.datasource_project_map[ds_result.id] = ds_result.project_id
except Exception as e:
self.report.num_get_datasource_query_failures += 1
logger.warning(
f"Failed to get datasource project_luid for {ds[tableau_constant.LUID]} due to error {e}"
)
logger.debug("Error stack trace", exc_info=True)
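The fallback itself is a single tableauserverclient REST call per published datasource. A hedged usage sketch of that API (server URL, credentials, and the LUID below are placeholders, not values from this commit):

import tableauserverclient as TSC

# Placeholder connection details -- illustrative only.
server = TSC.Server("https://tableau.example.com", use_server_version=True)
auth = TSC.TableauAuth("username", "password", site_id="my-site")

with server.auth.sign_in(auth):
    # One REST round-trip per published datasource: the cost of the fallback
    # path compared to the single bulk "get all datasources" query.
    ds = server.datasources.get_by_id("datasource-luid-from-metadata-api")
    print(ds.id, ds.name, ds.project_id)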

def _get_workbook_project_luid(self, wb):
if wb.get(tableau_constant.LUID) and self.workbook_project_map.get(
wb[tableau_constant.LUID]
@@ -1451,6 +1503,7 @@ def _get_schema_metadata_for_datasource(
# check datasource - custom sql relations from a field being referenced
self._track_custom_sql_ids(field)
if field.get(tableau_constant.NAME) is None:
self.report.num_datasource_field_skipped_no_name += 1
logger.warning(
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
)
@@ -1619,6 +1672,38 @@ def emit_datasource(
),
)

container_key = self._get_datasource_container_key(
datasource, workbook, is_embedded_ds
)
if container_key is not None:
yield from add_entity_to_container(
container_key,
tableau_constant.DATASET,
dataset_snapshot.urn,
)

def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
container_key: Optional[PlatformKey] = None
if is_embedded_ds: # If embedded, the parent container is the workbook
if workbook is not None:
container_key = self.gen_workbook_key(workbook)
else:
logger.warning(
f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}"
)
else:
parent_project_luid = self._get_published_datasource_project_luid(
datasource
)
# It is a published datasource, hence the parent container is a project
if parent_project_luid is not None:
container_key = self.gen_project_key(parent_project_luid)
else:
logger.warning(
f"Parent container not set for published datasource {datasource[tableau_constant.ID]}"
)

return container_key
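For reference, the branching that the new _get_datasource_container_key helper centralizes boils down to the following sketch, with strings standing in for container keys and dicts for the GraphQL payloads (all names here are illustrative):

from typing import Optional


def get_container_key(
    datasource: dict,
    workbook: Optional[dict],
    is_embedded_ds: bool,
    datasource_project_map: dict,
) -> Optional[str]:
    # Embedded datasources hang off their workbook; published ones off the
    # project resolved via the (possibly fallback-populated) project map.
    if is_embedded_ds:
        return f"workbook:{workbook['id']}" if workbook else None
    project_luid = datasource_project_map.get(datasource.get("luid"))
    return f"project:{project_luid}" if project_luid else None


print(get_container_key({"id": "ds1"}, {"id": "wb1"}, True, {}))     # workbook:wb1
print(get_container_key({"luid": "L1"}, None, False, {"L1": "P1"}))  # project:P1
print(get_container_key({"luid": "L2"}, None, False, {}))            # None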

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}"
@@ -1695,11 +1757,22 @@ def emit_table(
dataset_snapshot.aspects.append(browse_paths)
else:
logger.debug(f"Browse path not set for table {database_table.urn}")

schema_metadata = self.get_schema_metadata_for_table(columns or [])
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)

yield self.get_metadata_change_event(dataset_snapshot)

def get_schema_metadata_for_table(
self, columns: List[dict]
) -> Optional[SchemaMetadata]:
schema_metadata: Optional[SchemaMetadata] = None
if columns:
fields = []
for field in columns:
if field.get(tableau_constant.NAME) is None:
self.report.num_table_field_skipped_no_name += 1
logger.warning(
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
)
@@ -1726,10 +1799,8 @@ def emit_table(
hash="",
platformSchema=OtherSchema(rawSchema=""),
)
return schema_metadata
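Extracting get_schema_metadata_for_table makes the skip-nameless-columns rule testable in isolation; conceptually it reduces to something like this sketch (the dict report and plain field names are illustrative simplifications of the real SchemaMetadata construction):

from typing import List, Optional


def fields_from_columns(columns: List[dict], report: dict) -> Optional[List[str]]:
    # Returns None for an empty column list, mirroring the Optional return
    # of get_schema_metadata_for_table; nameless columns are counted and skipped.
    if not columns:
        return None
    fields = []
    for field in columns:
        if field.get("name") is None:
            report["num_table_field_skipped_no_name"] = (
                report.get("num_table_field_skipped_no_name", 0) + 1
            )
            continue
        fields.append(field["name"])
    return fields


report: dict = {}
print(fields_from_columns([{"id": "1", "name": "a"}, {"id": "2", "name": None}], report))
print(report)  # {'num_table_field_skipped_no_name': 1}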

def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
sheet_upstream_datasources = set()
@@ -2368,5 +2439,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)

def get_report(self) -> TableauSourceReport:
return self.report
51 changes: 44 additions & 7 deletions (Tableau integration test file)
@@ -188,6 +188,13 @@ def side_effect_workbook_data(*arg, **kwargs):
], mock_pagination


def side_effect_datasource_get_by_id(id, *arg, **kwargs):
datasources, _ = side_effect_datasource_data()
for ds in datasources:
if ds._id == id:
return ds
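This helper plugs into unittest.mock's side_effect mechanism, which the test changes below rely on twice: a callable side_effect computes the mock's return value per call, while an exception instance makes the mock raise it. A quick self-contained demonstration:

from unittest import mock

client = mock.Mock()

# A callable side_effect computes the return value for each call...
client.datasources.get_by_id.side_effect = lambda ds_id: {"id": ds_id}
print(client.datasources.get_by_id("abc"))  # {'id': 'abc'}

# ...whereas an exception *instance* makes the mock raise it -- exactly how
# the new test passes datasources_side_effect=ValueError(...) to simulate
# the bulk "get all datasources" query failing.
client.datasources.get.side_effect = ValueError("project_id must be defined.")
try:
    client.datasources.get()
except ValueError as e:
    print("raised:", e)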


def tableau_ingest_common(
pytestconfig,
tmp_path,
@@ -198,6 +205,7 @@ def tableau_ingest_common(
pipeline_config=config_source_default,
sign_out_side_effect=lambda: None,
pipeline_name="tableau-test-pipeline",
datasources_side_effect=side_effect_datasource_data,
):
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
@@ -215,7 +223,10 @@ def tableau_ingest_common(
mock_client.projects = mock.Mock()
mock_client.projects.get.side_effect = side_effect_project_data
mock_client.datasources = mock.Mock()
mock_client.datasources.get.side_effect = datasources_side_effect
mock_client.datasources.get_by_id.side_effect = (
side_effect_datasource_get_by_id
)
mock_client.workbooks = mock.Mock()
mock_client.workbooks.get.side_effect = side_effect_workbook_data
mock_client.views.get.side_effect = side_effect_usage_stat
@@ -243,12 +254,13 @@ def tableau_ingest_common(
pipeline.run()
pipeline.raise_from_status()

if golden_file_name:
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{output_file_name}",
golden_path=test_resources_dir / golden_file_name,
ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
)
return pipeline


@@ -767,3 +779,28 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
mcp.entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_get_all_datasources_failure(pytestconfig, tmp_path, mock_datahub_graph):
output_file_name: str = "tableau_mces.json"
golden_file_name: str = "tableau_mces_golden.json"
tableau_ingest_common(
pytestconfig,
tmp_path,
[
read_response(pytestconfig, "workbooksConnection_all.json"),
read_response(pytestconfig, "sheetsConnection_all.json"),
read_response(pytestconfig, "dashboardsConnection_all.json"),
read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
read_response(pytestconfig, "customSQLTablesConnection_all.json"),
read_response(pytestconfig, "databaseTablesConnection_all.json"),
],
golden_file_name,
output_file_name,
mock_datahub_graph,
pipeline_name="test_tableau_ingest",
datasources_side_effect=ValueError("project_id must be defined."),
)
