From c513e17dbb6f54ec1a3e835c94e479e64d4662a9 Mon Sep 17 00:00:00 2001 From: matthew-coudert-cko <113023884+matthew-coudert-cko@users.noreply.github.com> Date: Fri, 30 Aug 2024 21:25:18 +0100 Subject: [PATCH] feat(ingest/fivetran): support filtering on destination ids (#11277) --- .../src/datahub/ingestion/source/fivetran/config.py | 4 ++++ .../src/datahub/ingestion/source/fivetran/fivetran.py | 1 + .../ingestion/source/fivetran/fivetran_log_api.py | 4 ++++ .../tests/integration/fivetran/test_fivetran.py | 10 ++++++++++ 4 files changed, 19 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index f8b1c6dd93d6d..02eb096b240f5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -161,6 +161,10 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin default=AllowDenyPattern.allow_all(), description="Regex patterns for connectors to filter in ingestion.", ) + destination_patterns: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for destinations to filter in ingestion.", + ) include_column_lineage: bool = Field( default=True, description="Populates table->table column lineage.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index c899fe04d2c48..b459b47deb153 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -283,6 +283,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: logger.info("Fivetran plugin execution is started") connectors = self.audit_log.get_allowed_connectors_list( self.config.connector_patterns, + self.config.destination_patterns, self.report, self.config.history_sync_lookback_period, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index d8ce68e8345ec..31c16139066e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -251,6 +251,7 @@ def _fill_connectors_jobs( def get_allowed_connectors_list( self, connector_patterns: AllowDenyPattern, + destination_patterns: AllowDenyPattern, report: FivetranSourceReport, syncs_interval: int, ) -> List[Connector]: @@ -261,6 +262,9 @@ def get_allowed_connectors_list( if not connector_patterns.allowed(connector[Constant.CONNECTOR_NAME]): report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) continue + if not destination_patterns.allowed(connector[Constant.DESTINATION_ID]): + report.report_connectors_dropped(connector[Constant.CONNECTOR_NAME]) + continue connectors.append( Connector( connector_id=connector[Constant.CONNECTOR_ID], diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 5e0e20234cc99..0f5d098ee39c4 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -205,6 +205,11 @@ def test_fivetran_with_snowflake_dest(pytestconfig, tmp_path): "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", }, @@ -291,6 +296,11 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ "postgres", ] }, + "destination_patterns": { + "allow": [ + "interval_unconstitutional", + ] + }, "sources_to_database": { "calendar_elected": "postgres_db", },