Skip to content

Commit

Permalink
Feat(CLI): Add support for docker images and local python dev executables in poetry (#415)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Oct 9, 2024
1 parent 53246a3 commit 919a812
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 83 deletions.
152 changes: 90 additions & 62 deletions airbyte/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
pyab --help
```
You can also use the fast and powerful `uv` tool to run the CLI without pre-installing:
You can also use `pipx` or the fast and powerful `uv` tool to run the PyAirbyte CLI
without pre-installing:
```
# Install `uv` if you haven't already:
Expand All @@ -23,6 +24,43 @@
# Run the PyAirbyte CLI using `uvx`:
uvx --from=airbyte pyab --help
```
Example `benchmark` Usage:
```
# PyAirbyte System Benchmark (no-op):
pyab benchmark --num-records=2.4e6
# Source Benchmark:
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}'
pyab benchmark --source=source-hardcoded-records --config='{count: 400000}' --streams='*'
pyab benchmark --source=source-hardcoded-records --config='{count: 4000}' --streams=dummy_fields
# Source Benchmark from Docker Image:
pyab benchmark --source=airbyte/source-hardcoded-records:latest --config='{count: 400_000}'
pyab benchmark --source=airbyte/source-hardcoded-records:dev --config='{count: 400_000}'
# Destination Benchmark:
pyab benchmark --destination=destination-dev-null --config=/path/to/config.json
# Benchmark a Local Python Source (source-s3):
pyab benchmark --source=$(poetry run which source-s3) --config=./secrets/config.json
# Equivalent to:
LOCAL_EXECUTABLE=$(poetry run which source-s3)
CONFIG_PATH=$(realpath ./secrets/config.json)
pyab benchmark --source=$LOCAL_EXECUTABLE --config=$CONFIG_PATH
```
Example Usage with `uv`:
Example `validate` Usage:
```
pyab validate --connector=source-hardcoded-records
pyab validate --connector=source-hardcoded-records --config='{count: 400_000}'
```
"""

from __future__ import annotations
Expand Down Expand Up @@ -118,6 +156,22 @@ def _inject_secrets(config_dict: dict[str, Any]) -> None:
return config_dict


def _is_docker_image(image: str | None) -> bool:
"""Check if the source or destination is a docker image."""
return image is not None and ":" in image


def _is_executable_path(connector_str: str) -> bool:
return connector_str.startswith(".") or "/" in connector_str


def _get_connector_name(connector: str) -> str:
    """Extract the short connector name from a connector reference.

    For a docker image reference (e.g. 'airbyte/source-foo:latest'), return the
    final path component with the tag stripped ('source-foo'). Any other input
    is returned unchanged.
    """
    if _is_docker_image(connector):
        # Take the last path component first, THEN strip the tag. Splitting on
        # the first ':' (the previous behavior) broke for registries that
        # include a port, e.g. 'localhost:5000/airbyte/source-foo:dev' would
        # yield 'localhost' instead of 'source-foo'.
        return connector.split("/")[-1].split(":")[0]

    return connector


def _resolve_source_job(
*,
source: str | None = None,
Expand All @@ -127,16 +181,29 @@ def _resolve_source_job(
"""Resolve the source job into a configured Source object.
Args:
source: The source name, with an optional version declaration.
source: The source name or source reference.
If a path is provided, the source will be loaded from the local path.
If the string `'.'` is provided, the source will be loaded from the current
working directory.
If the source contains a colon (':'), it will be interpreted as a docker image and tag.
config: The path to a configuration file for the named source or destination.
streams: A comma-separated list of stream names to select for reading. If set to "*",
all streams will be selected. If not provided, all streams will be selected.
"""
config_dict = _resolve_config(config) if config else None
streams_list: str | list[str] = streams or "*"
if isinstance(streams, str) and streams != "*":
streams_list = [stream.strip() for stream in streams.split(",")]

source_obj: Source
if source and (source.startswith(".") or "/" in source):
if source and _is_docker_image(source):
source_obj = get_source(
name=_get_connector_name(source),
docker_image=source,
config=config_dict,
streams=streams_list,
)
return source_obj

if source and _is_executable_path(source):
# Treat the source as a path.
source_executable = Path(source)
if not source_executable.exists():
Expand All @@ -149,26 +216,22 @@ def _resolve_source_job(
source_obj = get_source(
name=source_executable.stem,
local_executable=source_executable,
config=config_dict,
streams=streams_list,
)
return source_obj
if not config:
raise PyAirbyteInputError(
message="No configuration found.",
)

if not source or not source.startswith("source-"):
raise PyAirbyteInputError(
message="Expected a source name or path to executable.",
message="Expected a source name, docker image, or path to executable.",
input_value=source,
)

source_name: str = source
streams_list: str | list[str] = streams or "*"
if isinstance(streams, str) and streams != "*":
streams_list = [stream.strip() for stream in streams.split(",")]

return get_source(
name=source_name,
config=_resolve_config(config) if config else {},
config=config_dict,
streams=streams_list,
)

Expand All @@ -181,10 +244,10 @@ def _resolve_destination_job(
"""Resolve the destination job into a configured Destination object.
Args:
destination: The destination name, with an optional version declaration.
If a path is provided, the destination will be loaded from the local path.
If the string `'.'` is provided, the destination will be loaded from the current
working directory.
destination: The destination name or source reference.
If a path is provided, the source will be loaded from the local path.
If the destination contains a colon (':'), it will be interpreted as a docker image
and tag.
config: The path to a configuration file for the named source or destination.
"""
if not config:
Expand Down Expand Up @@ -236,62 +299,27 @@ def _resolve_destination_job(
required=False,
help=CONFIG_HELP,
)
@click.option(
"--install",
is_flag=True,
default=False,
help=(
"Whether to install the connector if it is not available locally. "
"Defaults to False, meaning the connector is expected to be already be installed."
),
)
def validate(
connector: str | None = None,
config: str | None = None,
*,
install: bool = False,
) -> None:
"""Validate the connector."""
local_executable: Path | None = None
if not connector:
raise PyAirbyteInputError(
message="No connector provided.",
)
if connector.startswith(".") or "/" in connector:
# Treat the connector as a path.
local_executable = Path(connector)
if not local_executable.exists():
raise PyAirbyteInputError(
message="Connector executable not found.",
context={
"connector": connector,
},
)
connector_name = local_executable.stem
else:
connector_name = connector

if not connector_name.startswith("source-") and not connector_name.startswith("destination-"):
raise PyAirbyteInputError(
message=(
"Expected a connector name or path to executable. "
"Connector names are expected to begin with 'source-' or 'destination-'."
),
input_value=connector,
)

connector_obj: Source | Destination
if connector_name.startswith("source-"):
connector_obj = get_source(
name=connector_name,
local_executable=local_executable,
install_if_missing=install,
if "source-" in connector:
connector_obj = _resolve_source_job(
source=connector,
config=None,
streams=None,
)
else: # destination
connector_obj = get_destination(
name=connector_name,
local_executable=local_executable,
install_if_missing=install,
connector_obj = _resolve_destination_job(
destination=connector,
config=None,
)

print("Getting `spec` output from connector...")
Expand All @@ -310,7 +338,7 @@ def validate(
type=str,
help=(
"The source name, with an optional version declaration. "
"If a path is provided, it will be interpreted as a path to the local executable. "
"If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
),
)
@click.option(
Expand Down
67 changes: 46 additions & 21 deletions examples/run_perf_test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,25 @@
```
# Run with 5_000 records
poetry run python ./examples/run_perf_test_reads.py -n=1e3
poetry run python ./examples/run_perf_test_reads.py -n=5e3
# Run with 500_000 records
poetry run python ./examples/run_perf_test_reads.py -n=1e5
poetry run python ./examples/run_perf_test_reads.py -n=5e5
# Load 1 million records to Snowflake cache
poetry run python ./examples/run_perf_test_reads.py -n=1e6 --cache=snowflake
# Load 5_000 records to Snowflake
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake
# Load 1 million records to Snowflake destination
poetry run python ./examples/run_perf_test_reads.py -n=1e6 --destination=snowflake
# Load 5_000 records to BigQuery
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery
poetry run python ./examples/run_perf_test_reads.py -n=5e3 --cache=bigquery
```
You can also use this script to test destination load performance:
```bash
# Load 5_000 records to BigQuery
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=e2e
poetry run python ./examples/run_perf_test_reads.py -n=5e3 --destination=e2e
```
Testing raw PyAirbyte throughput with and without caching:
Expand Down Expand Up @@ -74,6 +77,7 @@
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources import get_benchmark_source
from typing_extensions import Literal
from ulid import ULID

if TYPE_CHECKING:
from airbyte.sources.base import Source
Expand All @@ -82,6 +86,12 @@
AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"


def _random_suffix() -> str:
    """Generate a random suffix for use in test environments, using ULIDs."""
    # Combine the first 6 and last 3 characters of a fresh ULID string.
    ulid_str = str(ULID())
    return f"{ulid_str[:6]}{ulid_str[-3:]}"


def get_gsm_secret_json(secret_name: str) -> dict:
secret_mgr = GoogleGSMSecretManager(
project=AIRBYTE_INTERNAL_GCP_PROJECT,
Expand All @@ -95,25 +105,26 @@ def get_gsm_secret_json(secret_name: str) -> dict:


def get_cache(
cache_type: Literal["duckdb", "snowflake", "bigquery", False],
cache_type: Literal["duckdb", "snowflake", "bigquery", "disabled", False],
) -> CacheBase | Literal[False]:
if cache_type is False:
if cache_type is False or cache_type == "disabled":
return False

if cache_type == "duckdb":
return ab.new_local_cache()

if cache_type == "snowflake":
secret_config = get_gsm_secret_json(
snowflake_config = get_gsm_secret_json(
secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
)
return SnowflakeCache(
account=secret_config["account"],
username=secret_config["username"],
password=secret_config["password"],
database=secret_config["database"],
warehouse=secret_config["warehouse"],
role=secret_config["role"],
account=snowflake_config["account"],
username=snowflake_config["username"],
password=snowflake_config["password"],
database=snowflake_config["database"],
warehouse=snowflake_config["warehouse"],
role=snowflake_config["role"],
schema_name=f"INTEGTEST_{_random_suffix()}",
)

if cache_type == "bigquery":
Expand Down Expand Up @@ -171,12 +182,26 @@ def get_destination(destination_type: str) -> ab.Destination:
if destination_type in ["e2e", "noop"]:
return get_noop_destination()

if destination_type.removeprefix("destination-") == "snowflake":
snowflake_config = get_gsm_secret_json(
secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
)
snowflake_config["host"] = (
f"{snowflake_config['account']}.snowflakecomputing.com"
)
snowflake_config["schema"] = f"INTEGTEST_{_random_suffix()}"
return ab.get_destination(
"destination-snowflake",
config=snowflake_config,
docker_image=True,
)

raise ValueError(f"Unknown destination type: {destination_type}") # noqa: TRY003


def main(
n: int | str = "5e5",
cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb",
cache_type: Literal["duckdb", "bigquery", "snowflake", "disabled"] = "disabled",
source_alias: str = "e2e",
destination_type: str | None = None,
) -> None:
Expand Down Expand Up @@ -222,8 +247,8 @@ def main(
"--cache",
type=str,
help="The cache type to use.",
choices=["duckdb", "snowflake", "bigquery"],
default="duckdb",
choices=["duckdb", "snowflake", "bigquery", "disabled"],
default="disabled",
)
parser.add_argument(
"--no-cache",
Expand All @@ -244,20 +269,20 @@ def main(
"hardcoded",
"faker",
],
default="hardcoded",
default="benchmark",
)
parser.add_argument(
"--destination",
type=str,
help=("The destination to use (optional)."),
choices=["e2e"],
choices=["e2e", "noop", "snowflake"],
default=None,
)
args = parser.parse_args()

main(
n=args.n,
cache_type=args.cache if not args.no_cache else False,
cache_type=args.cache if not args.no_cache else "disabled",
source_alias=args.source,
destination_type=args.destination,
)

0 comments on commit 919a812

Please sign in to comment.