Skip to content

Commit

Permalink
Add missing changes
Browse files (browse the repository at this point in the history)
  • Loading branch information
treff7es committed Nov 4, 2024
1 parent 8671d5b commit 1397858
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 20 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -46,13 +46,10 @@ class SnowflakeTag:
value: str

def display_name(self) -> str:
return f"{self.name}: {self.value}"
return f"{self.name}:{self.value}" if self.value else self.name

def identifier(self) -> str:
return f"{self._id_prefix_as_str()}:{self.value}"

def _id_prefix_as_str(self) -> str:
return f"{self.database}.{self.schema}.{self.name}"
return self.display_name()


@dataclass
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,6 +2,10 @@
import logging
from typing import Dict, Iterable, List, Optional, Union

from datahub.api.entities.external.external_entities import (
LinkedResourceSet,
PlatformResourceRepository,
)
from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_data_platform_urn,
Expand All @@ -20,6 +24,7 @@
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.snowflake import tag_entities
from datahub.ingestion.source.snowflake.constants import (
GENERIC_PERMISSION_ERROR_KEY,
SNOWFLAKE_DATABASE,
Expand Down Expand Up @@ -55,6 +60,11 @@
SnowflakeStructuredReportMixin,
SnowsightUrlBuilder,
)
from datahub.ingestion.source.snowflake.tag_entities import (
SnowflakeSystem,
SnowflakeTagId,
SnowflakeTagSyncContext,
)
from datahub.ingestion.source.sql.sql_utils import (
add_table_to_schema_container,
gen_database_container,
Expand Down Expand Up @@ -96,6 +106,7 @@
TimeType,
)
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
from datahub.metadata.urns import TagUrn
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
Expand Down Expand Up @@ -155,6 +166,7 @@ def __init__(
profiler: Optional[SnowflakeProfiler],
aggregator: Optional[SqlParsingAggregator],
snowsight_url_builder: Optional[SnowsightUrlBuilder],
platform_resource_repository: Optional[PlatformResourceRepository],
) -> None:
self.config: SnowflakeV2Config = config
self.report: SnowflakeV2Report = report
Expand All @@ -180,6 +192,7 @@ def __init__(
# These are populated as side-effects of get_workunits_internal.
self.databases: List[SnowflakeDatabase] = []
self.aggregator: Optional[SqlParsingAggregator] = aggregator
self.platform_resource_repository = platform_resource_repository

def get_connection(self) -> SnowflakeConnection:
return self.connection
Expand Down Expand Up @@ -629,7 +642,7 @@ def _process_view(
yield from self.gen_dataset_workunits(view, schema_name, db_name)

def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
tag_identifier = tag.identifier()
tag_identifier = tag.display_name()

if self.report.is_tag_processed(tag_identifier):
return
Expand Down Expand Up @@ -716,7 +729,7 @@ def gen_dataset_workunits(
if table.tags:
tag_associations = [
TagAssociation(
tag=make_tag_urn(self.snowflake_identifier(tag.identifier()))
tag=make_tag_urn(self.snowflake_identifier(tag.display_name()))
)
for tag in table.tags
]
Expand Down Expand Up @@ -783,17 +796,75 @@ def get_dataset_properties(
)

def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
tag_urn = make_tag_urn(self.snowflake_identifier(tag.identifier()))

assert (
self.platform_resource_repository
), "Platform resource repository is required"

tag_urn = make_tag_urn(tag.name)
tag_properties_aspect = TagProperties(
name=tag.display_name(),
description=f"Represents the Snowflake tag `{tag._id_prefix_as_str()}` with value `{tag.value}`.",
description=f"Represents the Snowflake tag `{tag.display_name()}` with value `{tag.value}`.",
)

snowflake_system = SnowflakeSystem(snowflake_config=self.config)

tag_sync_context = SnowflakeTagSyncContext(
account_id=self.config.account_id,
tag_database=tag.database,
tag_schema=tag.schema,
)
snowflake_tag_id = SnowflakeTagId.from_datahub_tag(
tag_urn=TagUrn.from_string(tag_urn), tag_sync_context=tag_sync_context
)

snowflake_tag = snowflake_system.get(
snowflake_tag_id, self.platform_resource_repository
)

if not snowflake_tag:
linked_resource_set = LinkedResourceSet(urns=[tag_urn])
linked_resource_set.add(tag_urn)
tag_entities.SnowflakeTag(
datahub_urns=linked_resource_set,
managed_by_datahub=False,
id=snowflake_tag_id,
allowed_values=None,
)
logger.info(
f"Snowflake tag {tag.display_name()} not found in platform_resource_repository. Creating new one..."
)
else:
try:
ret = snowflake_tag.datahub_linked_resources().add(tag_urn)
except ValueError as e:
logger.error(f"Failed to add tag {tag.display_name()}. Error: {e}")
return
if ret:
logger.info(
f"Snowflake tag {tag.display_name()} added in platform_resource_repository"
)
else:
logger.info(
f"Snowflake tag {tag.display_name()} already exists in platform_resource_repository"
)
return
if not snowflake_tag:
return

platform_resource = snowflake_tag.as_platform_resource()

yield MetadataChangeProposalWrapper(
entityUrn=tag_urn, aspect=tag_properties_aspect
).as_workunit()

for mcp in platform_resource.to_mcps():
yield mcp.as_workunit()

self.platform_resource_repository.cache[
platform_resource.id
] = platform_resource

def gen_schema_metadata(
self,
table: Union[SnowflakeTable, SnowflakeView],
Expand Down Expand Up @@ -835,7 +906,7 @@ def gen_schema_metadata(
[
TagAssociation(
make_tag_urn(
self.snowflake_identifier(tag.identifier())
self.snowflake_identifier(tag.display_name())
)
)
for tag in table.column_tags[col.name]
Expand Down Expand Up @@ -928,7 +999,7 @@ def gen_database_containers(
)
),
tags=(
[self.snowflake_identifier(tag.identifier()) for tag in database.tags]
[self.snowflake_identifier(tag.display_name()) for tag in database.tags]
if database.tags
else None
),
Expand Down Expand Up @@ -981,7 +1052,7 @@ def gen_schema_containers(
else None
),
tags=(
[self.snowflake_identifier(tag.identifier()) for tag in schema.tags]
[self.snowflake_identifier(tag.display_name()) for tag in schema.tags]
if schema.tags
else None
),
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -165,7 +165,7 @@ def _filter_tags(

allowed_tags = []
for tag in tags:
tag_identifier = tag.identifier()
tag_identifier = tag.display_name()
self.report.report_entity_scanned(tag_identifier, "tag")
if not self.config.tag_pattern.allowed(tag_identifier):
self.report.report_dropped(tag_identifier)
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,6 +8,7 @@
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Union

from datahub.api.entities.external.external_entities import PlatformResourceRepository
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
Expand Down Expand Up @@ -35,7 +36,10 @@
from datahub.ingestion.source.snowflake.snowflake_assertion import (
SnowflakeAssertionsHandler,
)
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
TagOption,
)
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnection,
SnowflakeConnectionConfig,
Expand Down Expand Up @@ -161,6 +165,12 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
self.aggregator: Optional[SqlParsingAggregator] = None
self.platform_resource_repository = None

if self.ctx.graph and self.config.extract_tags != TagOption.skip:
self.platform_resource_repository = PlatformResourceRepository(
self.ctx.graph
)

if self.config.use_queries_v2 or self.config.include_table_lineage:
self.aggregator = self._exit_stack.enter_context(
Expand Down Expand Up @@ -476,6 +486,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
snowsight_url_builder=snowsight_url_builder,
filters=self.filters,
identifiers=self.identifiers,
platform_resource_repository=self.platform_resource_repository,
)

self.report.set_ingestion_stage("*", METADATA_EXTRACTION)
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -20,7 +20,7 @@
"com.linkedin.pegasus2avro.common.InstitutionalMemory": {
"elements": [
{
"url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/examples/",
"url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/tests/",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
Expand Down Expand Up @@ -71,7 +71,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00"
"runId": "openapi-2020_04_14-07_00_00-68bgeu",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -95,7 +96,7 @@
"com.linkedin.pegasus2avro.common.InstitutionalMemory": {
"elements": [
{
"url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/examples/v2",
"url": "https://raw.githubusercontent.com/OAI/OpenAPI-Specification/main/tests/v2",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
Expand Down Expand Up @@ -146,7 +147,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00"
"runId": "openapi-2020_04_14-07_00_00-68bgeu",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -161,7 +163,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00"
"runId": "openapi-2020_04_14-07_00_00-68bgeu",
"lastRunId": "no-run-id-provided"
}
},
{
Expand All @@ -176,7 +179,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00"
"runId": "openapi-2020_04_14-07_00_00-68bgeu",
"lastRunId": "no-run-id-provided"
}
}
]

0 comments on commit 1397858

Please sign in to comment.