Skip to content

Commit

Permalink
Feat: CatalogManager refactor, decoupling sql processors, caches, and…
Browse files Browse the repository at this point in the history
… internal sql backend tables (#220)
  • Loading branch information
aaronsteers authored May 15, 2024
1 parent 9564b4c commit e82d37c
Show file tree
Hide file tree
Showing 42 changed files with 1,546 additions and 940 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/poetry-lock-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
git config --global user.name "octavia-squidington-iii"
git config --global user.email "[email protected]"
git add .
git commit -m "Auto-fix lint and format issues"
git commit -m "Auto-commit `poetry lock` changes"
- name: Push changes to '(${{ steps.pr-info.outputs.repo }})'
if: steps.git-diff.outputs.changes == 'true'
Expand Down
17 changes: 11 additions & 6 deletions .github/workflows/python_pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ jobs:
run: poetry install

- name: Run Pytest with Coverage (Fast Tests Only)
timeout-minutes: 60
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: >
poetry run coverage run -m pytest -m
"not slow and not requires_creds and not linting"
poetry run coverage run -m pytest
--durations=5 --exitfirst
-m "not slow and not requires_creds and not linting"
- name: Print Coverage Report
if: always()
Expand Down Expand Up @@ -85,12 +86,14 @@ jobs:

# Job-specific step(s):
- name: Run Pytest (No-Creds)
timeout-minutes: 60
env:
# Force this to a blank value.
GCP_GSM_CREDENTIALS: ""
run: >
poetry run coverage run -m pytest -m
"not requires_creds and not linting and not super_slow"
poetry run coverage run -m pytest
--durations=5 --exitfirst
-m "not requires_creds and not linting and not super_slow"
- name: Print Coverage Report
if: always()
Expand Down Expand Up @@ -147,11 +150,13 @@ jobs:

# Job-specific step(s):
- name: Run Pytest
timeout-minutes: 60
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: >
poetry run coverage run -m pytest -m
"not linting and not super_slow"
poetry run coverage run -m pytest
--verbose
-m "not linting and not super_slow"
- name: Print Coverage Report
if: always()
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/test-pr-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ jobs:
run: poetry install

- name: Run Pytest
timeout-minutes: 60
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: poetry run pytest -m "not super_slow"
run: >
poetry run pytest
--verbose
-m "not super_slow"
log-success-comment:
name: Append 'Success' Comment
Expand Down
16 changes: 16 additions & 0 deletions airbyte/_future_cdk/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Module for future CDK components.
Components here are planned to move to the CDK.
TODO!: Add GitHub link here before merging.
"""

from __future__ import annotations

from airbyte._future_cdk.sql_processor import SqlProcessorBase


# Public names exported by this package: the SQL processor base class imported above.
__all__ = [
"SqlProcessorBase",
]
100 changes: 100 additions & 0 deletions airbyte/_future_cdk/catalog_providers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Catalog provider implementation.
A catalog provider wraps a configured catalog and configured streams. This class is responsible for
providing information about the catalog and streams. A catalog provider can also be updated with new
streams as they are discovered, providing a thin layer of abstraction over the configured catalog.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, final

from airbyte import exceptions as exc


if TYPE_CHECKING:
from airbyte_protocol.models import (
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
)


class CatalogProvider:
    """A catalog provider wraps a configured catalog and configured streams.

    This class is responsible for providing information about the catalog and streams.

    Note:
    - The catalog provider is not responsible for managing the catalog or streams but it may
      be updated with new streams as they are discovered.
    """

    def __init__(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
    ) -> None:
        """Initialize the catalog provider with a catalog object reference.

        The catalog object is stored as-is (no copy is made), so streams added to that same
        object after construction are visible through this provider.
        """
        self._catalog: ConfiguredAirbyteCatalog = configured_catalog

    @property
    def configured_catalog(self) -> ConfiguredAirbyteCatalog:
        """Return the configured catalog object this provider wraps."""
        return self._catalog

    @property
    def stream_names(self) -> list[str]:
        """Return the unique stream names in the order they first appear in the catalog.

        `dict.fromkeys` is used instead of a `set` so that de-duplication keeps a stable,
        deterministic order across runs.
        """
        return list(
            dict.fromkeys(stream.stream.name for stream in self.configured_catalog.streams)
        )

    def get_configured_stream_info(
        self,
        stream_name: str,
    ) -> ConfiguredAirbyteStream:
        """Return the configured stream whose name matches `stream_name`.

        Raises:
            PyAirbyteInternalError: If no catalog is available, or if more than one stream
                in the catalog has the requested name.
            AirbyteStreamNotFoundError: If no stream in the catalog has the requested name.
        """
        if not self.configured_catalog:
            raise exc.PyAirbyteInternalError(
                message="Cannot get stream JSON schema without a catalog.",
            )

        matching_streams: list[ConfiguredAirbyteStream] = [
            stream
            for stream in self.configured_catalog.streams
            if stream.stream.name == stream_name
        ]
        if not matching_streams:
            raise exc.AirbyteStreamNotFoundError(
                stream_name=stream_name,
                context={
                    # Report what is available so the caller can spot typos quickly.
                    "available_streams": [
                        stream.stream.name for stream in self.configured_catalog.streams
                    ],
                },
            )

        if len(matching_streams) > 1:
            # Stream names are expected to be unique; a duplicate is an internal error.
            raise exc.PyAirbyteInternalError(
                message="Multiple streams found with same name.",
                context={
                    "stream_name": stream_name,
                },
            )

        return matching_streams[0]

    @final
    def get_stream_json_schema(
        self,
        stream_name: str,
    ) -> dict[str, Any]:
        """Return the JSON schema of the given stream.

        Raises the same errors as `get_configured_stream_info` when the stream cannot be
        resolved.
        """
        return self.get_configured_stream_info(stream_name).stream.json_schema

    def get_stream_properties(
        self,
        stream_name: str,
    ) -> dict[str, dict]:
        """Return the top-level `properties` mapping of the given stream's JSON schema.

        Raises:
            KeyError: If the stream's JSON schema has no "properties" key.
        """
        return self.get_stream_json_schema(stream_name)["properties"]
Loading

0 comments on commit e82d37c

Please sign in to comment.