diff --git a/airbyte/source.py b/airbyte/source.py index 67370d61..e9f5fbd4 100644 --- a/airbyte/source.py +++ b/airbyte/source.py @@ -524,6 +524,7 @@ def read( self, cache: SQLCacheBase | None = None, *, + streams: str | list[str] | None = None, write_strategy: str | WriteStrategy = WriteStrategy.AUTO, force_full_refresh: bool = False, ) -> ReadResult: @@ -535,6 +536,8 @@ def read( one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or WriteStrategy.AUTO. + streams: Optional if already set. A list of stream names to select for reading. If set + to "*", all streams will be selected. force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, streams will be read in incremental mode if supported by the connector. This option must be True when using the "replace" strategy. @@ -562,6 +565,9 @@ def read( }, ) from None + if streams: + self.select_streams(streams) + if not self._selected_stream_names: raise exc.AirbyteLibNoStreamsSelectedError( connector_name=self.name,