
feat(ingest/sql-queries): Add sql queries source, SqlParsingBuilder, sqlglot_lineage performance optimizations #8494

Merged · 8 commits · Aug 24, 2023
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
@@ -363,6 +363,7 @@ def get_long_description():
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
"sqlalchemy": sql_common,
"sql-queries": usage_common | sqlglot_lib,
"superset": {
"requests",
"sqlalchemy",
@@ -588,6 +589,7 @@ def get_long_description():
"demo-data = datahub.ingestion.source.demo_data.DemoDataSource",
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
],
"datahub.ingestion.transformer.plugins": [
"simple_remove_dataset_ownership = datahub.ingestion.transformer.remove_dataset_ownership:SimpleRemoveDatasetOwnership",
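For orientation, here is a minimal, hedged sketch of running the newly registered sql-queries source through DataHub's Pipeline API. The extras name and the sql-queries source type come from the diff above; the config keys query_file and platform are illustrative assumptions not shown in this diff, and a DataHub connection is assumed for schema-aware parsing.

# Hedged sketch: exercising the new "sql-queries" plugin registration.
# Install the extras first: pip install 'acryl-datahub[sql-queries]'
# The config keys below (query_file, platform) are assumptions, not taken from this diff.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "sql-queries",  # registered via the entry point added above
            "config": {
                "query_file": "./queries.json",  # assumed: a local file of logged queries
                "platform": "snowflake",
            },
        },
        # Assumed: a DataHub connection for resolving schemas during parsing.
        "datahub_api": {"server": "http://localhost:8080"},
        "sink": {"type": "console"},  # print emitted aspects instead of writing to DataHub
    }
)
pipeline.run()
pipeline.raise_from_status()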
289 changes: 289 additions & 0 deletions metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -0,0 +1,289 @@
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Collection, Dict, Iterable, List, Optional, Set

from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetLineageTypeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
OperationClass,
OperationTypeClass,
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult

logger = logging.getLogger(__name__)

# TODO: Use this over other sources' equivalent code, if possible

DatasetUrn = str
FieldUrn = str
UserUrn = str


@dataclass
class LineageEdge:
"""Stores information about a single lineage edge, from an upstream table to a downstream table."""

downstream_urn: DatasetUrn
upstream_urn: DatasetUrn
audit_stamp: Optional[datetime]
actor: Optional[UserUrn]
type: str = DatasetLineageTypeClass.TRANSFORMED

# Maps downstream_col -> {upstream_col}
column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

def gen_upstream_aspect(self) -> UpstreamClass:
return UpstreamClass(
auditStamp=AuditStampClass(
time=int(self.audit_stamp.timestamp() * 1000), actor=self.actor or ""
)
if self.audit_stamp
else None,
dataset=self.upstream_urn,
type=self.type,
)

def gen_fine_grained_lineage_aspects(self) -> Iterable[FineGrainedLineageClass]:
for downstream_col, upstream_cols in self.column_map.items():
yield FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
# Sort to avoid creating multiple aspects in backend with same lineage but different order
upstreams=sorted(
make_schema_field_urn(self.upstream_urn, col)
for col in upstream_cols
),
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[
make_schema_field_urn(self.downstream_urn, downstream_col)
],
)


@dataclass
class SqlParsingBuilder:
# Open question: does it make sense to iterate over out_tables? When will we have multiple?

generate_lineage: bool = True
generate_usage_statistics: bool = True
generate_operations: bool = True
usage_config: Optional[BaseUsageConfig] = None

# TODO: Make inner dict a FileBackedDict and make LineageEdge frozen
# Builds up a single LineageEdge for each upstream -> downstream pair
_lineage_map: Dict[DatasetUrn, Dict[DatasetUrn, LineageEdge]] = field(
default_factory=lambda: defaultdict(dict), init=False
)

# TODO: Replace with FileBackedDict approach like in BigQuery usage
_usage_aggregator: UsageAggregator[DatasetUrn] = field(init=False)

def __post_init__(self) -> None:
if self.usage_config:
self._usage_aggregator = UsageAggregator(self.usage_config)
else:
logger.info("No usage config provided, not generating usage statistics")
self.generate_usage_statistics = False

def process_sql_parsing_result(
self,
result: SqlParsingResult,
*,
query: str,
query_timestamp: Optional[datetime] = None,
is_view_ddl: bool = False,
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.

Args:
result: The result of parsing the query, or a mock result if parsing failed.
query: The SQL query to parse and process.
query_timestamp: When the query was run.
is_view_ddl: Whether the query is a DDL statement that creates a view.
user: The urn of the user who ran the query.
custom_operation_type: Platform-specific operation type, used if the operation type can't be parsed.
include_urns: If provided, only generate workunits for these urns.
"""
downstreams_to_ingest = result.out_tables
upstreams_to_ingest = result.in_tables
if include_urns:
logger.debug(f"Skipping urns {set(downstreams_to_ingest) - include_urns}")
downstreams_to_ingest = list(set(downstreams_to_ingest) & include_urns)
upstreams_to_ingest = list(set(upstreams_to_ingest) & include_urns)

if self.generate_lineage:
for downstream_urn in downstreams_to_ingest:
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
column_lineage=result.column_lineage,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
user=user,
)

if self.generate_usage_statistics and query_timestamp is not None:
upstream_fields = _compute_upstream_fields(result)
for upstream_urn in upstreams_to_ingest:
self._usage_aggregator.aggregate_event(
resource=upstream_urn,
start_time=query_timestamp,
query=query,
user=user,
fields=sorted(upstream_fields.get(upstream_urn, [])),
)

if self.generate_operations and query_timestamp is not None:
for downstream_urn in downstreams_to_ingest:
yield from _gen_operation_workunit(
result,
downstream_urn=downstream_urn,
query_timestamp=query_timestamp,
user=user,
custom_operation_type=custom_operation_type,
)

def add_lineage(
self,
downstream_urn: DatasetUrn,
upstream_urns: Collection[DatasetUrn],
timestamp: Optional[datetime] = None,
is_view_ddl: bool = False,
user: Optional[UserUrn] = None,
) -> None:
"""Manually add a single upstream -> downstream lineage edge, e.g. if sql parsing fails."""
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=upstream_urns,
column_lineage=None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=timestamp,
is_view_ddl=is_view_ddl,
user=user,
)

def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.generate_lineage:
yield from self._gen_lineage_workunits()
if self.generate_usage_statistics:
yield from self._gen_usage_statistics_workunits()

def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
for downstream_urn in self._lineage_map:
upstreams: List[UpstreamClass] = []
fine_upstreams: List[FineGrainedLineageClass] = []
for upstream_urn, edge in self._lineage_map[downstream_urn].items():
upstreams.append(edge.gen_upstream_aspect())
fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

upstream_lineage = UpstreamLineageClass(
upstreams=sorted(upstreams, key=lambda x: x.dataset),
fineGrainedLineages=sorted(
fine_upstreams,
key=lambda x: (x.downstreams, x.upstreams),
)
or None,
)
yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn, aspect=upstream_lineage
).as_workunit()

def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
yield from self._usage_aggregator.generate_workunits(
resource_urn_builder=lambda urn: urn, user_urn_builder=lambda urn: urn
)


def _merge_lineage_data(
downstream_urn: DatasetUrn,
*,
upstream_urns: Collection[DatasetUrn],
column_lineage: Optional[List[ColumnLineageInfo]],
upstream_edges: Dict[DatasetUrn, LineageEdge],
query_timestamp: Optional[datetime],
is_view_ddl: bool,
user: Optional[UserUrn],
) -> None:
for upstream_urn in upstream_urns:
edge = upstream_edges.setdefault(
upstream_urn,
LineageEdge(
downstream_urn=downstream_urn,
upstream_urn=upstream_urn,
audit_stamp=query_timestamp,
actor=user,
type=DatasetLineageTypeClass.VIEW
if is_view_ddl
else DatasetLineageTypeClass.TRANSFORMED,
),
)
if query_timestamp and ( # Use the most recent query
edge.audit_stamp is None or query_timestamp > edge.audit_stamp
):
edge.audit_stamp = query_timestamp
if user:
edge.actor = user

# Note: Inefficient as we loop through all column_lineage entries for each downstream table
for cl in column_lineage or []:
if cl.downstream.table == downstream_urn:
for upstream_column_info in cl.upstreams:
if upstream_column_info.table not in upstream_urns:
continue
column_map = upstream_edges[upstream_column_info.table].column_map
column_map[cl.downstream.column].add(upstream_column_info.column)


def _compute_upstream_fields(
result: SqlParsingResult,
) -> Dict[DatasetUrn, Set[DatasetUrn]]:
upstream_fields: Dict[DatasetUrn, Set[DatasetUrn]] = defaultdict(set)
for cl in result.column_lineage or []:
for upstream in cl.upstreams:
upstream_fields[upstream.table].add(upstream.column)
return upstream_fields


def _gen_operation_workunit(
result: SqlParsingResult,
*,
downstream_urn: DatasetUrn,
query_timestamp: datetime,
user: Optional[UserUrn],
custom_operation_type: Optional[str],
) -> Iterable[MetadataWorkUnit]:
operation_type = result.query_type.to_operation_type()
# Filter out SELECT and other undesired statements
if operation_type is None:
return
elif operation_type == OperationTypeClass.UNKNOWN:
if custom_operation_type is None:
return
else:
operation_type = OperationTypeClass.CUSTOM

aspect = OperationClass(
timestampMillis=int(time.time() * 1000),
operationType=operation_type,
lastUpdatedTimestamp=int(query_timestamp.timestamp() * 1000),
actor=user,
customOperationType=custom_operation_type,
)
yield MetadataChangeProposalWrapper(
entityUrn=downstream_urn, aspect=aspect
).as_workunit()
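For context, a minimal sketch (not part of this diff) of how a source might drive SqlParsingBuilder: parse each query, feed the result into the builder, and emit the merged lineage workunits at the end. The query-log shape and the snowflake platform are assumptions for illustration.

# Hedged sketch: driving SqlParsingBuilder from a query log (shape assumed).
from datetime import datetime
from typing import Iterable, Optional, Tuple

from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph

QueryLogEntry = Tuple[str, datetime, Optional[str]]  # (sql, timestamp, user urn)


def workunits_from_query_log(
    graph: DataHubGraph, query_log: Iterable[QueryLogEntry]
) -> Iterable[MetadataWorkUnit]:
    # No usage_config, so the builder only produces lineage and operation aspects.
    builder = SqlParsingBuilder(generate_usage_statistics=False)
    for sql, timestamp, user in query_log:
        # parse_sql_lineage caches its SchemaResolver, so repeated calls stay cheap.
        result = graph.parse_sql_lineage(sql, platform="snowflake")
        yield from builder.process_sql_parsing_result(
            result, query=sql, query_timestamp=timestamp, user=user
        )
    # Lineage edges are merged per upstream/downstream pair and emitted once, here.
    yield from builder.gen_workunits()

Because the builder merges edges per upstream/downstream pair, emitting workunits only after the loop avoids duplicate UpstreamLineage aspects for tables touched by many queries.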
3 changes: 3 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -182,6 +182,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
"""A list of functions that transforms the workunits produced by this source.
Run in order, first in list is applied first. Be careful with order when overriding.

The last workunit processors should be the workunit reporter and
stale entity remover, if applicable.
"""
browse_path_processor: Optional[MetadataWorkUnitProcessor] = None
if (
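To make the ordering note above concrete, here is a hedged sketch of a subclass override; MyCustomSource and its pass-through processor are hypothetical, and other required Source methods are omitted.

# Hedged sketch: overriding get_workunit_processors while preserving the documented order.
from typing import Iterable, List, Optional

from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source
from datahub.ingestion.api.workunit import MetadataWorkUnit


class MyCustomSource(Source):  # hypothetical subclass; other required methods omitted
    @staticmethod
    def _my_custom_processor(
        workunits: Iterable[MetadataWorkUnit],
    ) -> Iterable[MetadataWorkUnit]:
        # Hypothetical pass-through; a real processor would modify or filter workunits.
        yield from workunits

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Custom processors go first; the inherited list, which ends with the
        # workunit reporter, stays last as the docstring above recommends.
        return [self._my_custom_processor, *super().get_workunit_processors()]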
51 changes: 47 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import datetime
from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

from avro.schema import RecordSchema
from deprecated import deprecated
@@ -39,6 +39,8 @@
SystemMetadataClass,
TelemetryClientIdClass,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn, guess_entity_type

if TYPE_CHECKING:
@@ -970,6 +972,49 @@ def _make_schema_resolver(
graph=self,
)

def initialize_schema_resolver_from_datahub(
self, platform: str, platform_instance: Optional[str], env: str
) -> Tuple["SchemaResolver", Set[str]]:
logger.info("Initializing schema resolver")

# TODO: Filter on platform instance?
logger.info(f"Fetching urns for platform {platform}, env {env}")
with PerfTimer() as timer:
urns = set(
self.get_urns_by_filter(
entity_types=[DatasetUrn.ENTITY_TYPE],
platform=platform,
Collaborator: there's a bug here - it doesn't respect platform_instance

Collaborator (Author): So it won't actually break anything right -- just add more items to schema resolver than necessary. Do we have the ability to filter on platform instance?

Collaborator: Yep - I'm adding that here #8709

env=env,
batch_size=3000,
)
)
logger.info(
f"Fetched {len(urns)} urns in {timer.elapsed_seconds()} seconds"
)

schema_resolver = self._make_schema_resolver(platform, platform_instance, env)
schema_resolver.set_include_urns(urns)
Collaborator: Since you're doing all the schema resolution here, you don't need to pass a DataHubGraph instance into the SchemaResolver. Once you do that, we can remove this set_include_urns thing.


with PerfTimer() as timer:
count = 0
for i, urn in enumerate(urns):
if i % 1000 == 0:
logger.debug(f"Loaded {i} schema metadata")
try:
schema_metadata = self.get_aspect(urn, SchemaMetadataClass)
Collaborator (Author): We really need a bulk endpoint here. Loading 45k aspects took over an hour.

Collaborator: I think you can use the graphql endpoints to bulk fetch schema metadata. We also do have bulk endpoints somewhere, but not sure the exact syntax.

if schema_metadata:
schema_resolver.add_schema_metadata(urn, schema_metadata)
count += 1
except Exception:
logger.warning("Failed to load schema metadata", exc_info=True)
logger.info(
f"Loaded {count} schema metadata in {timer.elapsed_seconds()} seconds"
)

logger.info("Finished initializing schema resolver")

return schema_resolver, urns

def parse_sql_lineage(
self,
sql: str,
@@ -984,9 +1029,7 @@ def parse_sql_lineage(

# Cache the schema resolver to make bulk parsing faster.
schema_resolver = self._make_schema_resolver(
-    platform=platform,
-    platform_instance=platform_instance,
-    env=env,
+    platform=platform, platform_instance=platform_instance, env=env
)

return sqlglot_lineage(
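Tying the pieces together, a hedged sketch of the bulk-parsing flow this PR enables: warm up a SchemaResolver once with initialize_schema_resolver_from_datahub, parse queries directly with sqlglot_lineage, and use include_urns so SqlParsingBuilder only emits aspects for datasets DataHub already knows about. The sqlglot_lineage call signature is assumed from its use inside parse_sql_lineage.

# Hedged sketch: bulk parsing against a pre-warmed SchemaResolver (assumed flow).
from typing import Iterable, List

from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.sqlglot_lineage import sqlglot_lineage


def bulk_parse(graph: DataHubGraph, queries: List[str]) -> Iterable[MetadataWorkUnit]:
    # One pass over DataHub to fetch schemas up front (the slow step discussed above).
    schema_resolver, known_urns = graph.initialize_schema_resolver_from_datahub(
        platform="snowflake", platform_instance=None, env="PROD"
    )
    builder = SqlParsingBuilder(generate_usage_statistics=False)
    for sql in queries:
        result = sqlglot_lineage(sql, schema_resolver=schema_resolver)
        # include_urns keeps lineage scoped to datasets that already exist in DataHub.
        yield from builder.process_sql_parsing_result(
            result, query=sql, include_urns=known_urns
        )
    yield from builder.gen_workunits()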