Skip to content

Commit

Permalink
Chore: Bump airbyte-api (#186)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Apr 12, 2024
1 parent a1a3909 commit 6c9dab8
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 96 deletions.
42 changes: 42 additions & 0 deletions airbyte/_util/api_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Imported classes from the Airbyte API.

Any classes that are imported from the Airbyte API should be imported here.
This allows for easy access to these classes in other modules, especially
for type hinting purposes.

Design Guidelines:
- No modules except `api_util` and `api_imports` should import from `airbyte_api`.
- If a module needs to import from `airbyte_api`, it should import from `api_imports` (this module)
  instead.
- This module is divided into two sections: internal-use classes and public-use classes.
- Public-use classes should be carefully reviewed to ensure that they are necessary for public use
  and that we are willing to support them as part of PyAirbyte.
"""
# Ignore import sorting in this file. Manual grouping is more important.
# ruff: noqa: I001

from __future__ import annotations

# Internal-Use Classes

# These classes are used internally to cache API responses.
# They are re-exported here so no other module needs a direct
# `airbyte_api` dependency.
from airbyte_api.models import (
    ConnectionResponse,
    DestinationResponse,
    JobResponse,
)

# Public-Use Classes

# This class is used to represent the status of a job. It may be used in
# type hints for public functions that return a job status.
from airbyte_api.models import JobStatusEnum  # Alias not needed

# Explicit public API of this module: exactly the re-exported names above.
__all__: list[str] = [
    "ConnectionResponse",
    "DestinationResponse",
    "JobResponse",
    "JobStatusEnum",
]
122 changes: 59 additions & 63 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
from typing import Any

import airbyte_api
from airbyte_api.models import operations as api_operations
from airbyte_api.models import shared as api_models
from airbyte_api.models.shared.jobcreaterequest import JobCreateRequest, JobTypeEnum
from airbyte_api import api, models

from airbyte.exceptions import (
AirbyteConnectionSyncError,
Expand Down Expand Up @@ -47,8 +45,8 @@ def get_airbyte_server_instance(
api_root: str,
) -> airbyte_api.Airbyte:
"""Get an Airbyte instance."""
return airbyte_api.Airbyte(
security=api_models.Security(
return airbyte_api.AirbyteAPI(
security=models.Security(
bearer_auth=api_key,
),
server_url=api_root,
Expand All @@ -63,14 +61,14 @@ def get_workspace(
*,
api_root: str,
api_key: str,
) -> api_models.WorkspaceResponse:
) -> models.WorkspaceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.workspaces.get_workspace(
api_operations.GetWorkspaceRequest(
api.GetWorkspaceRequest(
workspace_id=workspace_id,
),
)
Expand All @@ -94,15 +92,15 @@ def list_connections(
*,
api_root: str,
api_key: str,
) -> list[api_models.ConnectionResponse]:
) -> list[api.ConnectionResponse]:
"""Get a connection."""
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api_operations.ListConnectionsRequest()(
api.ListConnectionsRequest()(
workspace_ids=[workspace_id],
),
)
Expand All @@ -124,15 +122,15 @@ def get_connection(
*,
api_root: str,
api_key: str,
) -> api_models.ConnectionResponse:
) -> api.ConnectionResponse:
"""Get a connection."""
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.connections.get_connection(
api_operations.GetConnectionRequest(
api.GetConnectionRequest(
connection_id=connection_id,
),
)
Expand All @@ -148,7 +146,7 @@ def run_connection(
*,
api_root: str,
api_key: str,
) -> api_models.ConnectionResponse:
) -> api.ConnectionResponse:
"""Get a connection.
If block is True, this will block until the connection is finished running.
Expand All @@ -161,9 +159,9 @@ def run_connection(
api_root=api_root,
)
response = airbyte_instance.jobs.create_job(
JobCreateRequest(
models.JobCreateRequest(
connection_id=connection_id,
job_type=JobTypeEnum.SYNC,
job_type=models.JobTypeEnum.SYNC,
),
)
if status_ok(response.status_code) and response.job_response:
Expand All @@ -188,14 +186,14 @@ def get_job_logs(
*,
api_root: str,
api_key: str,
) -> list[api_models.JobResponse]:
) -> list[api.JobResponse]:
"""Get a job's logs."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response: api_operations.ListJobsResponse = airbyte_instance.jobs.list_jobs(
api_operations.ListJobsRequest(
response: api.ListJobsResponse = airbyte_instance.jobs.list_jobs(
api.ListJobsRequest(
workspace_ids=[workspace_id],
connection_id=connection_id,
limit=limit,
Expand All @@ -219,14 +217,14 @@ def get_job_info(
*,
api_root: str,
api_key: str,
) -> api_models.JobResponse:
) -> api.JobResponse:
"""Get a job."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.jobs.get_job(
api_operations.GetJobRequest(
api.GetJobRequest(
job_id=job_id,
),
)
Expand All @@ -246,14 +244,14 @@ def create_source(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api_models.SourceResponse:
) -> api.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response: api_operations.CreateSourceResponse = airbyte_instance.sources.create_source(
api_models.SourceCreateRequest(
response: api.CreateSourceResponse = airbyte_instance.sources.create_source(
models.SourceCreateRequest(
name=name,
workspace_id=workspace_id,
configuration=config, # TODO: wrap in a proper configuration object
Expand All @@ -275,14 +273,14 @@ def get_source(
*,
api_root: str,
api_key: str,
) -> api_models.SourceResponse:
) -> api.SourceResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.sources.get_source(
api_operations.GetSourceRequest(
api.GetSourceRequest(
source_id=source_id,
),
)
Expand All @@ -306,7 +304,7 @@ def delete_source(
api_root=api_root,
)
response = airbyte_instance.sources.delete_source(
api_operations.DeleteSourceRequest(
api.DeleteSourceRequest(
source_id=source_id,
),
)
Expand All @@ -329,20 +327,18 @@ def create_destination(
config: dict[str, Any],
api_root: str,
api_key: str,
) -> api_models.DestinationResponse:
) -> api.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response: api_operations.CreateDestinationResponse = (
airbyte_instance.destinations.create_destination(
api_models.DestinationCreateRequest(
name=name,
workspace_id=workspace_id,
configuration=config, # TODO: wrap in a proper configuration object
),
)
response: api.CreateDestinationResponse = airbyte_instance.destinations.create_destination(
models.DestinationCreateRequest(
name=name,
workspace_id=workspace_id,
configuration=config, # TODO: wrap in a proper configuration object
),
)
if status_ok(response.status_code) and response.destination_response:
return response.destination_response
Expand All @@ -358,14 +354,14 @@ def get_destination(
*,
api_root: str,
api_key: str,
) -> api_models.DestinationResponse:
) -> api.DestinationResponse:
"""Get a connection."""
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
response = airbyte_instance.destinations.get_destination(
api_operations.GetDestinationRequest(
api.GetDestinationRequest(
destination_id=destination_id,
),
)
Expand All @@ -376,19 +372,19 @@ def get_destination(
raw_configuration: dict[str, Any] = raw_response["configuration"]
destination_type = raw_response.get("destinationType")
if destination_type == "snowflake":
response.destination_response.configuration = api_models.DestinationSnowflake.from_dict(
response.destination_response.configuration = models.DestinationSnowflake.from_dict(
raw_configuration,
)
if destination_type == "bigquery":
response.destination_response.configuration = api_models.DestinationBigquery.from_dict(
response.destination_response.configuration = models.DestinationBigquery.from_dict(
raw_configuration,
)
if destination_type == "postgres":
response.destination_response.configuration = api_models.DestinationPostgres.from_dict(
response.destination_response.configuration = models.DestinationPostgres.from_dict(
raw_configuration,
)
if destination_type == "duckdb":
response.destination_response.configuration = api_models.DestinationDuckdb.from_dict(
response.destination_response.configuration = models.DestinationDuckdb.from_dict(
raw_configuration,
)

Expand All @@ -411,7 +407,7 @@ def delete_destination(
api_root=api_root,
)
response = airbyte_instance.destinations.delete_destination(
api_operations.DeleteDestinationRequest(
api.DeleteDestinationRequest(
destination_id=destination_id,
),
)
Expand All @@ -437,23 +433,23 @@ def create_connection(
workspace_id: str | None = None,
prefix: str,
selected_stream_names: list[str],
) -> api_models.ConnectionResponse:
) -> models.ConnectionResponse:
_ = workspace_id # Not used (yet)
airbyte_instance = get_airbyte_server_instance(
api_key=api_key,
api_root=api_root,
)
stream_configurations: list[api_models.StreamConfiguration] = []
stream_configurations: list[models.StreamConfiguration] = []
if selected_stream_names:
for stream_name in selected_stream_names:
stream_configuration = api_models.StreamConfiguration(
stream_configuration = models.StreamConfiguration(
name=stream_name,
)
stream_configurations.append(stream_configuration)

stream_configurations = api_models.StreamConfigurations(stream_configurations)
stream_configurations = models.StreamConfigurations(stream_configurations)
response = airbyte_instance.connections.create_connection(
api_models.ConnectionCreateRequest(
models.ConnectionCreateRequest(
name=name,
source_id=source_id,
destination_id=destination_id,
Expand All @@ -479,14 +475,14 @@ def get_connection_by_name(
*,
api_root: str,
api_key: str,
) -> api_models.ConnectionResponse:
) -> models.ConnectionResponse:
"""Get a connection."""
connections = list_connections(
workspace_id=workspace_id,
api_key=api_key,
api_root=api_root,
)
found: list[api_models.ConnectionResponse] = [
found: list[models.ConnectionResponse] = [
connection for connection in connections if connection.name == connection_name
]
if len(found) == 0:
Expand Down Expand Up @@ -519,7 +515,7 @@ def delete_connection(
api_root=api_root,
)
response = airbyte_instance.connections.delete_connection(
api_operations.DeleteConnectionRequest(
api.DeleteConnectionRequest(
connection_id=connection_id,
),
)
Expand All @@ -535,17 +531,17 @@ def delete_connection(
# Not yet implemented


def check_source(
source_id: str,
*,
api_root: str,
api_key: str,
workspace_id: str | None = None,
) -> api_models.SourceCheckResponse:
"""Check a source.
# TODO: Need to use legacy Configuration API for this:
# https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/sources/check_connection
"""
_ = source_id, workspace_id, api_root, api_key
raise NotImplementedError
# def check_source(
# source_id: str,
# *,
# api_root: str,
# api_key: str,
# workspace_id: str | None = None,
# ) -> api.SourceCheckResponse:
# """Check a source.

# # TODO: Need to use legacy Configuration API for this:
# # https://airbyte-public-api-docs.s3.us-east-2.amazonaws.com/rapidoc-api-docs.html#post-/v1/sources/check_connection
# """
# _ = source_id, workspace_id, api_root, api_key
# raise NotImplementedError
2 changes: 1 addition & 1 deletion airbyte/cloud/_destination_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pathlib import Path
from typing import TYPE_CHECKING, Any

from airbyte_api.models.shared import (
from airbyte_api.models import (
DestinationBigquery,
DestinationDuckdb,
DestinationPostgres,
Expand Down
Loading

0 comments on commit 6c9dab8

Please sign in to comment.