diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py index 600292c2c9942..e30034857de37 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py @@ -46,13 +46,10 @@ class SnowflakeTag: value: str def display_name(self) -> str: - return f"{self.name}: {self.value}" + return f"{self.name}:{self.value}" if self.value else self.name def identifier(self) -> str: - return f"{self._id_prefix_as_str()}:{self.value}" - - def _id_prefix_as_str(self) -> str: - return f"{self.database}.{self.schema}.{self.name}" + return self.display_name() @dataclass diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index d4442749a0622..112edbf52b052 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -2,6 +2,10 @@ import logging from typing import Dict, Iterable, List, Optional, Union +from datahub.api.entities.external.external_entities import ( + LinkedResourceSet, + PlatformResourceRepository, +) from datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter.mce_builder import ( make_data_platform_urn, @@ -20,6 +24,7 @@ DatasetContainerSubTypes, DatasetSubTypes, ) +from datahub.ingestion.source.snowflake import tag_entities from datahub.ingestion.source.snowflake.constants import ( GENERIC_PERMISSION_ERROR_KEY, SNOWFLAKE_DATABASE, @@ -55,6 +60,11 @@ SnowflakeStructuredReportMixin, SnowsightUrlBuilder, ) +from datahub.ingestion.source.snowflake.tag_entities import ( + SnowflakeSystem, + SnowflakeTagId, + SnowflakeTagSyncContext, +) from datahub.ingestion.source.sql.sql_utils import ( add_table_to_schema_container, gen_database_container, @@ -96,6 +106,7 @@ TimeType, ) from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties +from datahub.metadata.urns import TagUrn from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator from datahub.utilities.registries.domain_registry import DomainRegistry from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor @@ -155,6 +166,7 @@ def __init__( profiler: Optional[SnowflakeProfiler], aggregator: Optional[SqlParsingAggregator], snowsight_url_builder: Optional[SnowsightUrlBuilder], + platform_resource_repository: Optional[PlatformResourceRepository], ) -> None: self.config: SnowflakeV2Config = config self.report: SnowflakeV2Report = report @@ -180,6 +192,7 @@ def __init__( # These are populated as side-effects of get_workunits_internal. self.databases: List[SnowflakeDatabase] = [] self.aggregator: Optional[SqlParsingAggregator] = aggregator + self.platform_resource_repository = platform_resource_repository def get_connection(self) -> SnowflakeConnection: return self.connection @@ -629,7 +642,7 @@ def _process_view( yield from self.gen_dataset_workunits(view, schema_name, db_name) def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: - tag_identifier = tag.identifier() + tag_identifier = tag.display_name() if self.report.is_tag_processed(tag_identifier): return @@ -716,7 +729,7 @@ def gen_dataset_workunits( if table.tags: tag_associations = [ TagAssociation( - tag=make_tag_urn(self.snowflake_identifier(tag.identifier())) + tag=make_tag_urn(self.snowflake_identifier(tag.display_name())) ) for tag in table.tags ] @@ -783,17 +796,75 @@ def get_dataset_properties( ) def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: - tag_urn = make_tag_urn(self.snowflake_identifier(tag.identifier())) + assert ( + self.platform_resource_repository + ), "Platform resource repository is required" + + tag_urn = make_tag_urn(tag.name) tag_properties_aspect = TagProperties( name=tag.display_name(), - description=f"Represents the Snowflake tag `{tag._id_prefix_as_str()}` with value `{tag.value}`.", + description=f"Represents the Snowflake tag `{tag.display_name()}` with value `{tag.value}`.", + ) + + snowflake_system = SnowflakeSystem(snowflake_config=self.config) + + tag_sync_context = SnowflakeTagSyncContext( + account_id=self.config.account_id, + tag_database=tag.database, + tag_schema=tag.schema, + ) + snowflake_tag_id = SnowflakeTagId.from_datahub_tag( + tag_urn=TagUrn.from_string(tag_urn), tag_sync_context=tag_sync_context + ) + + snowflake_tag = snowflake_system.get( + snowflake_tag_id, self.platform_resource_repository ) + if not snowflake_tag: + linked_resource_set = LinkedResourceSet(urns=[tag_urn]) + linked_resource_set.add(tag_urn) + tag_entities.SnowflakeTag( + datahub_urns=linked_resource_set, + managed_by_datahub=False, + id=snowflake_tag_id, + allowed_values=None, + ) + logger.info( + f"Snowflake tag {tag.display_name()} not found in platform_resource_repository. Creating new one..." + ) + else: + try: + ret = snowflake_tag.datahub_linked_resources().add(tag_urn) + except ValueError as e: + logger.error(f"Failed to add tag {tag.display_name()}. Error: {e}") + return + if ret: + logger.info( + f"Snowflake tag {tag.display_name()} added in platform_resource_repository" + ) + else: + logger.info( + f"Snowflake tag {tag.display_name()} already exists in platform_resource_repository" + ) + return + if not snowflake_tag: + return + + platform_resource = snowflake_tag.as_platform_resource() + yield MetadataChangeProposalWrapper( entityUrn=tag_urn, aspect=tag_properties_aspect ).as_workunit() + for mcp in platform_resource.to_mcps(): + yield mcp.as_workunit() + + self.platform_resource_repository.cache[ + platform_resource.id + ] = platform_resource + def gen_schema_metadata( self, table: Union[SnowflakeTable, SnowflakeView], @@ -835,7 +906,7 @@ def gen_schema_metadata( [ TagAssociation( make_tag_urn( - self.snowflake_identifier(tag.identifier()) + self.snowflake_identifier(tag.display_name()) ) ) for tag in table.column_tags[col.name] @@ -928,7 +999,7 @@ def gen_database_containers( ) ), tags=( - [self.snowflake_identifier(tag.identifier()) for tag in database.tags] + [self.snowflake_identifier(tag.display_name()) for tag in database.tags] if database.tags else None ), @@ -981,7 +1052,7 @@ def gen_schema_containers( else None ), tags=( - [self.snowflake_identifier(tag.identifier()) for tag in schema.tags] + [self.snowflake_identifier(tag.display_name()) for tag in schema.tags] if schema.tags else None ), diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py index be449e963d270..744bfc00d82fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py @@ -165,7 +165,7 @@ def _filter_tags( allowed_tags = [] for tag in tags: - tag_identifier = tag.identifier() + tag_identifier = tag.display_name() self.report.report_entity_scanned(tag_identifier, "tag") if not self.config.tag_pattern.allowed(tag_identifier): self.report.report_dropped(tag_identifier) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index dd7f73268fdc4..499542d2fdf07 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -8,6 +8,7 @@ from dataclasses import dataclass from typing import Dict, Iterable, List, Optional, Union +from datahub.api.entities.external.external_entities import PlatformResourceRepository from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, @@ -35,7 +36,10 @@ from datahub.ingestion.source.snowflake.snowflake_assertion import ( SnowflakeAssertionsHandler, ) -from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config +from datahub.ingestion.source.snowflake.snowflake_config import ( + SnowflakeV2Config, + TagOption, +) from datahub.ingestion.source.snowflake.snowflake_connection import ( SnowflakeConnection, SnowflakeConnectionConfig, @@ -161,6 +165,12 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): self.data_dictionary = SnowflakeDataDictionary(connection=self.connection) self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None self.aggregator: Optional[SqlParsingAggregator] = None + self.platform_resource_repository = None + + if self.ctx.graph and self.config.extract_tags != TagOption.skip: + self.platform_resource_repository = PlatformResourceRepository( + self.ctx.graph + ) if self.config.use_queries_v2 or self.config.include_table_lineage: self.aggregator = self._exit_stack.enter_context( @@ -476,6 +486,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: snowsight_url_builder=snowsight_url_builder, filters=self.filters, identifiers=self.identifiers, + platform_resource_repository=self.platform_resource_repository, ) self.report.set_ingestion_stage("*", METADATA_EXTRACTION) diff --git a/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json b/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json index ad270857dd7fc..114de1d0e19c4 100755 --- a/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json +++ b/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json @@ -20,7 +20,7 @@ "com.linkedin.pegasus2avro.common.InstitutionalMemory": { "elements": [ { - "url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/examples/", + "url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/tests/", "description": "Link to call for the dataset.", "createStamp": { "time": 1586847600, @@ -71,7 +71,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00" + "runId": "openapi-2020_04_14-07_00_00-68bgeu", + "lastRunId": "no-run-id-provided" } }, { @@ -95,7 +96,7 @@ "com.linkedin.pegasus2avro.common.InstitutionalMemory": { "elements": [ { - "url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/examples/v2", + "url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/tests/v2", "description": "Link to call for the dataset.", "createStamp": { "time": 1586847600, @@ -146,7 +147,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00" + "runId": "openapi-2020_04_14-07_00_00-68bgeu", + "lastRunId": "no-run-id-provided" } }, { @@ -161,7 +163,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00" + "runId": "openapi-2020_04_14-07_00_00-68bgeu", + "lastRunId": "no-run-id-provided" } }, { @@ -176,7 +179,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00" + "runId": "openapi-2020_04_14-07_00_00-68bgeu", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file