Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Use GCS public bucket to get manifest yaml for connectors; adds support for pinning versions and getting prior versions #394

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 43 additions & 45 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import requests
import yaml
from requests import HTTPError
from rich import print # noqa: A004 # Allow shadowing the built-in

from airbyte import exceptions as exc
Expand All @@ -19,62 +18,61 @@
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
from airbyte.constants import TEMP_DIR_OVERRIDE
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata
from airbyte.version import get_version


if TYPE_CHECKING:
from airbyte._executors.base import Executor


# NOTE(review): this span appears to be a diff rendering with no +/- markers and no indentation:
# two signatures for `_try_get_source_manifest` and two alternative bodies are interleaved below.
# Confirm against the merged file before reusing any of it.
def _try_get_source_manifest(source_name: str, manifest_url: str | None) -> dict:
# Version label substituted when the caller supplies no `version`.
VERSION_LATEST = "latest"
# URL template for a connector's manifest.yaml; `source_name` and `version` are filled in
# with str.format() inside `_try_get_source_manifest`.
DEFAULT_MANIFEST_URL = (
"https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
)


def _try_get_source_manifest(
source_name: str,
manifest_url: str | None,
version: str | None = None,
) -> dict:
"""Try to get a source manifest from a URL.

If the URL is not provided, we'll try a couple of default URLs.
We can remove/refactor this once manifests are available in GCS connector registry.
If the URL is not provided, we'll try the default URL in the public GCS bucket.

Raises:
- `PyAirbyteInputError`: If `source_name` is `None`.
- `HTTPError`: If fetching the URL was unsuccessful.
- `YAMLError`: If parsing the YAML failed.
"""
# NOTE(review): the docstring lists `YAMLError`, but the `except yaml.YAMLError` clauses below
# re-raise it as `exc.AirbyteConnectorInstallationError` (chained with `from ex`).
if manifest_url:
response = requests.get(url=manifest_url)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
return cast(dict, yaml.safe_load(response.text))
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex
if source_name is None:
raise exc.PyAirbyteInputError(
message="Param 'source_name' is required.",
)

# If manifest URL was not provided, we'll use the default URL from the public GCS bucket.

# No manifest URL was provided. We'll try a couple of default URLs.
# Fall back to VERSION_LATEST when no version is given, and drop a leading "v" from the version.
cleaned_version = (version or VERSION_LATEST).removeprefix("v")
# An explicitly passed `manifest_url` takes precedence over the formatted default URL.
manifest_url = manifest_url or DEFAULT_MANIFEST_URL.format(
source_name=source_name,
version=cleaned_version,
)

response = requests.get(
url=manifest_url,
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
)
response.raise_for_status() # Raise HTTPError exception if the download failed
try:
# First try the new URL format (language='manifest-only'):
result_1 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/manifest.yaml"
),
)
except HTTPError as ex_1:
# If the new URL path was not found, try the old URL format (language='low-code'):
try:
result_2 = _try_get_source_manifest(
source_name=source_name,
manifest_url=(
f"https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-integrations"
f"/connectors/{source_name}/{source_name.replace('-', '_')}/manifest.yaml"
),
)
except HTTPError:
# Raise the first exception, since that represents the new default URL
raise ex_1 from None
else:
# Old URL path was found (no exceptions raised).
return result_2
else:
# New URL path was found (no exceptions raised).
return result_1
return cast(dict, yaml.safe_load(response.text))
# Wrap YAML parse failures in the project's installation error, recording the URL that was fetched.
except yaml.YAMLError as ex:
raise exc.AirbyteConnectorInstallationError(
message="Failed to parse the connector manifest YAML.",
connector_name=source_name,
context={
"manifest_url": manifest_url,
},
) from ex


def _get_local_executor(
Expand Down
3 changes: 2 additions & 1 deletion airbyte/sources/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe
registry_url = _get_registry_url()
if registry_url.startswith("http"):
response = requests.get(
registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"}
registry_url,
headers={"User-Agent": f"PyAirbyte/{get_version()}"},
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
)
response.raise_for_status()
data = response.json()
Expand Down
Loading