From 3333ddb66563a712740627d98664fe1eeaf4fa9f Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 23 Sep 2024 09:20:57 -0700 Subject: [PATCH 1/6] use GCS public bucket to get manifest yaml; add support for pinning versions and getting prior versions --- airbyte/_executors/util.py | 82 ++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 44 deletions(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 1ceecf93..1a00246f 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -25,56 +25,50 @@ from airbyte._executors.base import Executor -def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict: +VERSION_LATEST = "latest" +DEFAULT_MANIFEST_URL = ( + "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/" + "metadata/airbyte/{source_name}/{version}/manifest.yaml" +) + + +def _try_get_source_manifest( + source_name: str, + manifest_url: str | None, + version: str | None = None, +) -> dict: """Try to get a source manifest from a URL. - If the URL is not provided, we'll try a couple of default URLs. - We can remove/refactor this once manifests are available in GCS connector registry. + If the URL is not provided, we'll try the default URL in the public GCS bucket. + + Raises: + - `PyAirbyteInputError`: If `source_name` is `None`. + - `HTTPError`: If fetching the URL was unsuccessful. """ - if manifest_url: - response = requests.get(url=manifest_url) - response.raise_for_status() # Raise HTTPError exception if the download failed - try: - return cast(dict, yaml.safe_load(response.text)) - except yaml.YAMLError as ex: - raise exc.AirbyteConnectorInstallationError( - message="Failed to parse the connector manifest YAML.", - connector_name=source_name, - context={ - "manifest_url": manifest_url, - }, - ) from ex + if source_name is None: + raise exc.PyAirbyteInputError( + message="Param 'source_name' is required.", + ) - # No manifest URL was provided. We'll try a couple of default URLs. + # If manifest URL was provided, we'll use the default URL from the public GCS bucket. + manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format( + source_name=source_name, + version=(version or VERSION_LATEST).removeprefix("v"), + ) + + response = requests.get(url=manifest_url) + response.raise_for_status() # Raise HTTPError exception if the download failed try: - # First try the new URL format (language='manifest-only'): - result_1 = _try_get_source_manifest( - source_name=source_name, - manifest_url=( - f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations" - f"/connectors/{source_name}/manifest.yaml" - ), - ) - except HTTPError as ex_1: - # If the new URL path was not found, try the old URL format (language='low-code'): - try: - result_2 = _try_get_source_manifest( - source_name=source_name, - manifest_url=( - f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations" - f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml" - ), - ) - except HTTPError: - # Raise the first exception, since that represents the new default URL - raise ex_1 from None - else: - # Old URL path was found (no exceptions raised). - return result_2 - else: - # New URL path was found (no exceptions raised). - return result_1 + return cast(dict, yaml.safe_load(response.text)) + except yaml.YAMLError as ex: + raise exc.AirbyteConnectorInstallationError( + message="Failed to parse the connector manifest YAML.", + connector_name=source_name, + context={ + "manifest_url": manifest_url, + }, + ) from ex def _get_local_executor( From 105b49aa3300505cf093f88863eb1ae57aa01766 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Mon, 23 Sep 2024 16:25:13 +0000 Subject: [PATCH 2/6] Auto-fix lint and format issues --- airbyte/_executors/util.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 1a00246f..97d597e4 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -7,7 +7,6 @@ import requests import yaml -from requests import HTTPError from rich import print # noqa: A004 # Allow shadowing the built-in from airbyte import exceptions as exc From b978d48dbac2b6315bf32183c07404ca446b10fd Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 23 Sep 2024 09:37:32 -0700 Subject: [PATCH 3/6] use Airbyte-managed path --- airbyte/_executors/util.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 97d597e4..6cfc5d67 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -26,8 +26,7 @@ VERSION_LATEST = "latest" DEFAULT_MANIFEST_URL = ( - "https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/" - "metadata/airbyte/{source_name}/{version}/manifest.yaml" + "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml" ) From f32b24e864f54e293a9bdfc80578df1106084da4 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 23 Sep 2024 09:43:52 -0700 Subject: [PATCH 4/6] declare addl exception --- airbyte/_executors/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 6cfc5d67..780eaa88 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -42,6 +42,7 @@ def _try_get_source_manifest( Raises: - `PyAirbyteInputError`: If `source_name` is `None`. - `HTTPError`: If fetching the URL was unsuccessful. + - `YAMLError`: If parsing the YAML failed. """ if source_name is None: raise exc.PyAirbyteInputError( From 593f5c270fa6d4ff98940e1eb57714882cb2fbb1 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 23 Sep 2024 09:44:38 -0700 Subject: [PATCH 5/6] improve version handling, update User-Agent string --- airbyte/_executors/util.py | 9 +++++++-- airbyte/sources/registry.py | 3 ++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 780eaa88..08be3d98 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -18,6 +18,7 @@ from airbyte._util.telemetry import EventState, log_install_state # Non-public API from airbyte.constants import TEMP_DIR_OVERRIDE from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata +from airbyte.version import get_version if TYPE_CHECKING: @@ -51,12 +52,16 @@ def _try_get_source_manifest( # If manifest URL was provided, we'll use the default URL from the public GCS bucket. + cleaned_version = (version or VERSION_LATEST).removeprefix("v") manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format( source_name=source_name, - version=(version or VERSION_LATEST).removeprefix("v"), + version=cleaned_version, ) - response = requests.get(url=manifest_url) + response = requests.get( + url=manifest_url, + headers={"User-Agent": f"PyAirbyte/{get_version()}"}, + ) response.raise_for_status() # Raise HTTPError exception if the download failed try: return cast(dict, yaml.safe_load(response.text)) diff --git a/airbyte/sources/registry.py b/airbyte/sources/registry.py index 87f10cda..c2308a0a 100644 --- a/airbyte/sources/registry.py +++ b/airbyte/sources/registry.py @@ -235,7 +235,8 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe registry_url = _get_registry_url() if registry_url.startswith("http"): response = requests.get( - registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"} + registry_url, + headers={"User-Agent": f"PyAirbyte/{get_version()}"}, ) response.raise_for_status() data = response.json() From 9e0d8859ed4adab34e2dd1b6d44c4df7163ac56c Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Mon, 23 Sep 2024 09:47:09 -0700 Subject: [PATCH 6/6] update comment text --- airbyte/_executors/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 08be3d98..8d0412b3 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -38,7 +38,7 @@ def _try_get_source_manifest( ) -> dict: """Try to get a source manifest from a URL. - If the URL is not provided, we'll try the default URL in the public GCS bucket. + If the URL is not provided, we'll try the default URL in the Airbyte registry. Raises: - `PyAirbyteInputError`: If `source_name` is `None`. @@ -50,7 +50,7 @@ def _try_get_source_manifest( message="Param 'source_name' is required.", ) - # If manifest URL was provided, we'll use the default URL from the public GCS bucket. + # If manifest URL was provided, we'll use the default URL from the Airbyte registry. cleaned_version = (version or VERSION_LATEST).removeprefix("v") manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format(