Skip to content

Commit

Permalink
Adds config_change_callback to Destinations and Sources
Browse files Browse the repository at this point in the history
  • Loading branch information
tcboles committed Nov 4, 2024
1 parent 7c703ac commit abe154e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 1 deletion.
15 changes: 14 additions & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any], int], None] | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
"""
self.config_change_callback = config_change_callback
self.executor = executor
self._name = name
self._config_dict: dict[str, Any] | None = None
Expand Down Expand Up @@ -361,7 +363,8 @@ def _peek_airbyte_message(
This method handles reading Airbyte messages and taking action, if needed, based on the
message type. For instance, log messages are logged, records are tallied, and errors are
raised as exceptions if `raise_on_error` is True.
raised as exceptions if `raise_on_error` is True. If a config change message is received,
the config change callback is called.
Raises:
AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
Expand All @@ -380,6 +383,16 @@ def _peek_airbyte_message(
)
return

if (
message.type == "CONTROL"
and message.control.type == "CONNECTOR_CONFIG"
and self.config_change_callback is not None
):
self.config_change_callback(
message.control.config, message.control.emitted_at
)
return

def _execute(
self,
args: list[str],
Expand Down
2 changes: 2 additions & 0 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
*,
validate: bool = False,
) -> None:
Expand All @@ -59,6 +60,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)

Expand Down
2 changes: 2 additions & 0 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: Callable[[dict[str, Any]], None] | None = None,
streams: str | list[str] | None = None,
*,
validate: bool = False,
Expand All @@ -73,6 +74,7 @@ def __init__(
executor=executor,
name=name,
config=config,
config_change_callback=config_change_callback,
validate=validate,
)
self._config_dict: dict[str, Any] | None = None
Expand Down

0 comments on commit abe154e

Please sign in to comment.