feat(ingest/sql-queries): Add sql queries source, SqlParsingBuilder, sqlglot_lineage performance optimizations #8494
Changes from all commits: 03419b4, 4a32f1e, f10aee0, 4cf2cdf, ebbcfd7, d975fe4, 4d60b70, d96378b
New file introducing SqlParsingBuilder and its lineage/usage helpers:
@@ -0,0 +1,289 @@
import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Collection, Dict, Iterable, List, Optional, Set

from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    OperationClass,
    OperationTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult

logger = logging.getLogger(__name__)

# TODO: Use this over other sources' equivalent code, if possible

DatasetUrn = str
FieldUrn = str
UserUrn = str


@dataclass
class LineageEdge:
    """Stores information about a single lineage edge, from an upstream table to a downstream table."""

    downstream_urn: DatasetUrn
    upstream_urn: DatasetUrn
    audit_stamp: Optional[datetime]
    actor: Optional[UserUrn]
    type: str = DatasetLineageTypeClass.TRANSFORMED

    # Maps downstream_col -> {upstream_col}
    column_map: Dict[str, Set[str]] = field(default_factory=lambda: defaultdict(set))

    def gen_upstream_aspect(self) -> UpstreamClass:
        return UpstreamClass(
            auditStamp=AuditStampClass(
                time=int(self.audit_stamp.timestamp() * 1000), actor=self.actor or ""
            )
            if self.audit_stamp
            else None,
            dataset=self.upstream_urn,
            type=self.type,
        )

    def gen_fine_grained_lineage_aspects(self) -> Iterable[FineGrainedLineageClass]:
        for downstream_col, upstream_cols in self.column_map.items():
            yield FineGrainedLineageClass(
                upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                # Sort to avoid creating multiple aspects in backend with same lineage but different order
                upstreams=sorted(
                    make_schema_field_urn(self.upstream_urn, col)
                    for col in upstream_cols
                ),
                downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
                downstreams=[
                    make_schema_field_urn(self.downstream_urn, downstream_col)
                ],
            )


@dataclass
class SqlParsingBuilder:
    # Open question: does it make sense to iterate over out_tables? When will we have multiple?

    generate_lineage: bool = True
    generate_usage_statistics: bool = True
    generate_operations: bool = True
    usage_config: Optional[BaseUsageConfig] = None

    # TODO: Make inner dict a FileBackedDict and make LineageEdge frozen
    # Builds up a single LineageEdge for each upstream -> downstream pair
    _lineage_map: Dict[DatasetUrn, Dict[DatasetUrn, LineageEdge]] = field(
        default_factory=lambda: defaultdict(dict), init=False
    )

    # TODO: Replace with FileBackedDict approach like in BigQuery usage
    _usage_aggregator: UsageAggregator[DatasetUrn] = field(init=False)

    def __post_init__(self) -> None:
        if self.usage_config:
            self._usage_aggregator = UsageAggregator(self.usage_config)
        else:
            logger.info("No usage config provided, not generating usage statistics")
            self.generate_usage_statistics = False

    def process_sql_parsing_result(
        self,
        result: SqlParsingResult,
        *,
        query: str,
        query_timestamp: Optional[datetime] = None,
        is_view_ddl: bool = False,
        user: Optional[UserUrn] = None,
        custom_operation_type: Optional[str] = None,
        include_urns: Optional[Set[DatasetUrn]] = None,
    ) -> Iterable[MetadataWorkUnit]:
        """Process a single query and yield any generated workunits.

        Args:
            result: The result of parsing the query, or a mock result if parsing failed.
            query: The SQL query to parse and process.
            query_timestamp: When the query was run.
            is_view_ddl: Whether the query is a DDL statement that creates a view.
            user: The urn of the user who ran the query.
            custom_operation_type: Platform-specific operation type, used if the operation type can't be parsed.
            include_urns: If provided, only generate workunits for these urns.
        """
        downstreams_to_ingest = result.out_tables
        upstreams_to_ingest = result.in_tables
        if include_urns:
            logger.debug(f"Skipping urns {set(downstreams_to_ingest) - include_urns}")
            downstreams_to_ingest = list(set(downstreams_to_ingest) & include_urns)
            upstreams_to_ingest = list(set(upstreams_to_ingest) & include_urns)

        if self.generate_lineage:
            for downstream_urn in downstreams_to_ingest:
                _merge_lineage_data(
                    downstream_urn=downstream_urn,
                    upstream_urns=result.in_tables,
                    column_lineage=result.column_lineage,
                    upstream_edges=self._lineage_map[downstream_urn],
                    query_timestamp=query_timestamp,
                    is_view_ddl=is_view_ddl,
                    user=user,
                )

        if self.generate_usage_statistics and query_timestamp is not None:
            upstream_fields = _compute_upstream_fields(result)
            for upstream_urn in upstreams_to_ingest:
                self._usage_aggregator.aggregate_event(
                    resource=upstream_urn,
                    start_time=query_timestamp,
                    query=query,
                    user=user,
                    fields=sorted(upstream_fields.get(upstream_urn, [])),
                )

        if self.generate_operations and query_timestamp is not None:
            for downstream_urn in downstreams_to_ingest:
                yield from _gen_operation_workunit(
                    result,
                    downstream_urn=downstream_urn,
                    query_timestamp=query_timestamp,
                    user=user,
                    custom_operation_type=custom_operation_type,
                )

    def add_lineage(
        self,
        downstream_urn: DatasetUrn,
        upstream_urns: Collection[DatasetUrn],
        timestamp: Optional[datetime] = None,
        is_view_ddl: bool = False,
        user: Optional[UserUrn] = None,
    ) -> None:
        """Manually add a single upstream -> downstream lineage edge, e.g. if sql parsing fails."""
        _merge_lineage_data(
            downstream_urn=downstream_urn,
            upstream_urns=upstream_urns,
            column_lineage=None,
            upstream_edges=self._lineage_map[downstream_urn],
            query_timestamp=timestamp,
            is_view_ddl=is_view_ddl,
            user=user,
        )

    def gen_workunits(self) -> Iterable[MetadataWorkUnit]:
        if self.generate_lineage:
            yield from self._gen_lineage_workunits()
        if self.generate_usage_statistics:
            yield from self._gen_usage_statistics_workunits()

    def _gen_lineage_workunits(self) -> Iterable[MetadataWorkUnit]:
        for downstream_urn in self._lineage_map:
            upstreams: List[UpstreamClass] = []
            fine_upstreams: List[FineGrainedLineageClass] = []
            for upstream_urn, edge in self._lineage_map[downstream_urn].items():
                upstreams.append(edge.gen_upstream_aspect())
                fine_upstreams.extend(edge.gen_fine_grained_lineage_aspects())

            upstream_lineage = UpstreamLineageClass(
                upstreams=sorted(upstreams, key=lambda x: x.dataset),
                fineGrainedLineages=sorted(
                    fine_upstreams,
                    key=lambda x: (x.downstreams, x.upstreams),
                )
                or None,
            )
            yield MetadataChangeProposalWrapper(
                entityUrn=downstream_urn, aspect=upstream_lineage
            ).as_workunit()

    def _gen_usage_statistics_workunits(self) -> Iterable[MetadataWorkUnit]:
        yield from self._usage_aggregator.generate_workunits(
            resource_urn_builder=lambda urn: urn, user_urn_builder=lambda urn: urn
        )


def _merge_lineage_data(
    downstream_urn: DatasetUrn,
    *,
    upstream_urns: Collection[DatasetUrn],
    column_lineage: Optional[List[ColumnLineageInfo]],
    upstream_edges: Dict[DatasetUrn, LineageEdge],
    query_timestamp: Optional[datetime],
    is_view_ddl: bool,
    user: Optional[UserUrn],
) -> None:
    for upstream_urn in upstream_urns:
        edge = upstream_edges.setdefault(
            upstream_urn,
            LineageEdge(
                downstream_urn=downstream_urn,
                upstream_urn=upstream_urn,
                audit_stamp=query_timestamp,
                actor=user,
                type=DatasetLineageTypeClass.VIEW
                if is_view_ddl
                else DatasetLineageTypeClass.TRANSFORMED,
            ),
        )
        if query_timestamp and (  # Use the most recent query
            edge.audit_stamp is None or query_timestamp > edge.audit_stamp
        ):
            edge.audit_stamp = query_timestamp
            if user:
                edge.actor = user

    # Note: Inefficient as we loop through all column_lineage entries for each downstream table
    for cl in column_lineage or []:
        if cl.downstream.table == downstream_urn:
            for upstream_column_info in cl.upstreams:
                if upstream_column_info.table not in upstream_urns:
                    continue
                column_map = upstream_edges[upstream_column_info.table].column_map
                column_map[cl.downstream.column].add(upstream_column_info.column)


def _compute_upstream_fields(
    result: SqlParsingResult,
) -> Dict[DatasetUrn, Set[DatasetUrn]]:
    upstream_fields: Dict[DatasetUrn, Set[DatasetUrn]] = defaultdict(set)
    for cl in result.column_lineage or []:
        for upstream in cl.upstreams:
            upstream_fields[upstream.table].add(upstream.column)
    return upstream_fields


def _gen_operation_workunit(
    result: SqlParsingResult,
    *,
    downstream_urn: DatasetUrn,
    query_timestamp: datetime,
    user: Optional[UserUrn],
    custom_operation_type: Optional[str],
) -> Iterable[MetadataWorkUnit]:
    operation_type = result.query_type.to_operation_type()
    # Filter out SELECT and other undesired statements
    if operation_type is None:
        return
    elif operation_type == OperationTypeClass.UNKNOWN:
        if custom_operation_type is None:
            return
        else:
            operation_type = OperationTypeClass.CUSTOM

    aspect = OperationClass(
        timestampMillis=int(time.time() * 1000),
        operationType=operation_type,
        lastUpdatedTimestamp=int(query_timestamp.timestamp() * 1000),
        actor=user,
        customOperationType=custom_operation_type,
    )
    yield MetadataChangeProposalWrapper(
        entityUrn=downstream_urn, aspect=aspect
    ).as_workunit()
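
As a rough usage sketch, not part of the diff above: a source constructs one SqlParsingBuilder, feeds it a SqlParsingResult per query, and emits the accumulated lineage at the end. The parse_query callable, the queries iterable, and the user urn below are illustrative assumptions rather than APIs added by this PR; SqlParsingBuilder is assumed to be importable from the new module.

from datetime import datetime, timezone
from typing import Callable, Iterable, Optional

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.utilities.sqlglot_lineage import SqlParsingResult


def workunits_for_queries(
    queries: Iterable[str],
    parse_query: Callable[[str], Optional[SqlParsingResult]],  # hypothetical parser hook
) -> Iterable[MetadataWorkUnit]:
    # SqlParsingBuilder as defined in the new file above.
    builder = SqlParsingBuilder()  # no usage_config, so usage statistics are disabled
    for query in queries:
        result = parse_query(query)
        if result is None:
            continue
        # Operation workunits are yielded immediately; lineage is accumulated
        # inside the builder until gen_workunits() is called.
        yield from builder.process_sql_parsing_result(
            result,
            query=query,
            query_timestamp=datetime.now(timezone.utc),
            user="urn:li:corpuser:example_user",  # illustrative urn
        )
    # Emit one UpstreamLineage aspect per downstream table seen above.
    yield from builder.gen_workunits()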
Changes to the DataHub graph client:

@@ -7,7 +7,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

 from avro.schema import RecordSchema
 from deprecated import deprecated

@@ -38,6 +38,8 @@
     SystemMetadataClass,
     TelemetryClientIdClass,
 )
+from datahub.utilities.perf_timer import PerfTimer
+from datahub.utilities.urns.dataset_urn import DatasetUrn
 from datahub.utilities.urns.urn import Urn, guess_entity_type

 if TYPE_CHECKING:

@@ -957,16 +959,62 @@ def delete_references_to_urn(

     @functools.lru_cache()
     def _make_schema_resolver(
-        self, platform: str, platform_instance: Optional[str], env: str
+        self,
+        platform: str,
+        platform_instance: Optional[str],
+        env: str,
+        include_graph: bool = True,
     ) -> "SchemaResolver":
         from datahub.utilities.sqlglot_lineage import SchemaResolver

         return SchemaResolver(
             platform=platform,
             platform_instance=platform_instance,
             env=env,
-            graph=self,
+            graph=self if include_graph else None,
         )

+    def initialize_schema_resolver_from_datahub(
+        self, platform: str, platform_instance: Optional[str], env: str
+    ) -> Tuple["SchemaResolver", Set[str]]:
+        logger.info("Initializing schema resolver")
+
+        # TODO: Filter on platform instance?
+        logger.info(f"Fetching urns for platform {platform}, env {env}")
+        with PerfTimer() as timer:
+            urns = set(
+                self.get_urns_by_filter(
+                    entity_types=[DatasetUrn.ENTITY_TYPE],
+                    platform=platform,
+                    env=env,
+                    batch_size=3000,
+                )
+            )
+            logger.info(
+                f"Fetched {len(urns)} urns in {timer.elapsed_seconds()} seconds"
+            )
+
+        schema_resolver = self._make_schema_resolver(
+            platform, platform_instance, env, include_graph=False
+        )
+        with PerfTimer() as timer:
+            count = 0
+            for i, urn in enumerate(urns):
+                if i % 1000 == 0:
+                    logger.debug(f"Loaded {i} schema metadata")
+                try:
+                    schema_metadata = self.get_aspect(urn, SchemaMetadataClass)

Review comment: We really need a bulk endpoint here. Loading 45k aspects took over an hour.

Review comment: I think you can use the graphql endpoints to bulk fetch schema metadata. We also do have bulk endpoints somewhere, but not sure the exact syntax.

+                    if schema_metadata:
+                        schema_resolver.add_schema_metadata(urn, schema_metadata)
+                        count += 1
+                except Exception:
+                    logger.warning("Failed to load schema metadata", exc_info=True)
+            logger.info(
+                f"Loaded {count} schema metadata in {timer.elapsed_seconds()} seconds"
+            )
+
+        logger.info("Finished initializing schema resolver")
+        return schema_resolver, urns
+
     def parse_sql_lineage(
         self,

@@ -982,9 +1030,7 @@ def parse_sql_lineage(

         # Cache the schema resolver to make bulk parsing faster.
         schema_resolver = self._make_schema_resolver(
-            platform=platform,
-            platform_instance=platform_instance,
-            env=env,
+            platform=platform, platform_instance=platform_instance, env=env
         )

         return sqlglot_lineage(
Review comment: there's a bug here - it doesn't respect platform_instance

Review comment: So it won't actually break anything, right -- it just adds more items to the schema resolver than necessary. Do we have the ability to filter on platform instance?

Review comment: Yep - I'm adding that here: #8709
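
For reference, a minimal sketch of how a source might use the new bulk initialization together with sqlglot_lineage. The server address, platform, SQL text, and default_db below are placeholder assumptions, and the sqlglot_lineage keyword names should be double-checked against datahub.utilities.sqlglot_lineage.

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.utilities.sqlglot_lineage import sqlglot_lineage

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))  # placeholder server

# One bulk pass over DataHub up front, instead of one graph lookup per table
# while parsing. The returned urn set can also be passed to
# SqlParsingBuilder.process_sql_parsing_result as include_urns.
schema_resolver, known_urns = graph.initialize_schema_resolver_from_datahub(
    platform="snowflake", platform_instance=None, env="PROD"  # illustrative filter
)

result = sqlglot_lineage(
    "INSERT INTO analytics.daily_orders SELECT * FROM raw.orders",  # placeholder query
    schema_resolver=schema_resolver,
    default_db="analytics_db",  # placeholder default database
)
print(result.in_tables, result.out_tables)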