Skip to content

Commit

Permalink
resolve todo items
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Oct 6, 2024
1 parent eb9b45a commit a120510
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 18 deletions.
186 changes: 173 additions & 13 deletions airbyte/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@

from __future__ import annotations

from typing import TYPE_CHECKING
import json
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast

import click
import dpath
import yaml

from airbyte.destinations.util import get_destination, get_noop_destination
from airbyte.exceptions import PyAirbyteInputError
Expand All @@ -20,22 +24,131 @@
def _resolve_source_job(
*,
source: str | None = None,
source_job: str | None = None,
config: Path | None = None,
job_file: str | None = None,
job_dpath: str | None = None,
) -> Source:
"""Resolve the source job into a configured Source object."""
# TODO: Implement this function.
raise NotImplementedError("Not implemented.")
_ = get_source(...)
"""Resolve the source job into a configured Source object.
Args:
source: The source name, with an optional version declaration.
If a path is provided, the source will be loaded from the local path.
If the string `'.'` is provided, the source will be loaded from the current
working directory.
config: The path to a configuration file for the named source or destination.
If `config` is provided, the `job_file` and `job_dpath` options will be ignored.
job_file: A yaml file containing the job definition.
job_dpath: The dpath expression pointing to a job definition within the job file.
"""
source_obj: Source
if source and (source.startswith(".") or "/" in source):
# Treat the source as a path.
source_executable = Path(source)
if not source_executable.exists():
raise PyAirbyteInputError(
message=f"Source executable not found: {source}",
)
source_obj = get_source(
name=source_executable.stem,
local_executable=source_executable,
)
return source_obj

if not source or not source.startswith("source-"):
raise PyAirbyteInputError(
message="Expected a source name or path to executable.",
input_value=source,
)
source_name: str = source

config_dict: dict[str, Any] = {}
if config:
config_dict = yaml.safe_load(config.read_text(encoding="utf-8"))

elif job_file and job_file.endswith(".json"):
# Treat the job file as a config file.
config_dict = json.loads(
Path(job_file).read_text(encoding="utf-8"),
)

elif job_file and job_file.endswith(".yaml"):
# Load the source from the job file.
job_file_data = yaml.safe_load(job_file)
if job_dpath:
job_data = dpath.get(
obj=job_file_data,
glob=job_dpath,
)
if not isinstance(job_data, dict):
raise PyAirbyteInputError(
message="Invalid job definition.",
input_value=str(job_data),
)
config_path = Path(job_data["config_path"])
if not config_path.exists():
raise PyAirbyteInputError(
message="Config file not found.",
input_value=str(config_path),
)
config_dict = yaml.safe_load(
config_path.read_text(encoding="utf-8"),
)

if not config_dict:
raise PyAirbyteInputError(
message="No configuration found.",
)

source_obj = get_source(
name=source_name,
config=config_dict,
streams="*",
)
return source_obj


def _resolve_destination_job(
*,
destination: str,
config: Path | None = None,
) -> Destination:
"""Resolve the destination job into a configured Destination object."""
# TODO: Implement this function.
raise NotImplementedError("Not implemented.")
_ = get_destination(...)
"""Resolve the destination job into a configured Destination object.
Args:
destination: The destination name, with an optional version declaration.
If a path is provided, the destination will be loaded from the local path.
If the string `'.'` is provided, the destination will be loaded from the current
working directory.
config: The path to a configuration file for the named source or destination.
"""
if not config:
raise PyAirbyteInputError(
message="No configuration found.",
)
config_dict = cast(
dict,
json.loads(config.read_text(encoding="utf-8")),
)

if destination and (destination.startswith(".") or "/" in destination):
# Treat the destination as a path.
destination_executable = Path(destination)
if not destination_executable.exists():
raise PyAirbyteInputError(
message=f"Destination executable not found: {destination}",
)
return get_destination(
name=destination_executable.stem,
local_executable=destination_executable,
config=config_dict,
)

# else: # Treat the destination as a name.

return get_destination(
name=destination,
config=config_dict,
)


@click.command()
Expand All @@ -45,10 +158,48 @@ def validate() -> None:


@click.command()
@click.option(
"--source",
type=str,
help="The source name, with an optional version declaration. If a path is provided, the source will be loaded from the local path. If the string `'.'` is provided, the source will be loaded from the current working directory.",
)
@click.option(
"--num-records",
type=str,
default="5e5",
help="The number of records to generate for the benchmark. Ignored if a source is provided. You can specify the number of records to generate using scientific notation. For example, `5e6` will generate 5 million records. By default, 500,000 records will be generated (`5e5` records). If underscores are providing within a numeric a string, they will be ignored.",
)
@click.option(
"--destination",
type=str,
help="The destination name, with an optional version declaration. If a path is provided, the destination will be loaded from the local path. If the string `'.'` is provided, the destination will be loaded from the current working directory. Destination can be omitted - in which case the source will be run in isolation.",
)
@click.option(
"--config",
type=Path,
help=(
"The path to a configuration file for the named source or destination."
"If `--config` is provided, the `--job-file` and `--job-dpath` options "
"will be ignored."
),
)
@click.option(
"--job-file",
type=str,
help="A yaml file containing the job definition.",
)
@click.option(
"--job-dpath",
type=str,
help="The dpath expression pointing to a job definition within the job file.",
)
def benchmark(
source: str | None = None,
num_records: int | str = "5e5", # 500,000 records
destination: str | None = None,
source_job: str | None = None,
config: Path | None = None,
job_file: str | None = None,
job_dpath: str | None = None,
) -> None:
"""Run benchmarks.
Expand All @@ -57,6 +208,11 @@ def benchmark(
If a path is provided, the source will be loaded from the local path.
If the string `'.'` is provided, the source will be loaded from the current
working directory.
num_records: The number of records to generate for the benchmark. Ignored if a source
is provided. You can specify the number of records to generate using scientific
notation. For example, `"5e6"` will generate 5 million records. By default, 500,000
records will be generated ("5e5" records).
If underscores are providing within a numeric a string, they will be ignored.
destination: The destination name, with an optional version declaration.
If a path is provided, the destination will be loaded from the local path.
If the string `'.'` is provided, the destination will be loaded from the current
Expand All @@ -74,10 +230,14 @@ def benchmark(
source_obj = (
_resolve_source_job(
source=source,
source_job=source_job,
config=config,
job_file=job_file,
job_dpath=job_dpath,
)
if source
else get_benchmark_source()
else get_benchmark_source(
num_records=num_records,
)
)
destination_obj = (
_resolve_destination_job(
Expand Down
11 changes: 6 additions & 5 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from __future__ import annotations

import warnings
from decimal import Decimal
from decimal import Decimal, InvalidOperation
from typing import TYPE_CHECKING, Any

from airbyte._executors.util import get_connector_executor
Expand Down Expand Up @@ -119,7 +119,7 @@ def get_source( # noqa: PLR0913 # Too many arguments


def get_benchmark_source(
num_records: int | str = 1000,
num_records: int | str = "5e5",
) -> Source:
"""Get a source for benchmarking.
Expand All @@ -130,9 +130,10 @@ def get_benchmark_source(
within a numeric a string, they will be ignored.
Args:
num_records (int | str): The number of records to generate.
Can be an integer (`1000`) or a string in scientific notation.
For example, `"5e6"` will generate 5 million records.
num_records (int | str): The number of records to generate. Defaults to "5e5", or
500,000 records.
Can be an integer (`1000`) or a string in scientific notation.
For example, `"5e6"` will generate 5 million records.
Returns:
Source: The source object for benchmarking.
Expand Down

0 comments on commit a120510

Please sign in to comment.