-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/datahub): Add way to filter soft deleted entities #11738
feat(ingest/datahub): Add way to filter soft deleted entities #11738
Conversation
Add way to filter aspects
exclude_aspects: Set[str] = Field( | ||
default_factory=set, | ||
description="Set of aspect names to exclude from ingestion", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume we use a set here instead of an AllowDenyPattern so that we can push down the filters to sql?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exactly
metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
Outdated
Show resolved
Hide resolved
{"" if self.config.include_all_versions else "AND mav.version = 0"} | ||
{"" if not self.config.exclude_aspects else f"AND LOWER(mav.aspect) NOT IN %(exclude_aspects)s"} | ||
AND mav.createdon >= %(since_createdon)s | ||
ORDER BY createdon, urn, aspect, version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we indent the subquery here - otherwise the sql is hard to read
also - I like how we format sql queries in the fivetran source
datahub/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
Line 134 in a11ac8d
FROM {self.db_clause}column_lineage as cl |
AND mav.createdon >= %(since_createdon)s | ||
ORDER BY createdon, urn, aspect, version | ||
) as t | ||
where row_id > %(last_id)s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we doing manual pagination instead of just using db cursors?
@@ -92,6 +92,23 @@ def __init__( | |||
**connection_config.options, | |||
) | |||
|
|||
@property | |||
def soft_deleted_urns(self) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def soft_deleted_urns(self) -> str: | |
def soft_deleted_urns_query(self) -> str: |
"Cannot exclude soft deleted entities without a database connection" | ||
) | ||
soft_deleted_urns = [ | ||
row[0] for row in database_reader.get_soft_deleted_rows() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will this read the soft-deleted urns list twice?
metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_kafka_reader.py
Outdated
Show resolved
Hide resolved
Merging as the error is unrelated |
Add a way to filter soft-deleted entities
Add a way to filter aspects
Fix SQL query pagination to not have duplicate entries
Checklist