diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index 92f61870aa9684..f2f16164294ecd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -411,6 +411,15 @@ def update_table(
         self.num_cols = num_tbl_cols
 
 
+class TableauSourceReport(StaleEntityRemovalSourceReport):
+    get_all_datasources_query_failed: bool = False
+    num_get_datasource_query_failures: int = 0
+    num_datasource_field_skipped_no_name: int = 0
+    num_csql_field_skipped_no_name: int = 0
+    num_table_field_skipped_no_name: int = 0
+    num_upstream_table_skipped_no_name: int = 0
+
+
 @platform_name("Tableau")
 @config_class(TableauConfig)
 @support_status(SupportStatus.INCUBATING)
@@ -442,7 +451,7 @@ def __init__(
         super().__init__(config, ctx)
 
         self.config: TableauConfig = config
-        self.report: StaleEntityRemovalSourceReport = StaleEntityRemovalSourceReport()
+        self.report: TableauSourceReport = TableauSourceReport()
         self.server: Optional[Server] = None
         self.database_tables: Dict[str, DatabaseTable] = {}
         self.tableau_stat_registry: Dict[str, UsageStat] = {}
@@ -595,14 +604,20 @@ def _init_datasource_registry(self) -> None:
         if self.server is None:
             return
 
-        for ds in TSC.Pager(self.server.datasources):
-            if ds.project_id not in self.tableau_project_registry:
-                logger.debug(
-                    f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
-                    f"registry"
-                )
-                continue
-            self.datasource_project_map[ds.id] = ds.project_id
+        try:
+            for ds in TSC.Pager(self.server.datasources):
+                if ds.project_id not in self.tableau_project_registry:
+                    logger.debug(
+                        f"project id ({ds.project_id}) of datasource {ds.name} is not present in project "
+                        f"registry"
+                    )
+                    continue
+                self.datasource_project_map[ds.id] = ds.project_id
+            self.report.get_all_datasources_query_failed = False
+        except Exception as e:
+            self.report.get_all_datasources_query_failed = True
+            logger.info(f"Get all datasources query failed due to error {e}")
+            logger.debug("Error stack trace", exc_info=True)
 
     def _init_workbook_registry(self) -> None:
         if self.server is None:
@@ -956,6 +972,7 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sql
                 )
                 continue
             elif table[tableau_constant.NAME] is None:
+                self.report.num_upstream_table_skipped_no_name += 1
                 logger.warning(
                     f"Skipping upstream table {table[tableau_constant.ID]} from lineage since its name is none: {table}"
                 )
@@ -1263,6 +1280,7 @@ def get_schema_metadata_for_custom_sql(
             # Datasource fields
 
             if field.get(tableau_constant.NAME) is None:
+                self.report.num_csql_field_skipped_no_name += 1
                 logger.warning(
                     f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
                 )
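The `TableauSourceReport` introduced above follows the dataclass-based report pattern DataHub sources use: counters are incremented as items are skipped, so the final ingestion summary explains what was dropped instead of burying it in the log. A minimal standalone sketch of that pattern, with `SourceReport` as a hypothetical stand-in for `StaleEntityRemovalSourceReport` so the snippet runs without DataHub installed:

```python
from dataclasses import dataclass


@dataclass
class SourceReport:
    """Hypothetical stand-in for StaleEntityRemovalSourceReport."""


@dataclass
class TableauSourceReport(SourceReport):
    get_all_datasources_query_failed: bool = False
    num_table_field_skipped_no_name: int = 0


report = TableauSourceReport()
for field in [{"id": "f1", "name": None}, {"id": "f2", "name": "col_a"}]:
    if field.get("name") is None:
        # Count the skip instead of only logging it, so the run summary
        # shows why fields are missing from the emitted schema.
        report.num_table_field_skipped_no_name += 1
print(report)  # ...num_table_field_skipped_no_name=1
```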
@@ -1290,6 +1308,16 @@ def get_schema_metadata_for_custom_sql(
 
         return schema_metadata
 
     def _get_published_datasource_project_luid(self, ds):
+        # This is a fallback in case the "get all datasources" query fails,
+        # which is possible due to https://github.com/tableau/server-client-python/issues/1210
+        if (
+            ds.get(tableau_constant.LUID)
+            and ds[tableau_constant.LUID] not in self.datasource_project_map.keys()
+            and self.report.get_all_datasources_query_failed
+        ):
+            # Query the datasource by luid and update self.datasource_project_map
+            self._query_published_datasource_for_project_luid(ds)
+
         if (
             ds.get(tableau_constant.LUID)
             and ds[tableau_constant.LUID] in self.datasource_project_map.keys()
@@ -1304,6 +1332,30 @@ def _get_published_datasource_project_luid(self, ds):
 
         return None
 
+    def _query_published_datasource_for_project_luid(self, ds: dict) -> None:
+        if self.server is None:
+            return
+
+        try:
+            logger.debug(
+                f"published datasource {ds.get(tableau_constant.NAME)} project_luid not found."
+                f" Running get datasource query for {ds[tableau_constant.LUID]}"
+            )
+            ds_result = self.server.datasources.get_by_id(ds[tableau_constant.LUID])
+            if ds_result.project_id not in self.tableau_project_registry:
+                logger.debug(
+                    f"project id ({ds_result.project_id}) of datasource {ds_result.name} is not present in project "
+                    f"registry"
+                )
+            else:
+                self.datasource_project_map[ds_result.id] = ds_result.project_id
+        except Exception as e:
+            self.report.num_get_datasource_query_failures += 1
+            logger.warning(
+                f"Failed to get datasource project_luid for {ds[tableau_constant.LUID]} due to error {e}"
+            )
+            logger.debug("Error stack trace", exc_info=True)
+
     def _get_workbook_project_luid(self, wb):
         if wb.get(tableau_constant.LUID) and self.workbook_project_map.get(
             wb[tableau_constant.LUID]
@@ -1451,6 +1503,7 @@ def _get_schema_metadata_for_datasource(
             # check datasource - custom sql relations from a field being referenced
             self._track_custom_sql_ids(field)
             if field.get(tableau_constant.NAME) is None:
+                self.report.num_datasource_field_skipped_no_name += 1
                 logger.warning(
                     f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
                 )
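Taken together, the hunks above implement a two-tier lookup: an eager bulk registry build that may fail wholesale (see tableau/server-client-python#1210), plus a lazy per-luid fallback that memoizes into the same map and counts its own failures. A reduced sketch of that strategy, using hypothetical `ProjectResolver`/`FakeClient` stand-ins rather than the source's actual classes (the real code uses `TSC.Pager(self.server.datasources)` and `self.server.datasources.get_by_id`):

```python
from typing import Dict, Optional


class ProjectResolver:
    def __init__(self, client):
        self.client = client  # hypothetical client with .list() and .get_by_id()
        self.datasource_project_map: Dict[str, str] = {}
        self.bulk_query_failed = False

    def init_registry(self) -> None:
        try:
            # Tier 1: one bulk query populates the whole map.
            for ds in self.client.list():
                self.datasource_project_map[ds["id"]] = ds["project_id"]
        except Exception:
            # One bad record can kill the entire page; remember that it did.
            self.bulk_query_failed = True

    def project_for(self, luid: str) -> Optional[str]:
        # Tier 2: only if the bulk query failed, resolve and memoize per luid.
        if luid not in self.datasource_project_map and self.bulk_query_failed:
            ds = self.client.get_by_id(luid)  # one REST call per unresolved luid
            self.datasource_project_map[ds["id"]] = ds["project_id"]
        return self.datasource_project_map.get(luid)


class FakeClient:
    def list(self):
        raise ValueError("project_id must be defined.")  # simulate the bug

    def get_by_id(self, luid):
        return {"id": luid, "project_id": "proj-1"}


resolver = ProjectResolver(FakeClient())
resolver.init_registry()
print(resolver.project_for("ds-luid-42"))  # -> proj-1, via the per-id fallback
```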
@@ -1619,29 +1672,38 @@ def emit_datasource(
                 ),
             )
 
-        if (
-            is_embedded_ds and workbook is not None
-        ):  # It is embedded then parent is container is workbook
-            yield from add_entity_to_container(
-                self.gen_workbook_key(workbook),
-                tableau_constant.DATASET,
-                dataset_snapshot.urn,
-            )
-        elif (
-            datasource.get(tableau_constant.LUID)
-            and datasource[tableau_constant.LUID] in self.datasource_project_map.keys()
-        ):  # It is published datasource and hence parent container is project
+        container_key = self._get_datasource_container_key(
+            datasource, workbook, is_embedded_ds
+        )
+        if container_key is not None:
             yield from add_entity_to_container(
-                self.gen_project_key(
-                    self.datasource_project_map[datasource[tableau_constant.LUID]]
-                ),
+                container_key,
                 tableau_constant.DATASET,
                 dataset_snapshot.urn,
             )
+
+    def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
+        container_key: Optional[PlatformKey] = None
+        if is_embedded_ds:  # If it is embedded, the parent container is the workbook
+            if workbook is not None:
+                container_key = self.gen_workbook_key(workbook)
+            else:
+                logger.warning(
+                    f"Parent container not set for embedded datasource {datasource[tableau_constant.ID]}"
+                )
         else:
-            logger.warning(
-                f"Parent container not set for datasource {datasource[tableau_constant.ID]}"
+            parent_project_luid = self._get_published_datasource_project_luid(
+                datasource
             )
+            # It is a published datasource, hence the parent container is the project
+            if parent_project_luid is not None:
+                container_key = self.gen_project_key(parent_project_luid)
+            else:
+                logger.warning(
+                    f"Parent container not set for published datasource {datasource[tableau_constant.ID]}"
+                )
+
+        return container_key
 
     def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
         datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.datasource_ids_being_used)}"
@@ -1695,11 +1757,22 @@ def emit_table(
             dataset_snapshot.aspects.append(browse_paths)
         else:
             logger.debug(f"Browse path not set for table {database_table.urn}")
-        schema_metadata = None
+
+        schema_metadata = self.get_schema_metadata_for_table(columns or [])
+        if schema_metadata is not None:
+            dataset_snapshot.aspects.append(schema_metadata)
+
+        yield self.get_metadata_change_event(dataset_snapshot)
+
+    def get_schema_metadata_for_table(
+        self, columns: List[dict]
+    ) -> Optional[SchemaMetadata]:
+        schema_metadata: Optional[SchemaMetadata] = None
         if columns:
             fields = []
             for field in columns:
                 if field.get(tableau_constant.NAME) is None:
+                    self.report.num_table_field_skipped_no_name += 1
                     logger.warning(
                         f"Skipping field {field[tableau_constant.ID]} from schema since its name is none"
                     )
@@ -1726,10 +1799,8 @@ def emit_table(
                 hash="",
                 platformSchema=OtherSchema(rawSchema=""),
             )
-        if schema_metadata is not None:
-            dataset_snapshot.aspects.append(schema_metadata)
 
-        yield self.get_metadata_change_event(dataset_snapshot)
+        return schema_metadata
 
     def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
         sheet_upstream_datasources = set()
@@ -2368,5 +2439,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                     reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
                 )
 
-    def get_report(self) -> StaleEntityRemovalSourceReport:
+    def get_report(self) -> TableauSourceReport:
         return self.report
diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
index 09411670edb0c4..8615545ffbd573 100644
--- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
+++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
@@ -188,6 +188,13 @@ def side_effect_workbook_data(*arg, **kwargs):
     ], mock_pagination
 
 
+def side_effect_datasource_get_by_id(id, *arg, **kwargs):
+    datasources, _ = side_effect_datasource_data()
+    for ds in datasources:
+        if ds._id == id:
+            return ds
+
+
 def tableau_ingest_common(
     pytestconfig,
     tmp_path,
@@ -198,6 +205,7 @@ def tableau_ingest_common(
     pipeline_config=config_source_default,
     sign_out_side_effect=lambda: None,
     pipeline_name="tableau-test-pipeline",
+    datasources_side_effect=side_effect_datasource_data,
 ):
     with mock.patch(
         "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
     ) as mock_checkpoint:
@@ -215,7 +223,10 @@ def tableau_ingest_common(
             mock_client.projects = mock.Mock()
             mock_client.projects.get.side_effect = side_effect_project_data
             mock_client.datasources = mock.Mock()
-            mock_client.datasources.get.side_effect = side_effect_datasource_data
+            mock_client.datasources.get.side_effect = datasources_side_effect
+            mock_client.datasources.get_by_id.side_effect = (
+                side_effect_datasource_get_by_id
+            )
             mock_client.workbooks = mock.Mock()
             mock_client.workbooks.get.side_effect = side_effect_workbook_data
             mock_client.views.get.side_effect = side_effect_usage_stat
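The new `datasources_side_effect` parameter leans on standard `unittest.mock` semantics: when `side_effect` is set to an exception instance, calling the mock raises it; when it is a callable, the call is delegated to it. That is what lets the failure test below keep `get_by_id` working while the bulk `get` call blows up. A quick demonstration:

```python
from unittest import mock

client = mock.Mock()

# An exception instance as side_effect makes every call raise it -- this is
# how the test simulates the "get all datasources" query failing.
client.datasources.get.side_effect = ValueError("project_id must be defined.")
try:
    client.datasources.get()
except ValueError as e:
    print(f"raised: {e}")  # raised: project_id must be defined.

# A callable as side_effect is invoked instead, which is how the per-id
# fallback keeps answering while the bulk query fails.
client.datasources.get_by_id.side_effect = lambda luid: {"id": luid}
print(client.datasources.get_by_id("ds-1"))  # {'id': 'ds-1'}
```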
@@ -243,12 +254,13 @@ def tableau_ingest_common(
             pipeline.run()
             pipeline.raise_from_status()
 
-            mce_helpers.check_golden_file(
-                pytestconfig,
-                output_path=f"{tmp_path}/{output_file_name}",
-                golden_path=test_resources_dir / golden_file_name,
-                ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
-            )
+            if golden_file_name:
+                mce_helpers.check_golden_file(
+                    pytestconfig,
+                    output_path=f"{tmp_path}/{output_file_name}",
+                    golden_path=test_resources_dir / golden_file_name,
+                    ignore_paths=mce_helpers.IGNORE_PATH_TIMESTAMPS,
+                )
     return pipeline
 
 
@@ -767,3 +779,28 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
         mcp.entityUrn
         == "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
     )
+
+
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration
+def test_get_all_datasources_failure(pytestconfig, tmp_path, mock_datahub_graph):
+    output_file_name: str = "tableau_mces.json"
+    golden_file_name: str = "tableau_mces_golden.json"
+    tableau_ingest_common(
+        pytestconfig,
+        tmp_path,
+        [
+            read_response(pytestconfig, "workbooksConnection_all.json"),
+            read_response(pytestconfig, "sheetsConnection_all.json"),
+            read_response(pytestconfig, "dashboardsConnection_all.json"),
+            read_response(pytestconfig, "embeddedDatasourcesConnection_all.json"),
+            read_response(pytestconfig, "publishedDatasourcesConnection_all.json"),
+            read_response(pytestconfig, "customSQLTablesConnection_all.json"),
+            read_response(pytestconfig, "databaseTablesConnection_all.json"),
+        ],
+        golden_file_name,
+        output_file_name,
+        mock_datahub_graph,
+        pipeline_name="test_tableau_ingest",
+        datasources_side_effect=ValueError("project_id must be defined."),
+    )
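One closing note on the `emit_datasource` refactor: moving the container decision into `_get_datasource_container_key`, which returns an `Optional` key instead of yielding from inside branches, makes the branching testable without driving a generator. A reduced sketch of that shape (illustrative names only, not DataHub's API):

```python
from typing import Optional


def get_container_key(is_embedded: bool, workbook: Optional[str],
                      project_luid: Optional[str]) -> Optional[str]:
    if is_embedded:
        # Embedded datasource: the parent container is the workbook, if any.
        return f"workbook:{workbook}" if workbook else None
    # Published datasource: the parent container is the project, if resolved.
    return f"project:{project_luid}" if project_luid else None


def emit(is_embedded: bool, workbook: Optional[str], project_luid: Optional[str]):
    key = get_container_key(is_embedded, workbook, project_luid)
    if key is not None:
        yield f"add_entity_to_container({key})"  # stand-in for the real emitter


# The pure helper can be asserted on directly, no generator required.
assert get_container_key(True, "wb-1", None) == "workbook:wb-1"
assert get_container_key(False, None, "proj-9") == "project:proj-9"
assert get_container_key(False, None, None) is None
print(list(emit(False, None, "proj-9")))
```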