Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Use GCS public bucket to get manifest yaml for connectors; adds support for pinning versions and getting prior versions #394

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 38 additions & 44 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,56 +25,50 @@
from airbyte._executors.base import Executor


def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict:
VERSION_LATEST = "latest"
DEFAULT_MANIFEST_URL = (
"https://storage.googleapis.com/prod-airbyte-cloud-connector-metadata-service/"
"metadata/airbyte/{source_name}/{version}/manifest.yaml"
)
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved


def _try_get_source_manifest(
source_name: str,
manifest_url: str | None,
version: str | None = None,
) -> dict:
"""Try to get a source manifest from a URL.

If the URL is not provided, we'll try a couple of default URLs.
We can remove/refactor this once manifests are available in GCS connector registry.
If the URL is not provided, we'll try the default URL in the public GCS bucket.

Raises:
- `PyAirbyteInputError`: If `source_name` is `None`.
- `HTTPError`: If fetching the URL was unsuccessful.
"""
if manifest_url:
response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex
if source_name is None:
raise exc.PyAirbyteInputError(
message="Param 'source_name' is required.",
)

# No manifest URL was provided. We'll try a couple of default URLs.
# If manifest URL was provided, we'll use the default URL from the public GCS bucket.

manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format(
source_name=source_name,
version=(version or VERSION_LATEST).removeprefix("v"),
)

response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
# First try the new URL format (language='manifest-only'):
result_1 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/manifest.yaml"
),
)
except HTTPError as ex_1:
# If the new URL path was not found, try the old URL format (language='low-code'):
try:
result_2 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml"
),
)
except HTTPError:
# Raise the first exception, since that represents the new default URL
raise ex_1 from None
else:
# Old URL path was found (no exceptions raised).
return result_2
else:
# New URL path was found (no exceptions raised).
return result_1
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved


def _get_local_executor(
Expand Down
Loading