Skip to content

Commit

Permalink
feat(ingest/fivetran): support filtering on destination ids (datahub-…
Browse files Browse the repository at this point in the history
  • Loading branch information
matthew-coudert-cko authored Aug 30, 2024
1 parent 3e5c18f commit c513e17
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
default=AllowDenyPattern.allow_all(),
description="Regex patterns for connectors to filter in ingestion.",
)
destination_patterns: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for destinations to filter in ingestion.",
)
include_column_lineage: bool = Field(
default=True,
description="Populates table->table column lineage.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Fivetran plugin execution is started")
connectors = self.audit_log.get_allowed_connectors_list(
self.config.connector_patterns,
self.config.destination_patterns,
self.report,
self.config.history_sync_lookback_period,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def _fill_connectors_jobs(
def get_allowed_connectors_list(
self,
connector_patterns: AllowDenyPattern,
destination_patterns: AllowDenyPattern,
report: FivetranSourceReport,
syncs_interval: int,
) -> List[Connector]:
Expand All @@ -261,6 +262,9 @@ def get_allowed_connectors_list(
if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]):
report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME])
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/tests/integration/fivetran/test_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path):
"postgres",
]
},
"destination_patterns": {
"allow": [
"interval_unconstitutional",
]
},
"sources_to_database": {
"calendar_elected": "postgres_db",
},
Expand Down Expand Up @@ -291,6 +296,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_
"postgres",
]
},
"destination_patterns": {
"allow": [
"interval_unconstitutional",
]
},
"sources_to_database": {
"calendar_elected": "postgres_db",
},
Expand Down

0 comments on commit c513e17

Please sign in to comment.