diff --git a/airbyte/cli.py b/airbyte/cli.py index 4a7a56c0..51e345cf 100644 --- a/airbyte/cli.py +++ b/airbyte/cli.py @@ -14,7 +14,8 @@ pyab --help ``` -You can also use the fast and powerful `uv` tool to run the PyAirbyte CLI without pre-installing: +You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI +without pre-installing: ``` # Install `uv` if you haven't already: @@ -23,6 +24,28 @@ # Run the PyAirbyte CLI using `uvx`: uvx --from=airbyte pyab --help ``` + +Example `benchmark` Usage: + + ``` + # PyAirbyte System Benchmark (no-op): + pyab benchmark --num-records=2.4e6 + + # PyAirbyte Source Benchmark (hardcoded records): + pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' + pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*' + pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams=dummy_fields + + # PyAirbyte Source Benchmark (from docker image): + pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}' + ``` + +Example `validate` Usage: + + ``` + pyab validate --connector=source-hardcoded-records + pyab validate --connector=source-hardcoded-records --config='{count: 400_000}' + ``` """ from __future__ import annotations @@ -118,6 +141,22 @@ def _inject_secrets(config_dict: dict[str, Any]) -> None: return config_dict +def _is_docker_image(image: str | None) -> bool: + """Check if the source or destination is a docker image.""" + return image is not None and ":" in image + + +def _is_executable_path(connector_str: str) -> bool: + return connector_str.startswith(".") or "/" in connector_str + + +def _get_connector_name(connector: str) -> str: + if _is_docker_image(connector): + return connector.split(":")[0].split("/")[-1] + + return connector + + def _resolve_source_job( *, source: str | None = None, @@ -127,16 +166,33 @@ def _resolve_source_job( """Resolve the source job into a configured Source 
object. Args: - source: The source name, with an optional version declaration. + source: The source name or source reference. If a path is provided, the source will be loaded from the local path. - If the string `'.'` is provided, the source will be loaded from the current - working directory. + If the source contains a colon (':'), it will be interpreted as a docker image and tag. config: The path to a configuration file for the named source or destination. streams: A comma-separated list of stream names to select for reading. If set to "*", all streams will be selected. If not provided, all streams will be selected. """ + if not config: + raise PyAirbyteInputError( + message="No configuration found.", + ) + config_dict = _resolve_config(config) if config else {} + streams_list: str | list[str] = streams or "*" + if isinstance(streams, str) and streams != "*": + streams_list = [stream.strip() for stream in streams.split(",")] + source_obj: Source - if source and (source.startswith(".") or "/" in source): + if source and _is_docker_image(source): + source_obj = get_source( + name=_get_connector_name(source), + docker_image=source, + config=config_dict, + streams=streams_list, + ) + return source_obj + + if source and _is_executable_path(source): # Treat the source as a path. 
source_executable = Path(source) if not source_executable.exists(): @@ -149,26 +205,22 @@ def _resolve_source_job( source_obj = get_source( name=source_executable.stem, local_executable=source_executable, + config=config_dict, + streams=streams_list, ) return source_obj - if not config: - raise PyAirbyteInputError( - message="No configuration found.", - ) + if not source or not source.startswith("source-"): raise PyAirbyteInputError( - message="Expected a source name or path to executable.", + message="Expected a source name, docker image, or path to executable.", input_value=source, ) source_name: str = source - streams_list: str | list[str] = streams or "*" - if isinstance(streams, str) and streams != "*": - streams_list = [stream.strip() for stream in streams.split(",")] return get_source( name=source_name, - config=_resolve_config(config) if config else {}, + config=config_dict, streams=streams_list, ) @@ -181,10 +233,10 @@ def _resolve_destination_job( """Resolve the destination job into a configured Destination object. Args: - destination: The destination name, with an optional version declaration. - If a path is provided, the destination will be loaded from the local path. - If the string `'.'` is provided, the destination will be loaded from the current - working directory. + destination: The destination name or destination reference. + If a path is provided, the destination will be loaded from the local path. + If the destination contains a colon (':'), it will be interpreted as a docker image + and tag. config: The path to a configuration file for the named source or destination. """ if not config: @@ -236,62 +288,27 @@ def _resolve_destination_job( required=False, help=CONFIG_HELP, ) -@click.option( - "--install", - is_flag=True, - default=False, - help=( - "Whether to install the connector if it is not available locally. " - "Defaults to False, meaning the connector is expected to be already be installed." 
- ), -) def validate( connector: str | None = None, config: str | None = None, - *, - install: bool = False, ) -> None: """Validate the connector.""" - local_executable: Path | None = None if not connector: raise PyAirbyteInputError( message="No connector provided.", ) - if connector.startswith(".") or "/" in connector: - # Treat the connector as a path. - local_executable = Path(connector) - if not local_executable.exists(): - raise PyAirbyteInputError( - message="Connector executable not found.", - context={ - "connector": connector, - }, - ) - connector_name = local_executable.stem - else: - connector_name = connector - - if not connector_name.startswith("source-") and not connector_name.startswith("destination-"): - raise PyAirbyteInputError( - message=( - "Expected a connector name or path to executable. " - "Connector names are expected to begin with 'source-' or 'destination-'." - ), - input_value=connector, - ) connector_obj: Source | Destination - if connector_name.startswith("source-"): - connector_obj = get_source( - name=connector_name, - local_executable=local_executable, - install_if_missing=install, + if "source-" in connector: + connector_obj = _resolve_source_job( + source=connector, + config=None, + streams=None, ) else: # destination - connector_obj = get_destination( - name=connector_name, - local_executable=local_executable, - install_if_missing=install, + connector_obj = _resolve_destination_job( + destination=connector, + config=None, ) print("Getting `spec` output from connector...") @@ -310,7 +327,7 @@ def validate( type=str, help=( "The source name, with an optional version declaration. " - "If a path is provided, it will be interpreted as a path to the local executable. " + "If the name contains a colon (':'), it will be interpreted as a docker image and tag. " ), ) @click.option(