fix(ingest/tableau): graceful handling of get all datasources failure… #8406

Merged
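Summary of the change: when the Tableau REST query that lists all datasources fails (see tableau/server-client-python#1210, referenced in the diff below), the source now handles it gracefully. The error is logged, the failure is recorded on a new TableauSourceReport, and project LUIDs for published datasources are resolved one at a time via datasources.get_by_id as a fallback. The report also gains counters for datasource, custom-SQL, and table fields (and upstream tables) skipped because they have no name.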
132 changes: 101 additions & 31 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -411,6 +411,15 @@ def update_table(
self.num_cols = num_tbl_cols


class TableauSourceReport(StaleEntityRemovalSourceReport):
get_all_datasources_query_failed: bool = False
num_get_datasource_query_failures: int = 0
num_datasource_field_skipped_no_name: int = 0
num_csql_field_skipped_no_name: int = 0
num_table_field_skipped_no_name: int = 0
num_upstream_table_skipped_no_name: int = 0


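For orientation, a minimal illustrative snippet (not part of the PR) showing how these new report fields are exercised by the code later in this diff:

```python
# TableauSourceReport is the class defined above; counters are bumped at
# each skip site, and the flag gates the per-datasource fallback lookup.
report = TableauSourceReport()
report.num_table_field_skipped_no_name += 1
if report.get_all_datasources_query_failed:
    print("datasource listing failed; resolving project LUIDs one by one")
```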
@platform_name("Tableau")
@config_class(TableauConfig)
@support_status(SupportStatus.CERTIFIED)
@@ -442,7 +451,7 @@ def __init__(
super().__init__(config, ctx)

self.config: TableauConfig = config
-        self.report: StaleEntityRemovalSourceReport = StaleEntityRemovalSourceReport()
+        self.report: TableauSourceReport = TableauSourceReport()
self.server: Optional[Server] = None
self.database_tables: Dict[str, DatabaseTable] = {}
self.tableau_stat_registry: Dict[str, UsageStat] = {}
@@ -595,14 +604,20 @@ def _init_datasource_registry(self) -> None:
if self.server is None:
return

-        for ds in TSC.Pager(self.server.datasources):
-            if ds.project_id not in self.tableau_project_registry:
-                logger.debug(
-                    f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
-                    f"registry"
-                )
-                continue
-            self.datasource_project_map[ds.id] = ds.project_id
+        try:
+            for ds in TSC.Pager(self.server.datasources):
+                if ds.project_id not in self.tableau_project_registry:
+                    logger.debug(
+                        f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
+                        f"registry"
+                    )
+                    continue
+                self.datasource_project_map[ds.id] = ds.project_id
+            self.report.get_all_datasources_query_failed = False
Collaborator: This looks like a no-op. Either way we should set the default to None or just remove this statement. (A sketch of the None-default variant follows the except block below.)
+        except Exception as e:
+            self.report.get_all_datasources_query_failed = True
+            logger.info(f"Get all datasources query failed due to error {e}")
+            logger.debug("Error stack trace", exc_info=True)

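A minimal sketch of the reviewer's suggestion, assuming a tri-state flag; this is hypothetical, since the PR as merged keeps a plain bool defaulting to False:

```python
from typing import Optional

# Hypothetical alternative (not what the PR merged):
# None  -> "get all datasources" has not been attempted yet
# False -> attempted and succeeded
# True  -> attempted and failed
get_all_datasources_query_failed: Optional[bool] = None
```

With a None default, the assignment after the loop is no longer a no-op: it distinguishes "succeeded" from "never ran".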
def _init_workbook_registry(self) -> None:
if self.server is None:
@@ -956,6 +971,7 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sql
)
continue
elif table[tableau_constant.NAME] is None:
self.report.num_upstream_table_skipped_no_name += 1
logger.warning(
f"Skipping upstream table {table[tableau_constant.ID]} from lineage since its name is none: {table}"
)
@@ -1263,6 +1279,7 @@ def get_schema_metadata_for_custom_sql(
# Datasource fields

if field.get(tableau_constant.NAME) is None:
self.report.num_csql_field_skipped_no_name += 1
logger.warning(
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
)
@@ -1290,6 +1307,16 @@ def get_schema_metadata_for_custom_sql(
return schema_metadata

def _get_published_datasource_project_luid(self, ds):
# This is a fallback in case the "get all datasources" query fails for some reason.
# That can happen due to https://github.com/tableau/server-client-python/issues/1210
if (
ds.get(tableau_constant.LUID)
and ds[tableau_constant.LUID] not in self.datasource_project_map.keys()
and self.report.get_all_datasources_query_failed
):
# Query and update self.datasource_project_map with luid
self._query_published_datasource_for_project_luid(ds)

if (
ds.get(tableau_constant.LUID)
and ds[tableau_constant.LUID] in self.datasource_project_map.keys()
@@ -1304,6 +1331,30 @@ def _get_published_datasource_project_luid(self, ds):

return None

def _query_published_datasource_for_project_luid(self, ds: dict) -> None:
Collaborator: Can we pass in ds[LUID] rather than ds here?

Collaborator Author (@mayurinehate, Jul 13, 2023): addressed in followup PR. (A sketch of that narrower signature follows this method.)

if self.server is None:
return

try:
logger.debug(
f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
f" Running get datasource query for {ds[tableau_constant.LUID]}"
)
ds_result = self.server.datasources.get_by_id(ds[tableau_constant.LUID])
if ds_result.project_id not in self.tableau_project_registry:
logger.debug(
f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project "
f"registry"
)
else:
self.datasource_project_map[ds_result.id] = ds_result.project_id
except Exception as e:
self.report.num_get_datasource_query_failures += 1
logger.warning(
f"Failed to get datasource project_luid for {ds[tableau_constant.LUID]} due to error {e}"
)
logger.debug("Error stack trace", exc_info=True)

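A rough sketch of the reviewer's suggestion, for illustration only; the actual change landed in a follow-up PR, so the signature and body here are assumptions:

```python
def _query_published_datasource_for_project_luid(self, ds_luid: str) -> None:
    # Same fallback as above, but callers pass only the datasource LUID
    # instead of the whole GraphQL response dict.
    if self.server is None:
        return
    try:
        ds_result = self.server.datasources.get_by_id(ds_luid)
        if ds_result.project_id in self.tableau_project_registry:
            self.datasource_project_map[ds_result.id] = ds_result.project_id
    except Exception:
        self.report.num_get_datasource_query_failures += 1
        logger.debug("Error stack trace", exc_info=True)
```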
def _get_workbook_project_luid(self, wb):
if wb.get(tableau_constant.LUID) and self.workbook_project_map.get(
wb[tableau_constant.LUID]
@@ -1451,6 +1502,7 @@ def _get_schema_metadata_for_datasource(
# check datasource - custom sql relations from a field being referenced
self._track_custom_sql_ids(field)
if field.get(tableau_constant.NAME) is None:
self.report.num_datasource_field_skipped_no_name += 1
logger.warning(
f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
)
@@ -1619,29 +1671,38 @@ def emit_datasource(
             ),
         )

-        if (
-            is_embedded_ds and workbook is not None
-        ):  # It is embedded then parent is container is workbook
-            yield from add_entity_to_container(
-                self.gen_workbook_key(workbook),
-                tableau_constant.DATASET,
-                dataset_snapshot.urn,
-            )
-        elif (
-            datasource.get(tableau_constant.LUID)
-            and datasource[tableau_constant.LUID] in self.datasource_project_map.keys()
-        ):  # It is published datasource and hence parent container is project
-            yield from add_entity_to_container(
-                self.gen_project_key(
-                    self.datasource_project_map[datasource[tableau_constant.LUID]]
-                ),
-                tableau_constant.DATASET,
-                dataset_snapshot.urn,
-            )
-        else:
-            logger.warning(
-                f"Parent container not set for datasource {datasource[tableau_constant.ID]}"
-            )
+        container_key = self._get_datasource_container_key(
+            datasource, workbook, is_embedded_ds
+        )
+        if container_key is not None:
+            yield from add_entity_to_container(
+                container_key,
+                tableau_constant.DATASET,
+                dataset_snapshot.urn,
+            )

Collaborator, on lines +1674 to +1676: Nice

+    def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
+        container_key: Optional[PlatformKey] = None
+        if is_embedded_ds:  # If embedded, the parent container is the workbook
+            if workbook is not None:
+                container_key = self.gen_workbook_key(workbook)
+            else:
+                logger.warning(
+                    f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}"
+                )
+        else:
+            parent_project_luid = self._get_published_datasource_project_luid(
+                datasource
+            )
+            # It is a published datasource, hence the parent container is the project
+            if parent_project_luid is not None:
+                container_key = self.gen_project_key(parent_project_luid)
+            else:
+                logger.warning(
+                    f"Parent container not set for published datasource {datasource[tableau_constant.ID]}"
+                )
+
+        return container_key
def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}"
@@ -1695,11 +1756,22 @@ def emit_table(
             dataset_snapshot.aspects.append(browse_paths)
         else:
             logger.debug(f"Browse path not set for table {database_table.urn}")
-        schema_metadata = None
+        schema_metadata = self.get_schema_metadata_for_table(columns or [])
+        if schema_metadata is not None:
+            dataset_snapshot.aspects.append(schema_metadata)
+
+        yield self.get_metadata_change_event(dataset_snapshot)
+
+    def get_schema_metadata_for_table(
+        self, columns: List[dict]
+    ) -> Optional[SchemaMetadata]:
+        schema_metadata: Optional[SchemaMetadata] = None
         if columns:
             fields = []
             for field in columns:
                 if field.get(tableau_constant.NAME) is None:
+                    self.report.num_table_field_skipped_no_name += 1
                     logger.warning(
                         f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
                     )
@@ -1726,10 +1798,8 @@
hash="",
platformSchema=OtherSchema(rawSchema=""),
)
if schema_metadata is not None:
dataset_snapshot.aspects.append(schema_metadata)

yield self.get_metadata_change_event(dataset_snapshot)
return schema_metadata

def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
sheet_upstream_datasources = set()
@@ -2368,5 +2438,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)

-    def get_report(self) -> StaleEntityRemovalSourceReport:
+    def get_report(self) -> TableauSourceReport:
return self.report
Changes to the Tableau integration tests:
@@ -188,6 +188,13 @@ def side_effect_workbook_data(*arg, **kwargs):
], mock_pagination


def side_effect_datasource_get_by_id(id, *arg, **kwargs):
    # Return the mocked datasource whose _id matches; implicitly None if absent.
    datasources, _ = side_effect_datasource_data()
    for ds in datasources:
        if ds._id == id:
            return ds


def tableau_ingest_common(
pytestconfig,
tmp_path,
@@ -198,6 +205,7 @@ def tableau_ingest_common(
pipeline_config=config_source_default,
sign_out_side_effect=lambda: None,
pipeline_name="tableau-test-pipeline",
datasources_side_effect=side_effect_datasource_data,
):
with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
@@ -215,7 +223,10 @@
mock_client.projects = mock.Mock()
mock_client.projects.get.side_effect = side_effect_project_data
mock_client.datasources = mock.Mock()
-            mock_client.datasources.get.side_effect = side_effect_datasource_data
+            mock_client.datasources.get.side_effect = datasources_side_effect
+            mock_client.datasources.get_by_id.side_effect = (
+                side_effect_datasource_get_by_id
+            )
mock_client.workbooks = mock.Mock()
mock_client.workbooks.get.side_effect = side_effect_workbook_data
mock_client.views.get.side_effect = side_effect_usage_stat
@@ -243,12 +254,13 @@ def tableau_ingest_common(
pipeline.run()
pipeline.raise_from_status()

-        mce_helpers.check_golden_file(
-            pytestconfig,
-            output_path=f"{tmp_path}/{output_file_name}",
-            golden_path=test_resources_dir / golden_file_name,
-            ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
-        )
+        if golden_file_name:
+            mce_helpers.check_golden_file(
+                pytestconfig,
+                output_path=f"{tmp_path}/{output_file_name}",
+                golden_path=test_resources_dir / golden_file_name,
+                ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
+            )
return pipeline


@@ -767,3 +779,28 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
mcp.entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_get_all_datasources_failure(pytestconfig, tmp_path, mock_datahub_graph):
output_file_name: str = "tableau_mces.json"
golden_file_name: str = "tableau_mces_golden.json"
tableau_ingest_common(
pytestconfig,
tmp_path,
[
read_response(pytestconfig, "workbooksConnection_all.json"),
read_response(pytestconfig, "sheetsConnection_all.json"),
read_response(pytestconfig, "dashboardsConnection_all.json"),
read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
read_response(pytestconfig, "customSQLTablesConnection_all.json"),
read_response(pytestconfig, "databaseTablesConnection_all.json"),
],
golden_file_name,
output_file_name,
mock_datahub_graph,
pipeline_name="test_tableau_ingest",
datasources_side_effect=ValueError("project_id must be defined."),
)