Skip to content

Commit

Permalink
update examples script to support snowflake destination
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Oct 9, 2024
1 parent 53246a3 commit fb09619
Showing 1 changed file with 41 additions and 16 deletions.
57 changes: 41 additions & 16 deletions examples/run_perf_test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
# Run with 100_000 records
poetry run python ./examples/run_perf_test_reads.py -n=1e5
# Load 5_000 records to Snowflake
# Load 1_000 records to Snowflake cache
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=snowflake
# Load 1_000 records to Snowflake destination
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --destination=snowflake
# Load 1_000 records to BigQuery
poetry run python ./examples/run_perf_test_reads.py -n=1e3 --cache=bigquery
```
Expand Down Expand Up @@ -74,6 +77,7 @@
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources import get_benchmark_source
from typing_extensions import Literal
from ulid import ULID

if TYPE_CHECKING:
from airbyte.sources.base import Source
Expand All @@ -82,6 +86,12 @@
AIRBYTE_INTERNAL_GCP_PROJECT = "dataline-integration-testing"


def _random_suffix() -> str:
    """Build a short ULID-based suffix for naming test-environment resources.

    A new ULID is generated on every call. The suffix is the first six
    characters of its string form followed by its last three characters.
    """
    token = str(ULID())
    head, tail = token[:6], token[-3:]
    return f"{head}{tail}"


def get_gsm_secret_json(secret_name: str) -> dict:
secret_mgr = GoogleGSMSecretManager(
project=AIRBYTE_INTERNAL_GCP_PROJECT,
Expand All @@ -95,25 +105,26 @@ def get_gsm_secret_json(secret_name: str) -> dict:


def get_cache(
cache_type: Literal["duckdb", "snowflake", "bigquery", False],
cache_type: Literal["duckdb", "snowflake", "bigquery", "disabled", False],
) -> CacheBase | Literal[False]:
if cache_type is False:
if cache_type is False or cache_type == "disabled":
return False

if cache_type == "duckdb":
return ab.new_local_cache()

if cache_type == "snowflake":
secret_config = get_gsm_secret_json(
snowflake_config = get_gsm_secret_json(
secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
)
return SnowflakeCache(
account=secret_config["account"],
username=secret_config["username"],
password=secret_config["password"],
database=secret_config["database"],
warehouse=secret_config["warehouse"],
role=secret_config["role"],
account=snowflake_config["account"],
username=snowflake_config["username"],
password=snowflake_config["password"],
database=snowflake_config["database"],
warehouse=snowflake_config["warehouse"],
role=snowflake_config["role"],
schema_name=f"INTEGTEST_{_random_suffix()}",
)

if cache_type == "bigquery":
Expand Down Expand Up @@ -171,12 +182,26 @@ def get_destination(destination_type: str) -> ab.Destination:
if destination_type in ["e2e", "noop"]:
return get_noop_destination()

if destination_type.removeprefix("destination-") == "snowflake":
snowflake_config = get_gsm_secret_json(
secret_name="AIRBYTE_LIB_SNOWFLAKE_CREDS",
)
snowflake_config["host"] = (
f"{snowflake_config['account']}.snowflakecomputing.com"
)
snowflake_config["schema"] = f"INTEGTEST_{_random_suffix()}"
return ab.get_destination(
"destination-snowflake",
config=snowflake_config,
docker_image=True,
)

raise ValueError(f"Unknown destination type: {destination_type}") # noqa: TRY003


def main(
n: int | str = "5e5",
cache_type: Literal["duckdb", "bigquery", "snowflake", False] = "duckdb",
cache_type: Literal["duckdb", "bigquery", "snowflake", "disabled"] = "disabled",
source_alias: str = "e2e",
destination_type: str | None = None,
) -> None:
Expand Down Expand Up @@ -222,8 +247,8 @@ def main(
"--cache",
type=str,
help="The cache type to use.",
choices=["duckdb", "snowflake", "bigquery"],
default="duckdb",
choices=["duckdb", "snowflake", "bigquery", "disabled"],
default="disabled",
)
parser.add_argument(
"--no-cache",
Expand All @@ -244,20 +269,20 @@ def main(
"hardcoded",
"faker",
],
default="hardcoded",
default="benchmark",
)
parser.add_argument(
"--destination",
type=str,
help=("The destination to use (optional)."),
choices=["e2e"],
choices=["e2e", "noop", "snowflake"],
default=None,
)
args = parser.parse_args()

main(
n=args.n,
cache_type=args.cache if not args.no_cache else False,
cache_type=args.cache if not args.no_cache else "disabled",
source_alias=args.source,
destination_type=args.destination,
)

0 comments on commit fb09619

Please sign in to comment.