From abe154ee6daed97892d367ebfdf46618b3948c5c Mon Sep 17 00:00:00 2001 From: Thomas Boles Date: Mon, 4 Nov 2024 14:35:46 -0500 Subject: [PATCH] Adds config_change_callback to Destinations and Sources --- airbyte/_connector_base.py | 15 ++++++++++++++- airbyte/destinations/base.py | 2 ++ airbyte/sources/base.py | 2 ++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 8d7dbc00..a09c4ed1 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -56,6 +56,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any], int], None] | None = None, *, validate: bool = False, ) -> None: @@ -63,6 +64,7 @@ def __init__( If config is provided, it will be validated against the spec if validate is True. """ + self.config_change_callback = config_change_callback self.executor = executor self._name = name self._config_dict: dict[str, Any] | None = None @@ -361,7 +363,8 @@ def _peek_airbyte_message( This method handles reading Airbyte messages and taking action, if needed, based on the message type. For instance, log messages are logged, records are tallied, and errors are - raised as exceptions if `raise_on_error` is True. + raised as exceptions if `raise_on_error` is True. If a config change message is received, + the config change callback is called. Raises: AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted. @@ -380,6 +383,16 @@ def _peek_airbyte_message( ) return + if ( + message.type == "CONTROL" + and message.control.type == "CONNECTOR_CONFIG" + and self.config_change_callback is not None + ): + self.config_change_callback( + message.control.config, message.control.emitted_at + ) + return + def _execute( self, args: list[str], diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 1b34d40f..d8899fcb 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -48,6 +48,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, *, validate: bool = False, ) -> None: @@ -59,6 +60,7 @@ def __init__( executor=executor, name=name, config=config, + config_change_callback=config_change_callback, validate=validate, ) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index aeffc8ed..cca48b4b 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -58,6 +58,7 @@ def __init__( executor: Executor, name: str, config: dict[str, Any] | None = None, + config_change_callback: Callable[[dict[str, Any]], None] | None = None, streams: str | list[str] | None = None, *, validate: bool = False, @@ -73,6 +74,7 @@ def __init__( executor=executor, name=name, config=config, + config_change_callback=config_change_callback, validate=validate, ) self._config_dict: dict[str, Any] | None = None