Skip to content

Commit

Permalink
Feat: Add sync CLI (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Oct 22, 2024
1 parent b355ea9 commit c644066
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions airbyte/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@

from __future__ import annotations

import json
from pathlib import Path
from typing import TYPE_CHECKING, Any

Expand Down Expand Up @@ -124,6 +123,12 @@
"""For example, --config='{password: "SECRET:MY_PASSWORD"}'."""
)

PIP_URL_HELP = (
"This can be anything pip accepts, including: a PyPI package name, a local path, "
"a git repository, a git branch ref, etc. Use '.' to install from the current local "
"directory."
)


def _resolve_config(
config: str,
Expand All @@ -150,7 +155,7 @@ def _inject_secrets(config_dict: dict[str, Any]) -> None:
message="Config file not found.",
input_value=str(config_path),
)
config_dict = json.loads(config_path.read_text(encoding="utf-8"))
config_dict = yaml.safe_load(config_path.read_text(encoding="utf-8"))

_inject_secrets(config_dict)
return config_dict
Expand Down Expand Up @@ -447,6 +452,93 @@ def benchmark(
)


@click.command()
@click.option(
"--source",
type=str,
help=(
"The source name, with an optional version declaration. "
"If the name contains a colon (':'), it will be interpreted as a docker image and tag. "
),
)
@click.option(
"--destination",
type=str,
help=(
"The destination name, with an optional version declaration. "
"If a path is provided, it will be interpreted as a path to the local executable. "
),
)
@click.option(
"--streams",
type=str,
help=(
"A comma-separated list of stream names to select for reading. If set to '*', all streams "
"will be selected. Defaults to '*'."
),
)
@click.option(
"--Sconfig",
"source_config",
type=str,
help="The source config. " + CONFIG_HELP,
)
@click.option(
"--Dconfig",
"destination_config",
type=str,
help="The destination config. " + CONFIG_HELP,
)
@click.option(
"--Spip-url",
"source_pip_url",
type=str,
help="Optional pip URL for the source (Python connectors only). " + PIP_URL_HELP,
)
@click.option(
"--Dpip-url",
"destination_pip_url",
type=str,
help="Optional pip URL for the destination (Python connectors only). " + PIP_URL_HELP,
)
def sync(
*,
source: str,
source_config: str | None = None,
source_pip_url: str | None = None,
destination: str,
destination_config: str | None = None,
destination_pip_url: str | None = None,
streams: str | None = None,
) -> None:
"""Run a sync operation.
Currently, this only supports full refresh syncs. Incremental syncs are not yet supported.
Custom catalog syncs are not yet supported.
"""
destination_obj: Destination
source_obj: Source

source_obj = _resolve_source_job(
source=source,
config=source_config,
streams=streams,
pip_url=source_pip_url,
)
destination_obj = _resolve_destination_job(
destination=destination,
config=destination_config,
pip_url=destination_pip_url,
)

click.echo("Running sync...")
destination_obj.write(
source_data=source_obj,
cache=False,
state_cache=False,
)


@click.group()
def cli() -> None:
"""PyAirbyte CLI."""
Expand All @@ -455,6 +547,7 @@ def cli() -> None:

cli.add_command(validate)
cli.add_command(benchmark)
cli.add_command(sync)

if __name__ == "__main__":
cli()

0 comments on commit c644066

Please sign in to comment.