Skip to content

Commit

Permalink
clean up cli
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Oct 9, 2024
1 parent b3c04ac commit a2f6d49
Showing 1 changed file with 79 additions and 62 deletions.
141 changes: 79 additions & 62 deletions airbyte/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
pyab --help
```
You can also use the fast and powerful `uv` tool to run the PyAirbyte CLI without pre-installing:
You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
without pre-installing:
```
# Install `uv` if you haven't already:
Expand All @@ -23,6 +24,28 @@
# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```
Example `benchmark` Usage:
```
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6
# PyAirbyte Source Benchmark (hardcoded records):
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams=dummy_fields
# PyAirbyte Source Benchmark (from docker image):
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
```
Example `validate` Usage:
```
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
"""

from __future__ import annotations
Expand Down Expand Up @@ -118,6 +141,22 @@ def _inject_secrets(config_dict: dict[str, Any]) -> None:
return config_dict


def _is_docker_image(image: str | None) -> bool:
"""Check if the source or destination is a docker image."""
return image is not None and ":" in image


def _is_executable_path(connector_str: str) -> bool:
return connector_str.startswith(".") or "/" in connector_str


def _get_connector_name(connector: str) -> str:
if _is_docker_image(connector):
return connector.split(":")[0].split("/")[-1]

return connector


def _resolve_source_job(
*,
source: str | None = None,
Expand All @@ -127,16 +166,33 @@ def _resolve_source_job(
"""Resolve the source job into a configured Source object.
Args:
source: The source name, with an optional version declaration.
source: The source name or source reference.
If a path is provided, the source will be loaded from the local path.
If the string `'.'` is provided, the source will be loaded from the current
working directory.
If the source contains a colon (':'), it will be interpreted as a docker image and tag.
config: The path to a configuration file for the named source or destination.
streams: A comma-separated list of stream names to select for reading. If set to "*",
all streams will be selected. If not provided, all streams will be selected.
"""
if not config:
raise PyAirbyteInputError(
message="No configuration found.",
)
config_dict = _resolve_config(config) if config else {}
streams_list: str | list[str] = streams or "*"
if isinstance(streams, str) and streams != "*":
streams_list = [stream.strip() for stream in streams.split(",")]

source_obj: Source
if source and (source.startswith(".") or "/" in source):
if source and _is_docker_image(source):
source_obj = get_source(
name=_get_connector_name(source),
docker_image=source,
config=config_dict,
streams=streams_list,
)
return source_obj

if source and _is_executable_path(source):
# Treat the source as a path.
source_executable = Path(source)
if not source_executable.exists():
Expand All @@ -149,26 +205,22 @@ def _resolve_source_job(
source_obj = get_source(
name=source_executable.stem,
local_executable=source_executable,
config=config_dict,
streams=streams_list,
)
return source_obj
if not config:
raise PyAirbyteInputError(
message="No configuration found.",
)

if not source or not source.startswith("source-"):
raise PyAirbyteInputError(
message="Expected a source name or path to executable.",
message="Expected a source name, docker image, or path to executable.",
input_value=source,
)

source_name: str = source
streams_list: str | list[str] = streams or "*"
if isinstance(streams, str) and streams != "*":
streams_list = [stream.strip() for stream in streams.split(",")]

return get_source(
name=source_name,
config=_resolve_config(config) if config else {},
config=config_dict,
streams=streams_list,
)

Expand All @@ -181,10 +233,10 @@ def _resolve_destination_job(
"""Resolve the destination job into a configured Destination object.
Args:
destination: The destination name, with an optional version declaration.
If a path is provided, the destination will be loaded from the local path.
If the string `'.'` is provided, the destination will be loaded from the current
working directory.
destination: The destination name or source reference.
If a path is provided, the source will be loaded from the local path.
If the destination contains a colon (':'), it will be interpreted as a docker image
and tag.
config: The path to a configuration file for the named source or destination.
"""
if not config:
Expand Down Expand Up @@ -236,62 +288,27 @@ def _resolve_destination_job(
required=False,
help=CONFIG_HELP,
)
@click.option(
"--install",
is_flag=True,
default=False,
help=(
"Whether to install the connector if it is not available locally. "
"Defaults to False, meaning the connector is expected to be already be installed."
),
)
def validate(
connector: str | None = None,
config: str | None = None,
*,
install: bool = False,
) -> None:
"""Validate the connector."""
local_executable: Path | None = None
if not connector:
raise PyAirbyteInputError(
message="No connector provided.",
)
if connector.startswith(".") or "/" in connector:
# Treat the connector as a path.
local_executable = Path(connector)
if not local_executable.exists():
raise PyAirbyteInputError(
message="Connector executable not found.",
context={
"connector": connector,
},
)
connector_name = local_executable.stem
else:
connector_name = connector

if not connector_name.startswith("source-") and not connector_name.startswith("destination-"):
raise PyAirbyteInputError(
message=(
"Expected a connector name or path to executable. "
"Connector names are expected to begin with 'source-' or 'destination-'."
),
input_value=connector,
)

connector_obj: Source | Destination
if connector_name.startswith("source-"):
connector_obj = get_source(
name=connector_name,
local_executable=local_executable,
install_if_missing=install,
if "source-" in connector:
connector_obj = _resolve_source_job(
source=connector,
config=None,
streams=None,
)
else: # destination
connector_obj = get_destination(
name=connector_name,
local_executable=local_executable,
install_if_missing=install,
connector_obj = _resolve_destination_job(
destination=connector,
config=None,
)

print("Getting `spec` output from connector...")
Expand All @@ -310,7 +327,7 @@ def validate(
type=str,
help=(
"The source name, with an optional version declaration. "
"If a path is provided, it will be interpreted as a path to the local executable. "
"If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
),
)
@click.option(
Expand Down

0 comments on commit a2f6d49

Please sign in to comment.