Skip to content

Commit

Permalink
use GCS public bucket to get manifest yaml; add support for pinning v…
Browse files Browse the repository at this point in the history
…ersions and getting prior versions
  • Loading branch information
aaronsteers committed Sep 23, 2024
1 parent 2fae5a3 commit 3333ddb
Showing 1 changed file with 38 additions and 44 deletions.
82 changes: 38 additions & 44 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,50 @@
from airbyte._executors.base import Executor


def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict:
VERSION_LATEST = "latest"
DEFAULT_MANIFEST_URL = (
"https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/"
"metadata/airbyte/{source_name}/{version}/manifest.yaml"
)


def _try_get_source_manifest(
source_name: str,
manifest_url: str | None,
version: str | None = None,
) -> dict:
"""Try to get a source manifest from a URL.
If the URL is not provided, we'll try a couple of default URLs.
We can remove/refactor this once manifests are available in GCS connector registry.
If the URL is not provided, we'll try the default URL in the public GCS bucket.
Raises:
- `PyAirbyteInputError`: If `source_name` is `None`.
- `HTTPError`: If fetching the URL was unsuccessful.
"""
if manifest_url:
response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex
if source_name is None:
raise exc.PyAirbyteInputError(
message="Param 'source_name' is required.",
)

# No manifest URL was provided. We'll try a couple of default URLs.
# If manifest URL was provided, we'll use the default URL from the public GCS bucket.

manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format(
source_name=source_name,
version=(version or VERSION_LATEST).removeprefix("v"),
)

response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
# First try the new URL format (language='manifest-only'):
result_1 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/manifest.yaml"
),
)
except HTTPError as ex_1:
# If the new URL path was not found, try the old URL format (language='low-code'):
try:
result_2 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml"
),
)
except HTTPError:
# Raise the first exception, since that represents the new default URL
raise ex_1 from None
else:
# Old URL path was found (no exceptions raised).
return result_2
else:
# New URL path was found (no exceptions raised).
return result_1
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex


def _get_local_executor(
Expand Down

0 comments on commit 3333ddb

Please sign in to comment.