Skip to content

Commit

Permalink
Chore: Configure warnings as errors in pytest (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored May 22, 2024
1 parent bcfed96 commit bd8ee6d
Show file tree
Hide file tree
Showing 20 changed files with 311 additions and 186 deletions.
13 changes: 12 additions & 1 deletion airbyte/_processors/sql/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Optional, final

Expand Down Expand Up @@ -56,7 +57,17 @@ def dataset_name(self) -> str:

@overrides
def get_sql_alchemy_url(self) -> SecretString:
"""Return the SQLAlchemy URL to use."""
"""Return the SQLAlchemy URL to use.
We suppress warnings about unrecognized JSON type. More info on that here:
- https://github.com/airbytehq/PyAirbyte/issues/254
"""
warnings.filterwarnings(
"ignore",
message="Did not recognize type 'JSON' of column",
category=sqlalchemy.exc.SAWarning,
)

url: URL = make_url(f"bigquery://{self.project_name!s}")
if self.credentials_path:
url = url.update_query_dict({"credentials_path": self.credentials_path})
Expand Down
17 changes: 8 additions & 9 deletions airbyte/_processors/sql/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from textwrap import dedent, indent
from typing import TYPE_CHECKING, Union

from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field
from typing_extensions import Literal
Expand All @@ -22,14 +23,6 @@
from sqlalchemy.engine import Engine


# Suppress warnings from DuckDB about reflection on indices.
# https://github.com/Mause/duckdb_engine/issues/905
warnings.filterwarnings(
"ignore",
message="duckdb-engine doesn't yet support reflection on indices",
)


# @dataclass
class DuckDBConfig(SqlConfig):
"""Configuration for DuckDB."""
Expand All @@ -47,7 +40,13 @@ class DuckDBConfig(SqlConfig):
@overrides
def get_sql_alchemy_url(self) -> SecretString:
    """Return the SQLAlchemy URL used to connect to this DuckDB database.

    The URL targets the database file at `self.db_path`; schema selection is
    handled elsewhere, not via the URL query string.
    """
    # duckdb-engine emits a warning whenever table reflection runs because it
    # does not yet support reflection on indices. Silence that known, harmless
    # warning: https://github.com/Mause/duckdb_engine/issues/905
    warnings.filterwarnings(
        "ignore",
        message="duckdb-engine doesn't yet support reflection on indices",
        category=DuckDBEngineWarning,
    )
    return SecretString(f"duckdb:///{self.db_path!s}")

@overrides
Expand Down
2 changes: 2 additions & 0 deletions airbyte/_processors/sql/motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import warnings
from typing import TYPE_CHECKING

from duckdb_engine import DuckDBEngineWarning
from overrides import overrides

from airbyte._processors.file import JsonlWriter
Expand All @@ -21,6 +22,7 @@
# Suppress the noisy duckdb-engine warning emitted during table reflection:
# duckdb-engine does not yet support reflection on indices.
# See: https://github.com/Mause/duckdb_engine/issues/905
warnings.filterwarnings(
    "ignore",
    message="duckdb-engine doesn't yet support reflection on indices",
    category=DuckDBEngineWarning,
)


Expand Down
26 changes: 24 additions & 2 deletions airbyte/_util/temp_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import json
import tempfile
import time
import warnings
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import TYPE_CHECKING, Any
def as_temp_files(files_contents: list[dict | str]) -> Generator[list[str], Any, None]:
    """Write each entry of `files_contents` to its own temporary file and yield the paths.

    Dict entries are serialized as JSON and given a `.json` suffix; string entries
    are written verbatim with a `.txt` suffix. The files are flushed but left open
    while yielded (closing them early breaks reuse on Windows). On exit, each file
    is closed and deleted, retrying for up to ~4 seconds before warning the user.

    Args:
        files_contents: Dicts (serialized as JSON) and/or raw strings to write.

    Yields:
        The list of temporary file paths, in the same order as the inputs.
    """
    temp_files: list[Any] = []
    try:
        for content in files_contents:
            use_json = isinstance(content, dict)
            temp_file = tempfile.NamedTemporaryFile(
                mode="w+t",
                delete=False,
                encoding="utf-8",
                suffix=".json" if use_json else ".txt",
            )
            # Reuse `use_json` instead of re-evaluating `isinstance(...)` here.
            temp_file.write(json.dumps(content) if use_json else content)
            temp_file.flush()
            # Don't close the file yet — closing it here breaks Windows.
            temp_files.append(temp_file)
        yield [file.name for file in temp_files]
    finally:
        for temp_file in temp_files:
            max_attempts = 5
            for attempt in range(max_attempts):
                try:
                    with suppress(Exception):
                        temp_file.close()

                    Path(temp_file.name).unlink(missing_ok=True)

                    break  # File was deleted successfully. Move on.
                except Exception as ex:
                    if attempt < max_attempts - 1:
                        time.sleep(1)  # File might not be closed yet. Wait and try again.
                    else:
                        # Something went wrong and the file could not be deleted. Warn the user.
                        warnings.warn(
                            f"Failed to remove temporary file: '{temp_file.name}'. {ex}",
                            stacklevel=2,
                        )
2 changes: 2 additions & 0 deletions airbyte/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import warnings

from duckdb_engine import DuckDBEngineWarning
from pydantic import PrivateAttr

from airbyte._processors.sql.duckdb import DuckDBConfig, DuckDBSqlProcessor
Expand All @@ -29,6 +30,7 @@
# Suppress the noisy duckdb-engine warning emitted during table reflection:
# duckdb-engine does not yet support reflection on indices.
# See: https://github.com/Mause/duckdb_engine/issues/905
warnings.filterwarnings(
    "ignore",
    message="duckdb-engine doesn't yet support reflection on indices",
    category=DuckDBEngineWarning,
)


Expand Down
11 changes: 11 additions & 0 deletions airbyte/caches/motherduck.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

from __future__ import annotations

import warnings

from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field, PrivateAttr

Expand All @@ -35,6 +38,14 @@ class MotherDuckConfig(DuckDBConfig):
@overrides
def get_sql_alchemy_url(self) -> SecretString:
"""Return the SQLAlchemy URL to use."""
# Suppress warnings from DuckDB about reflection on indices.
# https://github.com/Mause/duckdb_engine/issues/905
warnings.filterwarnings(
"ignore",
message="duckdb-engine doesn't yet support reflection on indices",
category=DuckDBEngineWarning,
)

return SecretString(
f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
# f"&schema={self.schema_name}" # TODO: Debug why this doesn't work
Expand Down
3 changes: 2 additions & 1 deletion airbyte/cloud/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import warnings

from airbyte import exceptions as exc
from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace

Expand All @@ -35,7 +36,7 @@
# explicitly imports it.
warnings.warn(
message="The `airbyte.cloud.experimental` module is experimental and may change in the future.",
category=FutureWarning,
category=exc.AirbyteExperimentalFeatureWarning,
stacklevel=2,
)

Expand Down
10 changes: 9 additions & 1 deletion airbyte/cloud/sync_results.py
Original file line number Diff line number Diff line change
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Return an `airbyte.datasets.CachedDataset` for the given stream name.

    The dataset reads from the SQL cache returned by `self.get_sql_cache()`,
    so it can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration
    information (catalog information) to the `CachedDataset` object via the
    "Get stream properties" API:
    https://reference.airbyte.com/reference/getstreamproperties
    """
    sql_cache = self.get_sql_cache()
    # The cache does not hold stream configuration, so skip looking it up.
    return CachedDataset(
        sql_cache,
        stream_name=stream_name,
        stream_configuration=False,
    )

def get_sql_database_name(self) -> str:
"""Return the SQL database name."""
Expand Down
8 changes: 8 additions & 0 deletions airbyte/datasets/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,17 @@ def __init__(
self,
cache: CacheBase,
stream_name: str,
stream_configuration: ConfiguredAirbyteStream | None | Literal[False] = None,
) -> None:
"""We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
If stream_configuration is None, we attempt to retrieve the stream configuration from the
cache processor. This is useful when constructing a dataset from a CachedDataset object,
which already has the stream configuration.
If stream_configuration is set to False, we skip the stream configuration retrieval.
"""
table_name = cache.processor.get_sql_table_name(stream_name)
schema_name = cache.schema_name
Expand All @@ -151,6 +158,7 @@ def __init__(
cache=cache,
stream_name=stream_name,
query_statement=query,
stream_configuration=stream_configuration,
)

@overrides
Expand Down
7 changes: 7 additions & 0 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,10 @@ class AirbyteMultipleResourcesError(AirbyteError):

resource_type: str | None = None
resource_name_or_id: str | None = None


# Custom Warnings


class AirbyteExperimentalFeatureWarning(FutureWarning):
    """Emitted (via `warnings.warn`) when an experimental PyAirbyte feature is used."""
34 changes: 21 additions & 13 deletions airbyte/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def full_tests(connector_name: str, sample_config: str) -> None:
source = ab.get_source(
# TODO: FIXME: noqa: SIM115, PTH123
connector_name,
config=json.load(open(sample_config, encoding="utf-8")), # noqa: SIM115, PTH123,
config=json.loads(Path(sample_config).read_text(encoding="utf-8")), # ,
install_if_missing=False,
)

Expand Down Expand Up @@ -118,8 +118,7 @@ def run() -> None:
def validate(connector_dir: str, sample_config: str, *, validate_install_only: bool) -> None:
# read metadata.yaml
metadata_path = Path(connector_dir) / "metadata.yaml"
with Path(metadata_path).open(encoding="utf-8") as stream:
metadata = yaml.safe_load(stream)["data"]
metadata = yaml.safe_load(Path(metadata_path).read_text(encoding="utf-8"))["data"]

# TODO: Use remoteRegistries.pypi.packageName once set for connectors
connector_name = metadata["dockerRepository"].replace("airbyte/", "")
Expand Down Expand Up @@ -147,15 +146,24 @@ def validate(connector_dir: str, sample_config: str, *, validate_install_only: b
],
}

with tempfile.NamedTemporaryFile(mode="w+t", delete=True, encoding="utf-8") as temp_file:
with tempfile.NamedTemporaryFile(
mode="w+t", delete=True, encoding="utf-8", suffix="-catalog.json"
) as temp_file:
temp_file.write(json.dumps(registry))
temp_file.seek(0)
temp_file.flush()
os.environ["AIRBYTE_LOCAL_REGISTRY"] = str(temp_file.name)
if validate_install_only:
install_only_test(connector_name)
else:
if not sample_config:
raise exc.PyAirbyteInputError(
input_value="--sample-config is required without --validate-install-only set"
)
full_tests(connector_name, sample_config)
try:
if validate_install_only:
install_only_test(connector_name)
else:
if not sample_config:
raise exc.PyAirbyteInputError(
input_value=(
"`--sample-config` is required when `--validate-install-only`"
"is not set."
)
)
full_tests(connector_name, sample_config)
finally:
del os.environ["AIRBYTE_LOCAL_REGISTRY"]
temp_file.close()
Loading

0 comments on commit bd8ee6d

Please sign in to comment.