feat(ingest/sql-queries): Add sql queries source, SqlParsingBuilder, sqlglot_lineage performance optimizations #8494
Conversation
…r. Add some sqlglot_lineage performance optimizations
if i % 1000 == 0:
    logger.debug(f"Loaded {i} schema metadata")
try:
    schema_metadata = self.get_aspect(urn, SchemaMetadataClass)
We really need a bulk endpoint here. Loading 45k aspects took over an hour
I think you can use the graphql endpoints to bulk fetch schema metadata
We also do have bulk endpoints somewhere, but not sure the exact syntax
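This isn't the bulk endpoint the reviewers are pointing at, just a minimal stopgap sketch that overlaps the per-urn get_aspect round-trips with a thread pool; the graph client, server URL, urn list, and worker count are all illustrative assumptions.

from concurrent.futures import ThreadPoolExecutor

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import SchemaMetadataClass

# Illustrative client and urn list; replace with the real graph instance and urns.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
urns = [
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)",
]


def fetch_schema(urn: str):
    # Still one REST round-trip per urn; a true bulk/graphql endpoint would
    # batch these server-side instead of just overlapping the network calls.
    return urn, graph.get_aspect(urn, SchemaMetadataClass)


with ThreadPoolExecutor(max_workers=10) as pool:
    schema_by_urn = dict(pool.map(fetch_schema, urns))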
@config_class(SqlQueriesSourceConfig)
@support_status(SupportStatus.TESTING)
class SqlQueriesSource(Source):
    # TODO: Documentation
This has to get done before I merge. I want to specify what this source does and what format the query file is expected to be in.
@@ -238,6 +253,7 @@ def __init__(
    self.platform = platform
    self.platform_instance = platform_instance
    self.env = env
    self.urns: Optional[Set[str]] = None
Optimization 1: don't fetch schema metadata for urns that don't exist, which I expect could be a lot, e.g. when we filter out urns. This doesn't apply if we're filtering out temporary tables, as with BigQuery.
schema_info = self._resolve_schema_info(urn)
if schema_info:
    return urn, schema_info
if not (self.urns and urn not in self.urns):
I don't really like this logic. I was thinking about creating a "top" set that overrides __contains__ to always return True and setting that as the default instead of None, but I could see how that could be confusing. Thoughts? This type of logic comes up in the SQL parsing builder too.
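For illustration, a minimal sketch of the "top" set idea floated above (hypothetical, not part of the PR):

class TopSet(set):
    """A hypothetical 'top' set: membership checks always succeed."""

    def __contains__(self, item: object) -> bool:
        return True


# With this as the default instead of None, the call site could shrink to a
# plain membership check:
urns = TopSet()
assert "urn:li:dataset:anything" in urns    # always True
assert "definitely-not-a-real-urn" in urns  # also True, which is the point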
see my comment above - we should be able to remove this
@@ -755,6 +778,7 @@ def _sqlglot_lineage_inner(
)


@functools.lru_cache(maxsize=1000)
Optimization 2: I saw lots of duplicate queries, so I think this is an easy performance gain for what shouldn't be too much memory. We can lower the number if we think SqlParsingResults can be very large, or use a file-backed dict.
yup this makes sense - let's make the 1000 a constant, but otherwise this is fine
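A minimal sketch of the suggested change, pulling the cache size into a module-level constant; the constant name and the placeholder function are illustrative, the real code would decorate _sqlglot_lineage_inner or a wrapper around it.

import functools

# Cache size as a named constant: large enough to absorb duplicate queries,
# small enough that cached results stay cheap to keep in memory.
SQL_PARSE_CACHE_SIZE = 1000


@functools.lru_cache(maxsize=SQL_PARSE_CACHE_SIZE)
def parse_lineage(sql: str, dialect: str) -> str:
    # Placeholder body standing in for the real parsing call; lru_cache keys
    # on the (hashable) arguments, so repeated identical queries skip
    # re-parsing entirely.
    return f"{dialect}:{sql}"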
)

schema_resolver = self._make_schema_resolver(platform, platform_instance, env)
schema_resolver.set_include_urns(urns)
Since you're doing all the schema resolution here, you don't need to pass a DataHubGraph instance into the SchemaResolver
Once you do that, we can remove this set_include_urns thing
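To illustrate the direction being suggested (all names here are hypothetical stand-ins, not DataHub's actual SchemaResolver API): if the source loads every schema itself, the resolver only needs the pre-built mapping, and the None-vs-allowlist bookkeeping that set_include_urns exists for goes away.

from typing import Dict, Optional

SchemaInfo = Dict[str, str]  # column name -> column type (simplified)


class OfflineSchemaResolver:
    """Resolver pre-populated up front instead of holding a live graph client,
    so it never needs an include-urns allowlist."""

    def __init__(self, schemas: Dict[str, SchemaInfo]) -> None:
        self._schemas = schemas

    def resolve(self, urn: str) -> Optional[SchemaInfo]:
        # Pure dictionary lookup; unknown urns simply resolve to None.
        return self._schemas.get(urn)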
if schema_resolver.platform == "presto-on-hive":
    dialect = "hive"
else:
    dialect = schema_resolver.platform
let's extract this into a helper method - I suspect it'll grow / need to be monkeypatched in the future
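A minimal sketch of the suggested helper; the name _get_dialect and placing it at module level are assumptions, the point is to give the platform-to-dialect mapping one place to grow or be monkeypatched.

def _get_dialect(platform: str) -> str:
    # presto-on-hive tables are defined with Hive DDL, so parse their queries
    # with the hive dialect; other platforms map directly onto a sqlglot
    # dialect name today.
    if platform == "presto-on-hive":
        return "hive"
    return platform


# The call site above then becomes a single line:
# dialect = _get_dialect(schema_resolver.platform)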
platform: str = Field(
    description="The platform for which to generate data, e.g. snowflake"
)
dialect: str = Field(description="The SQL dialect of the queries, e.g. snowflake")
do we need both of these as required fields?
Remove dialect
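A trimmed-down sketch of what the config could look like with the dialect field dropped and the dialect derived from platform instead; the minimal base class shown and the reuse of the _get_dialect helper sketched above are assumptions.

from pydantic import Field

from datahub.configuration.common import ConfigModel


class SqlQueriesSourceConfig(ConfigModel):
    platform: str = Field(
        description="The platform for which to generate data, e.g. snowflake"
    )
    # No dialect field: the SQL dialect is derived from `platform`,
    # e.g. via the _get_dialect helper sketched earlier.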
urns = set(
    self.get_urns_by_filter(
        entity_types=[DatasetUrn.ENTITY_TYPE],
        platform=platform,
there's a bug here - it doesn't respect platform_instance
So it won't actually break anything, right -- it'll just add more items to the schema resolver than necessary. Do we have the ability to filter on platform instance?
Yep - I'm adding that here #8709
Merging as I don't think the failures are related, sorry if that's not the case.
Creates a source for generating lineage and usage (stats + operations) from a file containing a list of queries. Uses SqlParsingBuilder, which hopefully can be reused for other sources. Adds two sqlglot_lineage performance optimizations.
cc @mayurinehate for awareness