From f83eca58eaf2129d21b5796a301732ab22675130 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Fri, 14 Jan 2022 06:29:34 +0200 Subject: [PATCH] CDK: Fix typing errors (#9037) * fix typing, drop AirbyteLogger * format * bump the version * use logger instead of fixture logger Co-authored-by: Eugene Kulak Co-authored-by: auganbay --- airbyte-cdk/python/CHANGELOG.md | 3 ++ airbyte-cdk/python/airbyte_cdk/connector.py | 6 +-- .../airbyte_cdk/destinations/destination.py | 15 +++--- airbyte-cdk/python/airbyte_cdk/logger.py | 53 +++++-------------- .../airbyte_cdk/sources/abstract_source.py | 24 ++++----- .../python/airbyte_cdk/sources/config.py | 4 +- .../sources/deprecated/base_source.py | 10 ++-- .../sources/singer/singer_helpers.py | 7 +-- .../python/airbyte_cdk/sources/source.py | 6 +-- .../airbyte_cdk/sources/streams/core.py | 4 +- .../airbyte_cdk/sources/streams/http/http.py | 18 ++++--- .../sources/streams/http/rate_limiting.py | 5 +- .../http/requests_native_auth/oauth.py | 2 +- .../airbyte_cdk/sources/utils/__init__.py | 4 ++ .../sources/utils/schema_models.py | 4 +- .../airbyte_cdk/sources/utils/transform.py | 12 ++--- .../python/airbyte_cdk/utils/event_timing.py | 3 +- .../python/airbyte_cdk/utils/mapping_utils.py | 4 +- airbyte-cdk/python/setup.py | 2 +- .../unit_tests/singer/test_singer_source.py | 14 ++--- .../sources/streams/http/test_http.py | 20 +++---- .../sources/test_abstract_source.py | 26 ++++----- .../python/unit_tests/sources/test_source.py | 8 +-- .../python/unit_tests/test_connector.py | 6 +-- airbyte-cdk/python/unit_tests/test_logger.py | 2 +- .../python/unit_tests/test_secure_logger.py | 8 +-- 26 files changed, 126 insertions(+), 144 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index c5aed3c9a8df..4c4f0cf604aa 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.47 +Fix typing errors. + ## 0.1.45 Integrate Sentry for performance and errors tracking. diff --git a/airbyte-cdk/python/airbyte_cdk/connector.py b/airbyte-cdk/python/airbyte_cdk/connector.py index 4e8fa91cb601..f17c76ab5754 100644 --- a/airbyte-cdk/python/airbyte_cdk/connector.py +++ b/airbyte-cdk/python/airbyte_cdk/connector.py @@ -4,12 +4,12 @@ import json +import logging import os import pkgutil from abc import ABC, abstractmethod from typing import Any, Mapping, Optional -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification @@ -48,7 +48,7 @@ def write_config(config: Mapping[str, Any], config_path: str): with open(config_path, "w") as fh: fh.write(json.dumps(config)) - def spec(self, logger: AirbyteLogger) -> ConnectorSpecification: + def spec(self, logger: logging.Logger) -> ConnectorSpecification: """ Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) required to run this integration. @@ -59,7 +59,7 @@ def spec(self, logger: AirbyteLogger) -> ConnectorSpecification: return ConnectorSpecification.parse_obj(json.loads(raw_spec)) @abstractmethod - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: """ Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API. diff --git a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py index f07500f0bc09..b46123dd0ffc 100644 --- a/airbyte-cdk/python/airbyte_cdk/destinations/destination.py +++ b/airbyte-cdk/python/airbyte_cdk/destinations/destination.py @@ -4,19 +4,20 @@ import argparse import io +import logging import sys from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping -from airbyte_cdk import AirbyteLogger from airbyte_cdk.connector import Connector from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit from pydantic import ValidationError +logger = logging.getLogger("airbyte") + class Destination(Connector, ABC): - logger = AirbyteLogger() VALID_CMDS = {"spec", "check", "write"} @abstractmethod @@ -26,7 +27,7 @@ def write( """Implement to define how the connector writes data to the destination""" def _run_check(self, config: Mapping[str, Any]) -> AirbyteMessage: - check_result = self.check(self.logger, config) + check_result = self.check(logger, config) return AirbyteMessage(type=Type.CONNECTION_STATUS, connectionStatus=check_result) def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[AirbyteMessage]: @@ -35,16 +36,16 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt try: yield AirbyteMessage.parse_raw(line) except ValidationError: - self.logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}") + logger.info(f"ignoring input which can't be deserialized as Airbyte Message: {line}") def _run_write( self, config: Mapping[str, Any], configured_catalog_path: str, input_stream: io.TextIOWrapper ) -> Iterable[AirbyteMessage]: catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path) input_messages = self._parse_input_stream(input_stream) - self.logger.info("Begin writing to the destination...") + logger.info("Begin writing to the destination...") yield from self.write(config=config, configured_catalog=catalog, input_messages=input_messages) - self.logger.info("Writing complete.") + logger.info("Writing complete.") def parse_args(self, args: List[str]) -> argparse.Namespace: """ @@ -86,7 +87,7 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]: if cmd not in self.VALID_CMDS: raise Exception(f"Unrecognized command: {cmd}") - spec = self.spec(self.logger) + spec = self.spec(logger) if cmd == "spec": yield AirbyteMessage(type=Type.SPEC, spec=spec) return diff --git a/airbyte-cdk/python/airbyte_cdk/logger.py b/airbyte-cdk/python/airbyte_cdk/logger.py index 5b35f36478b4..1cfb72175a62 100644 --- a/airbyte-cdk/python/airbyte_cdk/logger.py +++ b/airbyte-cdk/python/airbyte_cdk/logger.py @@ -6,7 +6,7 @@ import logging.config import sys import traceback -from typing import List +from typing import List, Tuple from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage @@ -49,7 +49,6 @@ def hook_fn(exception_type, exception_value, traceback_): def init_logger(name: str = None): """Initial set up of logger""" - logging.setLoggerClass(AirbyteNativeLogger) logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") logger = logging.getLogger(name) logger.setLevel(TRACE_LEVEL_NUM) @@ -61,7 +60,7 @@ def init_logger(name: str = None): class AirbyteLogFormatter(logging.Formatter): """Output log records using AirbyteMessage""" - _secrets = [] + _secrets: List[str] = [] @classmethod def update_secrets(cls, secrets: List[str]): @@ -88,46 +87,22 @@ def format(self, record: logging.LogRecord) -> str: return log_message.json(exclude_unset=True) -class AirbyteNativeLogger(logging.Logger): - """Using native logger with implementing all AirbyteLogger features""" +def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]: + """Custom method, which takes log level from first word of message""" + valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"] + split_line = msg.split() + first_word = next(iter(split_line), None) + if first_word in valid_log_types: + log_level = logging.getLevelName(first_word) + rendered_message = " ".join(split_line[1:]) + else: + log_level = logging.getLevelName(default_level) + rendered_message = msg - def __init__(self, name): - super().__init__(name) - self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"] - - def log_by_prefix(self, msg, default_level): - """Custom method, which takes log level from first word of message""" - split_line = msg.split() - first_word = next(iter(split_line), None) - if first_word in self.valid_log_types: - log_level = logging.getLevelName(first_word) - rendered_message = " ".join(split_line[1:]) - else: - default_level = default_level if default_level in self.valid_log_types else "INFO" - log_level = logging.getLevelName(default_level) - rendered_message = msg - self.log(log_level, rendered_message) - - def trace(self, msg, *args, **kwargs): - self._log(TRACE_LEVEL_NUM, msg, args, **kwargs) + return log_level, rendered_message class AirbyteLogger: - def __init__(self): - self.valid_log_types = ["FATAL", "ERROR", "WARN", "INFO", "DEBUG", "TRACE"] - - def log_by_prefix(self, message, default_level): - """Custom method, which takes log level from first word of message""" - split_line = message.split() - first_word = next(iter(split_line), None) - if first_word in self.valid_log_types: - log_level = first_word - rendered_message = " ".join(split_line[1:]) - else: - log_level = default_level - rendered_message = message - self.log(log_level, rendered_message) - def log(self, level, message): log_record = AirbyteLogMessage(level=level, message=message) log_message = AirbyteMessage(type="LOG", log=log_record) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 706536ce9e6a..d2e81e99b350 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -4,19 +4,18 @@ import copy +import logging from abc import ABC, abstractmethod from datetime import datetime from functools import lru_cache from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ( AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, - AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Status, @@ -38,8 +37,9 @@ class AbstractSource(Source, ABC): """ @abstractmethod - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: """ + :param logger: source logger :param config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc. :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful @@ -57,19 +57,19 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ # Stream name to instance map for applying output object transformation - _stream_to_instance_map: Dict[str, AirbyteStream] = {} + _stream_to_instance_map: Dict[str, Stream] = {} @property def name(self) -> str: """Source name""" return self.__class__.__name__ - def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog: + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: """Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)] return AirbyteCatalog(streams=streams) - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" try: check_succeeded, error = self.check_connection(logger, config) @@ -81,7 +81,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn return AirbyteConnectionStatus(status=Status.SUCCEEDED) def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterator[AirbyteMessage]: """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" connector_state = copy.deepcopy(state or {}) @@ -118,7 +118,7 @@ def read( def _read_stream( self, - logger: AirbyteLogger, + logger: logging.Logger, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, connector_state: MutableMapping[str, Any], @@ -160,7 +160,7 @@ def _limit_reached(internal_config: InternalConfig, records_counter: int) -> boo def _read_incremental( self, - logger: AirbyteLogger, + logger: logging.Logger, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, connector_state: MutableMapping[str, Any], @@ -222,7 +222,7 @@ def _checkpoint_state(self, stream_name, stream_state, connector_state, logger): return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state)) @lru_cache(maxsize=None) - def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, dict]: + def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]: """ Lookup stream's transform object and jsonschema based on stream name. This function would be called a lot so using caching to save on costly @@ -230,7 +230,7 @@ def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTran :param stream_name name of stream from catalog. :return tuple with stream transformer object and discover json schema. """ - stream_instance = self._stream_to_instance_map.get(stream_name) + stream_instance = self._stream_to_instance_map[stream_name] return stream_instance.transformer, stream_instance.get_json_schema() def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]): @@ -240,6 +240,6 @@ def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]): # need it to normalize values against json schema. By default no action # taken unless configured. See # docs/connector-development/cdk-python/schemas.md for details. - transformer.transform(data, schema) + transformer.transform(data, schema) # type: ignore message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) return AirbyteMessage(type=MessageType.RECORD, record=message) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/config.py b/airbyte-cdk/python/airbyte_cdk/sources/config.py index 11ff07a9606c..b96672ef86e4 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/config.py @@ -17,9 +17,9 @@ class BaseConfig(BaseModel): """ @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, *args, **kwargs) -> Dict[str, Any]: """We're overriding the schema classmethod to enable some post-processing""" - schema = super().schema(**kwargs) + schema = super().schema(*args, **kwargs) rename_key(schema, old_key="anyOf", new_key="oneOf") # UI supports only oneOf expand_refs(schema) schema.pop("description", None) # description added from the docstring diff --git a/airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py b/airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py index 617f0378f924..c1dd61c45d11 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/deprecated/base_source.py @@ -4,10 +4,10 @@ import copy +import logging from datetime import datetime from typing import Any, Iterable, Mapping, MutableMapping, Type -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ( AirbyteCatalog, AirbyteConnectionStatus, @@ -39,13 +39,13 @@ def _get_client(self, config: Mapping): """Construct client""" return self.client_class(**config) - def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog: + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: """Discover streams""" client = self._get_client(config) return AirbyteCatalog(streams=[stream for stream in client.streams]) - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: """Check connection""" client = self._get_client(config) alive, error = client.health_check() @@ -55,7 +55,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn return AirbyteConnectionStatus(status=Status.SUCCEEDED) def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterable[AirbyteMessage]: state = state or {} client = self._get_client(config) @@ -73,7 +73,7 @@ def read( logger.info(f"Finished syncing {self.name}") def _read_stream( - self, logger: AirbyteLogger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any] + self, logger: logging.Logger, client: BaseClient, configured_stream: ConfiguredAirbyteStream, state: MutableMapping[str, Any] ): stream_name = configured_stream.stream.name use_incremental = configured_stream.sync_mode == SyncMode.incremental and client.stream_has_state(stream_name) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py index 23d464824098..d279e3d7d300 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/singer/singer_helpers.py @@ -12,6 +12,7 @@ from io import TextIOWrapper from typing import Any, DefaultDict, Dict, Iterator, List, Mapping, Optional, Tuple +from airbyte_cdk.logger import log_by_prefix from airbyte_cdk.models import ( AirbyteCatalog, AirbyteMessage, @@ -138,7 +139,7 @@ def _read_singer_catalog(logger, shell_command: str) -> Mapping[str, Any]: shell_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) for line in completed_process.stderr.splitlines(): - logger.log_by_prefix(line, "ERROR") + logger.log(*log_by_prefix(line, "ERROR")) return json.loads(completed_process.stdout) @@ -169,9 +170,9 @@ def read(logger, shell_command, is_message=(lambda x: True)) -> Iterator[Airbyte if message_data is not None: yield message_data else: - logger.log_by_prefix(line, "INFO") + logger.log(*log_by_prefix(line, "INFO")) else: - logger.log_by_prefix(line, "ERROR") + logger.log(*log_by_prefix(line, "ERROR")) @staticmethod def _read_lines(process: subprocess.Popen) -> Iterator[Tuple[str, TextIOWrapper]]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/source.py b/airbyte-cdk/python/airbyte_cdk/sources/source.py index 81e1eac7aaa2..5e0396b3fcbd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/source.py @@ -4,12 +4,12 @@ import json +import logging from abc import ABC, abstractmethod from collections import defaultdict from typing import Any, Dict, Iterable, Mapping, MutableMapping from airbyte_cdk.connector import Connector -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteCatalog, AirbyteMessage, ConfiguredAirbyteCatalog @@ -29,14 +29,14 @@ def read_catalog(self, catalog_path: str) -> ConfiguredAirbyteCatalog: @abstractmethod def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ) -> Iterable[AirbyteMessage]: """ Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state. """ @abstractmethod - def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog: + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: """ Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 22823cd29d2b..9aea6d7d1508 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -110,11 +110,13 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: """ def stream_slices( - self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None + self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None ) -> Iterable[Optional[Mapping[str, Any]]]: """ Override to define the slices for this stream. See the stream slicing section of the docs for more information. + :param sync_mode: + :param cursor_field: :param stream_state: :return: """ diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 1abab94799f5..f2b1f0659763 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -33,13 +33,13 @@ class HttpStream(Stream, ABC): """ source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table) - page_size = None # Use this variable to define page size for API http requests with pagination support + page_size: Optional[int] = None # Use this variable to define page size for API http requests with pagination support # TODO: remove legacy HttpAuthenticator authenticator references def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None): self._session = requests.Session() - self._authenticator = NoAuth() + self._authenticator: HttpAuthenticator = NoAuth() if isinstance(authenticator, AuthBase): self._session.auth = authenticator elif authenticator: @@ -107,7 +107,7 @@ def max_retries(self) -> Union[int, None]: return 5 @property - def retry_factor(self) -> int: + def retry_factor(self) -> float: """ Override if needed. Specifies factor for backoff policy. """ @@ -130,6 +130,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, @abstractmethod def path( self, + *, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, @@ -206,6 +207,7 @@ def request_kwargs( def parse_response( self, response: requests.Response, + *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, @@ -214,6 +216,9 @@ def parse_response( Parses the raw response object into a list of records. By default, this returns an iterable containing the input. Override to parse differently. :param response: + :param stream_state: + :param stream_slice: + :param next_page_token: :return: An iterable containing the parsed response """ @@ -236,6 +241,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: This method is called only if should_backoff() returns True for the input request. + :param response: :return how long to backoff in seconds. The return value may be a floating point number for subsecond precision. Returning None defers backoff to the default backoff behavior (e.g using an exponential algorithm). """ @@ -310,11 +316,11 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi max_tries: The maximum number of attempts to make before giving up ...The default value of None means there is no limit to the number of tries. - This implies that if max_tries is excplicitly set to None there is no + This implies that if max_tries is explicitly set to None there is no limit to retry attempts, otherwise it is limited number of tries. But this is not true for current version of backoff packages (1.8.0). Setting - max_tries to 0 or negative number would result in endless retry atempts. - Add this condition to avoid an endless loop if it hasnt been set + max_tries to 0 or negative number would result in endless retry attempts. + Add this condition to avoid an endless loop if it hasn't been set explicitly (i.e. max_retries is not None). """ if max_tries is not None: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py index ab7cbba741cf..2401e51005d5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/rate_limiting.py @@ -5,6 +5,7 @@ import sys import time +from typing import Optional import backoff from airbyte_cdk.logger import AirbyteLogger @@ -18,7 +19,7 @@ logger = AirbyteLogger() -def default_backoff_handler(max_tries: int, factor: int, **kwargs): +def default_backoff_handler(max_tries: Optional[int], factor: float, **kwargs): def log_retry_attempt(details): _, exc, _ = sys.exc_info() if exc.response: @@ -46,7 +47,7 @@ def should_give_up(exc): ) -def user_defined_backoff_handler(max_tries: int, **kwargs): +def user_defined_backoff_handler(max_tries: Optional[int], **kwargs): def sleep_on_ratelimit(details): _, exc, _ = sys.exc_info() if isinstance(exc, UserDefinedBackoffException): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py index 39c65950a7ae..a77fa5c73049 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/requests_native_auth/oauth.py @@ -23,7 +23,7 @@ def __init__( client_secret: str, refresh_token: str, scopes: List[str] = None, - token_expiry_date: pendulum.datetime = None, + token_expiry_date: pendulum.DateTime = None, access_token_name: str = "access_token", expires_in_name: str = "expires_in", ): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py index acfd1708ebe4..5adf292dff0c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py @@ -1 +1,5 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + # Initialize Utils Package diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py index d2c0c23fec50..7a3696493703 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_models.py @@ -77,8 +77,8 @@ def schema_extra(cls, schema: Dict[str, Any], model: Type[BaseModel]) -> None: prop["oneOf"] = [{"type": "null"}, {"$ref": ref}] @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, *args, **kwargs) -> Dict[str, Any]: """We're overriding the schema classmethod to enable some post-processing""" - schema = super().schema(**kwargs) + schema = super().schema(*args, **kwargs) expand_refs(schema) return schema diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py index fa14eb531f04..ed974ef1305b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/transform.py @@ -4,7 +4,7 @@ from distutils.util import strtobool from enum import Flag, auto -from typing import Any, Callable, Dict +from typing import Any, Callable, Dict, Mapping, Optional from airbyte_cdk.logger import AirbyteLogger from jsonschema import Draft7Validator, validators @@ -36,7 +36,7 @@ class TypeTransformer: Class for transforming object before output. """ - _custom_normalizer: Callable[[Any, Dict[str, Any]], Any] = None + _custom_normalizer: Optional[Callable[[Any, Dict[str, Any]], Any]] = None def __init__(self, config: TransformConfig): """ @@ -90,7 +90,7 @@ def default_convert(original_item: Any, subschema: Dict[str, Any]) -> Any: :param subschema part of the jsonschema containing field type/format data. :return transformed field value. """ - target_type = subschema.get("type") + target_type = subschema.get("type", []) if original_item is None and "null" in target_type: return None if isinstance(target_type, list): @@ -160,11 +160,11 @@ def resolve(subschema): return normalizator - def transform(self, record: Dict[str, Any], schema: Dict[str, Any]): + def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]): """ Normalize and validate according to config. - :param record record instance for normalization/transformation. All modification are done by modifing existent object. - :schema object's jsonschema for normalization. + :param record: record instance for normalization/transformation. All modification are done by modifying existent object. + :param schema: object's jsonschema for normalization. """ if TransformConfig.NoTransform in self._config: return diff --git a/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py b/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py index 8d31dc80d0a3..25983c42c71a 100644 --- a/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py +++ b/airbyte-cdk/python/airbyte_cdk/utils/event_timing.py @@ -6,6 +6,7 @@ import time from contextlib import contextmanager from dataclasses import dataclass, field +from typing import Optional from airbyte_cdk.logger import AirbyteLogger @@ -60,7 +61,7 @@ def report(self, order_by="name"): class Event: name: str start: float = field(default_factory=time.perf_counter_ns) - end: float = field(default=None) + end: Optional[float] = field(default=None) @property def duration(self) -> float: diff --git a/airbyte-cdk/python/airbyte_cdk/utils/mapping_utils.py b/airbyte-cdk/python/airbyte_cdk/utils/mapping_utils.py index c2d5c85149d7..c618316afea4 100644 --- a/airbyte-cdk/python/airbyte_cdk/utils/mapping_utils.py +++ b/airbyte-cdk/python/airbyte_cdk/utils/mapping_utils.py @@ -3,7 +3,7 @@ # from functools import reduce -from typing import Any, List, Mapping, Optional +from typing import Any, Iterable, List, Mapping, Optional, Tuple def all_key_pairs_dot_notation(dict_obj: Mapping) -> Mapping[str, Any]: @@ -12,7 +12,7 @@ def all_key_pairs_dot_notation(dict_obj: Mapping) -> Mapping[str, Any]: keys are prefixed with the list of keys passed in as prefix. """ - def _all_key_pairs_dot_notation(_dict_obj: Mapping, prefix: List[str] = []) -> Mapping[str, Any]: + def _all_key_pairs_dot_notation(_dict_obj: Mapping, prefix: List[str] = []) -> Iterable[Tuple[str, Any]]: for key, value in _dict_obj.items(): if isinstance(value, dict): prefix.append(str(key)) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 35fa886a9fa4..baedbfb18d22 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.46", + version="0.1.47", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/singer/test_singer_source.py b/airbyte-cdk/python/unit_tests/singer/test_singer_source.py index 4f65b107341e..319745c3e22b 100644 --- a/airbyte-cdk/python/unit_tests/singer/test_singer_source.py +++ b/airbyte-cdk/python/unit_tests/singer/test_singer_source.py @@ -4,14 +4,14 @@ import copy +import logging from unittest.mock import patch -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import SyncMode from airbyte_cdk.sources.singer import SingerHelper, SyncModeInfo from airbyte_cdk.sources.singer.source import BaseSingerSource, ConfigContainer -LOGGER = AirbyteLogger() +logger = logging.getLogger("airbyte") class TetsBaseSinger(BaseSingerSource): @@ -57,7 +57,7 @@ class TetsBaseSinger(BaseSingerSource): @patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) def test_singer_discover_single_pk(mock_read_catalog): - airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + airbyte_catalog = TetsBaseSinger().discover(logger, ConfigContainer({}, "")) _user_stream = airbyte_catalog.streams[0] _roles_stream = airbyte_catalog.streams[1] assert _user_stream.source_defined_primary_key == [["id"]] @@ -69,7 +69,7 @@ def test_singer_discover_with_composite_pk(): singer_catalog_composite_pk = copy.deepcopy(basic_singer_catalog) singer_catalog_composite_pk["streams"][0]["key_properties"] = ["id", "name"] with patch.object(SingerHelper, "_read_singer_catalog", return_value=singer_catalog_composite_pk): - airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + airbyte_catalog = TetsBaseSinger().discover(logger, ConfigContainer({}, "")) _user_stream = airbyte_catalog.streams[0] _roles_stream = airbyte_catalog.streams[1] @@ -81,7 +81,7 @@ def test_singer_discover_with_composite_pk(): @patch.object(BaseSingerSource, "get_primary_key_overrides", return_value={"users": ["updated_at"]}) @patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) def test_singer_discover_pk_overrides(mock_pk_override, mock_read_catalog): - airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + airbyte_catalog = TetsBaseSinger().discover(logger, ConfigContainer({}, "")) _user_stream = airbyte_catalog.streams[0] _roles_stream = airbyte_catalog.streams[1] assert _user_stream.source_defined_primary_key == [["updated_at"]] @@ -91,7 +91,7 @@ def test_singer_discover_pk_overrides(mock_pk_override, mock_read_catalog): @patch.object(SingerHelper, "_read_singer_catalog", return_value=basic_singer_catalog) def test_singer_discover_metadata(mock_read_catalog): - airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + airbyte_catalog = TetsBaseSinger().discover(logger, ConfigContainer({}, "")) _user_stream = airbyte_catalog.streams[0] _roles_stream = airbyte_catalog.streams[1] @@ -105,7 +105,7 @@ def test_singer_discover_metadata(mock_read_catalog): def test_singer_discover_sync_mode_overrides(mock_read_catalog): sync_mode_override = SyncModeInfo(supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], default_cursor_field=["name"]) with patch.object(BaseSingerSource, "get_sync_mode_overrides", return_value={"roles": sync_mode_override}): - airbyte_catalog = TetsBaseSinger().discover(LOGGER, ConfigContainer({}, "")) + airbyte_catalog = TetsBaseSinger().discover(logger, ConfigContainer({}, "")) _roles_stream = airbyte_catalog.streams[1] assert _roles_stream.supported_sync_modes == sync_mode_override.supported_sync_modes diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index b325733584e5..61d8e0c69d59 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -29,18 +29,10 @@ def __init__(self, **kwargs): def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: + def path(self, **kwargs) -> str: return "" - def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Iterable[Mapping]: + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: stubResp = {"data": self.resp_counter} self.resp_counter += 1 yield stubResp @@ -364,10 +356,10 @@ class CacheHttpSubStream(HttpSubStream): def __init__(self, parent): super().__init__(parent=parent) - def parse_response(self, **kwargs) -> Iterable[Mapping]: + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: return [] - def next_page_token(self, **kwargs) -> Optional[Mapping[str, Any]]: + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: return None def path(self, **kwargs) -> str: @@ -406,14 +398,14 @@ class CacheHttpStreamWithSlices(CacheHttpStream): paths = ["", "search"] def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: - return f'{stream_slice.get("path")}' + return f'{stream_slice["path"]}' if stream_slice else "" def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: for path in self.paths: yield {"path": path} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - yield response + yield {"value": len(response.text)} @patch("airbyte_cdk.sources.streams.core.logging", MagicMock()) diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 4c432b69e650..703504598c61 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -2,12 +2,11 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # - +import logging from collections import defaultdict from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union import pytest -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ( AirbyteCatalog, AirbyteConnectionStatus, @@ -25,13 +24,15 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +logger = logging.getLogger("airbyte") + class MockSource(AbstractSource): def __init__(self, check_lambda: Callable[[], Tuple[bool, Optional[Any]]] = None, streams: List[Stream] = None): self._streams = streams self.check_lambda = check_lambda - def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: if self.check_lambda: return self.check_lambda() return (False, "Missing callable.") @@ -42,11 +43,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: return self._streams -@pytest.fixture -def logger() -> AirbyteLogger: - return AirbyteLogger() - - def test_successful_check(): """Tests that if a source returns TRUE for the connection check the appropriate connectionStatus success message is returned""" expected = AirbyteConnectionStatus(status=Status.SUCCEEDED) @@ -112,7 +108,7 @@ def test_discover(mocker): assert expected == src.discover(logger, {}) -def test_read_nonexistent_stream_raises_exception(mocker, logger): +def test_read_nonexistent_stream_raises_exception(mocker): """Tests that attempting to sync a stream which the source does not return from the `streams` method raises an exception""" s1 = MockStream(name="s1") s2 = MockStream(name="this_stream_doesnt_exist_in_the_source") @@ -149,7 +145,7 @@ def _fix_emitted_at(messages: List[AirbyteMessage]) -> List[AirbyteMessage]: return messages -def test_valid_full_refresh_read_no_slices(logger, mocker): +def test_valid_full_refresh_read_no_slices(mocker): """Tests that running a full refresh sync on streams which don't specify slices produces the expected AirbyteMessages""" stream_output = [{"k1": "v1"}, {"k2": "v2"}] s1 = MockStream([({"sync_mode": SyncMode.full_refresh}, stream_output)], name="s1") @@ -168,7 +164,7 @@ def test_valid_full_refresh_read_no_slices(logger, mocker): assert expected == messages -def test_valid_full_refresh_read_with_slices(mocker, logger): +def test_valid_full_refresh_read_with_slices(mocker): """Tests that running a full refresh sync on streams which use slices produces the expected AirbyteMessages""" slices = [{"1": "1"}, {"2": "2"}] # When attempting to sync a slice, just output that slice as a record @@ -194,7 +190,7 @@ def _state(state_data: Dict[str, Any]): return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state_data)) -def test_valid_incremental_read_with_checkpoint_interval(mocker, logger): +def test_valid_incremental_read_with_checkpoint_interval(mocker): """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message after reading N records within a stream""" stream_output = [{"k1": "v1"}, {"k2": "v2"}] s1 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s1") @@ -226,7 +222,7 @@ def test_valid_incremental_read_with_checkpoint_interval(mocker, logger): assert expected == messages -def test_valid_incremental_read_with_no_interval(mocker, logger): +def test_valid_incremental_read_with_no_interval(mocker): """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message only after fully reading the stream and does not output any STATE messages during syncing the stream.""" stream_output = [{"k1": "v1"}, {"k2": "v2"}] @@ -252,7 +248,7 @@ def test_valid_incremental_read_with_no_interval(mocker, logger): assert expected == messages -def test_valid_incremental_read_with_slices(mocker, logger): +def test_valid_incremental_read_with_slices(mocker): """Tests that an incremental read which uses slices outputs each record in the slice followed by a STATE message, for each slice""" slices = [{"1": "1"}, {"2": "2"}] stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] @@ -291,7 +287,7 @@ def test_valid_incremental_read_with_slices(mocker, logger): assert expected == messages -def test_valid_incremental_read_with_slices_and_interval(mocker, logger): +def test_valid_incremental_read_with_slices_and_interval(mocker): """ Tests that an incremental read which uses slices and a checkpoint interval: 1. outputs all records diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 3812cc6b57be..141168494c23 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -4,12 +4,12 @@ import json +import logging import tempfile from typing import Any, Mapping, MutableMapping from unittest.mock import MagicMock import pytest -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConfiguredAirbyteCatalog, SyncMode, Type from airbyte_cdk.sources import AbstractSource, Source from airbyte_cdk.sources.streams.core import Stream @@ -19,14 +19,14 @@ class MockSource(Source): def read( - self, logger: AirbyteLogger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None ): pass - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]): + def check(self, logger: logging.Logger, config: Mapping[str, Any]): pass - def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]): + def discover(self, logger: logging.Logger, config: Mapping[str, Any]): pass diff --git a/airbyte-cdk/python/unit_tests/test_connector.py b/airbyte-cdk/python/unit_tests/test_connector.py index d1ac03788030..463b58290cba 100644 --- a/airbyte-cdk/python/unit_tests/test_connector.py +++ b/airbyte-cdk/python/unit_tests/test_connector.py @@ -4,13 +4,13 @@ import json +import logging import tempfile from pathlib import Path from typing import Any, Mapping import pytest from airbyte_cdk import AirbyteSpec, Connector -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus @@ -39,7 +39,7 @@ def test_from_file_nonexistent(self): class MockConnector(Connector): - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: pass @@ -68,6 +68,6 @@ def test_read_config(nonempty_file, integration: Connector, mock_config): def test_write_config(integration, mock_config): config_path = Path(tempfile.gettempdir()) / "config.json" - integration.write_config(mock_config, config_path) + integration.write_config(mock_config, str(config_path)) with open(config_path, "r") as actual: assert mock_config == json.loads(actual.read()) diff --git a/airbyte-cdk/python/unit_tests/test_logger.py b/airbyte-cdk/python/unit_tests/test_logger.py index 9a4b2b9469ec..96b5be205208 100644 --- a/airbyte-cdk/python/unit_tests/test_logger.py +++ b/airbyte-cdk/python/unit_tests/test_logger.py @@ -54,7 +54,7 @@ def test_level_transform(logger, caplog): def test_trace(logger, caplog): - logger.trace("Test trace 1") + logger.log(logging.getLevelName("TRACE"), "Test trace 1") record = caplog.records[0] assert record.levelname == "TRACE" assert record.message == "Test trace 1" diff --git a/airbyte-cdk/python/unit_tests/test_secure_logger.py b/airbyte-cdk/python/unit_tests/test_secure_logger.py index 5bd5d79ccfde..ed1e86cfc4db 100644 --- a/airbyte-cdk/python/unit_tests/test_secure_logger.py +++ b/airbyte-cdk/python/unit_tests/test_secure_logger.py @@ -8,7 +8,7 @@ from typing import Any, Iterable, Mapping, MutableMapping import pytest -from airbyte_cdk import AirbyteEntrypoint, AirbyteLogger +from airbyte_cdk import AirbyteEntrypoint from airbyte_cdk.logger import AirbyteLogFormatter from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type from airbyte_cdk.sources import Source @@ -29,7 +29,7 @@ class MockSource(Source): def read( self, - logger: AirbyteLogger, + logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None, @@ -152,7 +152,7 @@ def test_airbyte_secrets_are_masked_on_uncaught_exceptions(mocker, caplog): class BrokenSource(MockSource): def read( self, - logger: AirbyteLogger, + logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None, @@ -198,7 +198,7 @@ def test_non_airbyte_secrets_are_not_masked_on_uncaught_exceptions(mocker, caplo class BrokenSource(MockSource): def read( self, - logger: AirbyteLogger, + logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None,