diff --git a/airbyte/_connector_base.py b/airbyte/_connector_base.py index 6247a3e2..2007cbfa 100644 --- a/airbyte/_connector_base.py +++ b/airbyte/_connector_base.py @@ -22,6 +22,8 @@ ) from airbyte import exceptions as exc +from airbyte._util.connector_info import ConnectorRuntimeInfo +from airbyte._util.hashing import one_way_hash from airbyte._util.telemetry import ( EventState, log_config_validation_result, @@ -76,6 +78,15 @@ def name(self) -> str: """Get the name of the connector.""" return self._name + def _get_connector_runtime_info(self) -> ConnectorRuntimeInfo: + """Get metadata for telemetry and performance logging.""" + return ConnectorRuntimeInfo( + name=self.name, + version=self.connector_version, + executor_type=type(self.executor).__name__, + config_hash=self.config_hash, + ) + def _print_info_message( self, message: str, @@ -124,6 +135,22 @@ def _config(self) -> dict[str, Any]: ) return self._config_dict + @property + def config_hash(self) -> str | None: + """Get a hash of the current config. + + Returns None if the config is not set. + """ + if self._config_dict is None: + return None + + try: + return one_way_hash(self._config_dict) + except Exception: + # This can fail if there are unhashable values in the config, + # or unexpected data types. In this case, return None. + return None + def validate_config(self, config: dict[str, Any] | None = None) -> None: """Validate the config against the spec. @@ -262,7 +289,11 @@ def connector_version(self) -> str | None: Returns None if the version cannot be determined. """ - return self.executor.get_installed_version() + try: + return self.executor.get_installed_version() + except Exception: + # Version not detected, so return None. + return None def check(self) -> None: """Call check on the connector. diff --git a/airbyte/_executors/declarative.py b/airbyte/_executors/declarative.py index 554f520c..dd331537 100644 --- a/airbyte/_executors/declarative.py +++ b/airbyte/_executors/declarative.py @@ -64,6 +64,16 @@ def __init__( self.reported_version: str | None = self._manifest_dict.get("version", None) + def get_installed_version( + self, + *, + raise_on_error: bool = False, + recheck: bool = False, + ) -> str | None: + """Detect the version of the connector installed.""" + _ = raise_on_error, recheck # Not used + return self.reported_version + def _validate_manifest(self, manifest_dict: dict) -> None: """Validate the manifest.""" manifest_text = yaml.safe_dump(manifest_dict) diff --git a/airbyte/_message_iterators.py b/airbyte/_message_iterators.py index d50dad7a..61edf7e3 100644 --- a/airbyte/_message_iterators.py +++ b/airbyte/_message_iterators.py @@ -8,17 +8,22 @@ from collections.abc import Iterator from typing import IO, TYPE_CHECKING, cast +import pendulum import pydantic from typing_extensions import final from airbyte_protocol.models import ( AirbyteMessage, AirbyteRecordMessage, + AirbyteStreamStatus, + AirbyteStreamStatusTraceMessage, + AirbyteTraceMessage, + StreamDescriptor, + TraceType, Type, ) from airbyte.constants import AB_EXTRACTED_AT_COLUMN -from airbyte.progress import _new_stream_success_message if TYPE_CHECKING: @@ -28,6 +33,24 @@ from airbyte.results import ReadResult +def _new_stream_success_message(stream_name: str) -> AirbyteMessage: + """Return a new stream success message.""" + return AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.STREAM_STATUS, + stream=stream_name, + emitted_at=pendulum.now().float_timestamp, + stream_status=AirbyteStreamStatusTraceMessage( + stream_descriptor=StreamDescriptor( + name=stream_name, + ), + status=AirbyteStreamStatus.COMPLETE, + ), + ), + ) + + class AirbyteMessageIterator: """Abstract base class for Airbyte message iterables. diff --git a/airbyte/_util/connector_info.py b/airbyte/_util/connector_info.py new file mode 100644 index 00000000..68541db5 --- /dev/null +++ b/airbyte/_util/connector_info.py @@ -0,0 +1,30 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""Connector info classes for PyAirbyte. + +Used for telemetry and logging. +""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass +from typing import Any + + +@dataclass +class RuntimeInfoBase: + def to_dict(self) -> dict[str, Any]: + return {k: v for k, v in asdict(self).items() if v is not None} + + +@dataclass +class WriterRuntimeInfo(RuntimeInfoBase): + type: str + config_hash: str | None = None + + +@dataclass(kw_only=True) +class ConnectorRuntimeInfo(RuntimeInfoBase): + name: str + executor_type: str | None = None + version: str | None = None + config_hash: str | None = None diff --git a/airbyte/_util/hashing.py b/airbyte/_util/hashing.py new file mode 100644 index 00000000..264abf87 --- /dev/null +++ b/airbyte/_util/hashing.py @@ -0,0 +1,35 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""Hashing utils for PyAirbyte.""" + +from __future__ import annotations + +import hashlib +from collections.abc import Mapping + + +HASH_SEED = "PyAirbyte:" +"""Additional seed for randomizing one-way hashed strings.""" + + +def one_way_hash( + obj: Mapping | list | object, + /, +) -> str: + """Return a one-way hash of the given string. + + To ensure a unique domain of hashes, we prepend a seed to the string before hashing. + """ + string_to_hash: str + if isinstance(obj, Mapping): + # Recursively sort and convert nested dictionaries to tuples of key-value pairs + string_to_hash = str(sorted((k, one_way_hash(v)) for k, v in obj.items())) + + elif isinstance(obj, list): + # Recursively hash elements of the list + string_to_hash = str([one_way_hash(item) for item in obj]) + + else: + # Convert the object to a string + string_to_hash = str(obj) + + return hashlib.sha256((HASH_SEED + str(string_to_hash)).encode()).hexdigest() diff --git a/airbyte/_util/telemetry.py b/airbyte/_util/telemetry.py index 73376d40..0e4a5a83 100644 --- a/airbyte/_util/telemetry.py +++ b/airbyte/_util/telemetry.py @@ -32,14 +32,12 @@ from __future__ import annotations import datetime -import hashlib import os from contextlib import suppress -from dataclasses import asdict, dataclass from enum import Enum from functools import lru_cache from pathlib import Path -from typing import TYPE_CHECKING, Any, cast +from typing import Any, cast import requests import ulid @@ -47,24 +45,18 @@ from airbyte import exceptions as exc from airbyte._util import meta +from airbyte._util.connector_info import ( + ConnectorRuntimeInfo, + WriterRuntimeInfo, +) +from airbyte._util.hashing import one_way_hash from airbyte.version import get_version -if TYPE_CHECKING: - from airbyte._writers.base import AirbyteWriterInterface - from airbyte.caches.base import CacheBase - from airbyte.destinations.base import Destination - from airbyte.sources.base import Source - - DEBUG = True """Enable debug mode for telemetry code.""" -HASH_SEED = "PyAirbyte:" -"""Additional seed for randomizing one-way hashed strings.""" - - PYAIRBYTE_APP_TRACKING_KEY = ( os.environ.get("AIRBYTE_TRACKING_KEY", "") or "cukeSffc0G6gFQehKDhhzSurDzVSZ2OP" ) @@ -185,83 +177,6 @@ class EventType(str, Enum): CHECK = "check" -@dataclass -class CacheTelemetryInfo: - type: str - - @classmethod - def from_cache(cls, cache: CacheBase | None) -> CacheTelemetryInfo: - if not cache: - return cls(type="streaming") - - return cls(type=type(cache).__name__) - - -@dataclass -class SourceTelemetryInfo: - name: str - executor_type: str - version: str | None - - @classmethod - def from_source(cls, source: Source | str) -> SourceTelemetryInfo: - if isinstance(source, str): - return cls( - name=str(source), - executor_type=UNKNOWN, - version=UNKNOWN, - ) - - # Else, `source` should be a `Source` object at this point - return cls( - name=source.name, - executor_type=type(source.executor).__name__, - version=source.executor.reported_version, - ) - - -@dataclass -class DestinationTelemetryInfo: - name: str - executor_type: str - version: str | None - - @classmethod - def from_destination( - cls, - destination: Destination | AirbyteWriterInterface | str | None, - ) -> DestinationTelemetryInfo: - if not destination: - return cls(name=UNKNOWN, executor_type=UNKNOWN, version=UNKNOWN) - - if isinstance(destination, str): - return cls(name=destination, executor_type=UNKNOWN, version=UNKNOWN) - - if hasattr(destination, "executor"): - return cls( - name=destination.name, - executor_type=type(destination.executor).__name__, - version=destination.executor.reported_version, - ) - - return cls( - name=repr(destination), - executor_type=UNKNOWN, - version=UNKNOWN, - ) - - -def one_way_hash( - string_to_hash: Any, # noqa: ANN401 # Allow Any type - /, -) -> str: - """Return a one-way hash of the given string. - - To ensure a unique domain of hashes, we prepend a seed to the string before hashing. - """ - return hashlib.sha256((HASH_SEED + str(string_to_hash)).encode()).hexdigest() - - @lru_cache def get_env_flags() -> dict[str, Any]: flags: dict[str, bool | str] = { @@ -283,9 +198,9 @@ def get_env_flags() -> dict[str, Any]: def send_telemetry( *, - source: Source | str | None, - destination: Destination | AirbyteWriterInterface | str | None, - cache: CacheBase | None, + source: ConnectorRuntimeInfo | None, + destination: ConnectorRuntimeInfo | None, + cache: WriterRuntimeInfo | None, state: EventState, event_type: EventType, number_of_records: int | None = None, @@ -297,8 +212,6 @@ def send_telemetry( payload_props: dict[str, str | int | dict] = { "session_id": PYAIRBYTE_SESSION_ID, - "cache": asdict(CacheTelemetryInfo.from_cache(cache)), - "destination": asdict(DestinationTelemetryInfo.from_destination(destination)), "state": state, "version": get_version(), "python_version": meta.get_python_version(), @@ -308,7 +221,13 @@ def send_telemetry( } if source: - payload_props["source"] = asdict(SourceTelemetryInfo.from_source(source)) + payload_props["source"] = source.to_dict() + + if destination: + payload_props["destination"] = destination.to_dict() + + if cache: + payload_props["cache"] = cache.to_dict() if exception: if isinstance(exception, exc.AirbyteError): @@ -345,8 +264,8 @@ def log_config_validation_result( treated as a source name. """ send_telemetry( - source=name if not name.startswith("destination-") else None, - destination=name if name.startswith("destination-") else None, + source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None, + destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None, cache=None, state=state, event_type=EventType.VALIDATE, @@ -365,8 +284,8 @@ def log_connector_check_result( treated as a source name. """ send_telemetry( - source=name if not name.startswith("destination-") else None, - destination=name if name.startswith("destination-") else None, + source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None, + destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None, cache=None, state=state, event_type=EventType.CHECK, @@ -381,7 +300,7 @@ def log_install_state( ) -> None: """Log an install event.""" send_telemetry( - source=name, + source=ConnectorRuntimeInfo(name=name), destination=None, cache=None, state=state, diff --git a/airbyte/_writers/base.py b/airbyte/_writers/base.py index baad82bf..d15c82c9 100644 --- a/airbyte/_writers/base.py +++ b/airbyte/_writers/base.py @@ -6,6 +6,8 @@ import abc from typing import IO, TYPE_CHECKING +from airbyte._util.connector_info import WriterRuntimeInfo + if TYPE_CHECKING: from airbyte._message_iterators import AirbyteMessageIterator @@ -29,6 +31,21 @@ def name(self) -> str: return self.__class__.__name__ + def _get_writer_runtime_info(self) -> WriterRuntimeInfo: + """Get metadata for telemetry and performance logging.""" + return WriterRuntimeInfo( + type=type(self).__name__, + config_hash=self.config_hash, + ) + + @property + def config_hash(self) -> str | None: + """Return a hash of the writer configuration. + + This is used for logging and state tracking. + """ + return None + def _write_airbyte_io_stream( self, stdin: IO[str], diff --git a/airbyte/caches/base.py b/airbyte/caches/base.py index dfe82b73..cc168c71 100644 --- a/airbyte/caches/base.py +++ b/airbyte/caches/base.py @@ -101,6 +101,14 @@ def __init__(self, **data: Any) -> None: # noqa: ANN401 temp_file_cleanup=self.cleanup, ) + @property + def config_hash(self) -> str | None: + """Return a hash of the cache configuration. + + This is the same as the SQLConfig hash from the superclass. + """ + return super(SqlConfig, self).config_hash + @final @property def processor(self) -> SqlProcessorBase: diff --git a/airbyte/logs.py b/airbyte/logs.py index 572861a0..5cb9d971 100644 --- a/airbyte/logs.py +++ b/airbyte/logs.py @@ -13,6 +13,7 @@ import logging import os +import platform import tempfile import warnings from functools import lru_cache @@ -82,7 +83,11 @@ def _get_logging_root() -> Path | None: """ if "AIRBYTE_LOGGING_ROOT" in os.environ: log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) + elif platform.system() == "Darwin" or platform.system() == "Linux": + # Use /tmp on macOS and Linux + log_root = Path("/tmp") / "airbyte" / "logs" else: + # Use the default temp directory on Windows or any other OS log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" try: @@ -192,6 +197,81 @@ def get_global_file_logger() -> logging.Logger | None: return logger +def get_global_stats_log_path() -> Path | None: + """Return the path to the performance log file.""" + if AIRBYTE_LOGGING_ROOT is None: + return None + + folder = AIRBYTE_LOGGING_ROOT + try: + folder.mkdir(parents=True, exist_ok=True) + except Exception: + warn_once( + f"Failed to create logging directory at '{folder!s}'.", + with_stack=False, + ) + return None + + return folder / "airbyte-stats.log" + + +@lru_cache +def get_global_stats_logger() -> structlog.BoundLogger: + """Create a stats logger for performance metrics.""" + logger = logging.getLogger("airbyte.stats") + logger.setLevel(logging.INFO) + logger.propagate = False + + # Configure structlog + structlog.configure( + processors=[ + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), + structlog.stdlib.PositionalArgumentsFormatter(), + structlog.processors.JSONRenderer(), + ], + context_class=dict, + logger_factory=structlog.stdlib.LoggerFactory(), + wrapper_class=structlog.stdlib.BoundLogger, + cache_logger_on_first_use=True, + ) + + logfile_path: Path | None = get_global_stats_log_path() + if AIRBYTE_LOGGING_ROOT is None or logfile_path is None: + # No temp directory available, so return no-op logger without handlers + return structlog.get_logger("airbyte.stats") + + print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}") + + # Remove any existing handlers + for handler in logger.handlers: + logger.removeHandler(handler) + + folder = AIRBYTE_LOGGING_ROOT + try: + folder.mkdir(parents=True, exist_ok=True) + except Exception: + warn_once( + f"Failed to create logging directory at '{folder!s}'.", + with_stack=False, + ) + return structlog.get_logger("airbyte.stats") + + file_handler = logging.FileHandler( + filename=logfile_path, + encoding="utf-8", + ) + + # Create a formatter and set it for the handler + formatter = logging.Formatter("%(message)s") + file_handler.setFormatter(formatter) + + # Add the file handler to the logger + logger.addHandler(file_handler) + + # Create a logger + return structlog.get_logger("airbyte.stats") + + def new_passthrough_file_logger(connector_name: str) -> logging.Logger: """Create a logger from logging module.""" logger = logging.getLogger(f"airbyte.{connector_name}") diff --git a/airbyte/progress.py b/airbyte/progress.py index def54028..af4204a3 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -36,16 +36,17 @@ from airbyte_protocol.models import ( AirbyteMessage, AirbyteStreamStatus, - AirbyteStreamStatusTraceMessage, - AirbyteTraceMessage, - StreamDescriptor, - TraceType, Type, ) from airbyte import logs +from airbyte._message_iterators import _new_stream_success_message from airbyte._util import meta -from airbyte._util.telemetry import EventState, EventType, send_telemetry +from airbyte._util.telemetry import ( + EventState, + EventType, + send_telemetry, +) from airbyte.logs import get_global_file_logger @@ -54,8 +55,9 @@ from collections.abc import Generator, Iterable from types import ModuleType + from structlog import BoundLogger + from airbyte._message_iterators import AirbyteMessageIterator - from airbyte._writers.base import AirbyteWriterInterface from airbyte.caches.base import CacheBase from airbyte.destinations.base import Destination from airbyte.sources.base import Source @@ -81,24 +83,6 @@ IS_NOTEBOOK = False -def _new_stream_success_message(stream_name: str) -> AirbyteMessage: - """Return a new stream success message.""" - return AirbyteMessage( - type=Type.TRACE, - trace=AirbyteTraceMessage( - type=TraceType.STREAM_STATUS, - stream=stream_name, - emitted_at=pendulum.now().float_timestamp, - stream_status=AirbyteStreamStatusTraceMessage( - stream_descriptor=StreamDescriptor( - name=stream_name, - ), - status=AirbyteStreamStatus.COMPLETE, - ), - ), - ) - - class ProgressStyle(Enum): """An enum of progress bar styles.""" @@ -179,7 +163,7 @@ def __init__( *, source: Source | None, cache: CacheBase | None, - destination: AirbyteWriterInterface | Destination | None, + destination: Destination | None, expected_streams: list[str] | None = None, ) -> None: """Initialize the progress tracker.""" @@ -398,15 +382,38 @@ def job_description(self) -> str: return " -> ".join(steps) + def _send_telemetry( + self, + state: EventState, + number_of_records: int | None = None, + event_type: EventType = EventType.SYNC, + exception: Exception | None = None, + ) -> None: + """Send telemetry for the current job state. + + A thin wrapper around `send_telemetry` that includes the job description. + """ + send_telemetry( + source=self._source._get_connector_runtime_info() if self._source else None, # noqa: SLF001 + cache=self._cache._get_writer_runtime_info() if self._cache else None, # noqa: SLF001 + destination=( + self._destination._get_connector_runtime_info() # noqa: SLF001 + if self._destination + else None + ), + state=state, + number_of_records=number_of_records, + event_type=event_type, + exception=exception, + ) + def _log_sync_start(self) -> None: """Log the start of a sync operation.""" self._print_info_message( f"Started `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`..." ) - send_telemetry( - source=self._source, - cache=self._cache, - destination=self._destination, + # We access a non-public API here (noqa: SLF001) to get the runtime info for participants. + self._send_telemetry( state=EventState.STARTED, event_type=EventType.SYNC, ) @@ -423,24 +430,35 @@ def _log_stream_read_end(self, stream_name: str) -> None: ) self.stream_read_end_times[stream_name] = time.time() + @property + def _job_info(self) -> dict[str, Any]: + """Return a dictionary of job information.""" + job_info: dict[str, str | dict] = { + "description": self.job_description, + } + if self._source: + job_info["source"] = self._source._get_connector_runtime_info().to_dict() # noqa: SLF001 + + if self._cache: + job_info["cache"] = self._cache._get_writer_runtime_info().to_dict() # noqa: SLF001 + + if self._destination: + job_info["destination"] = self._destination._get_connector_runtime_info().to_dict() # noqa: SLF001 + + return job_info + def _log_read_metrics(self) -> None: """Log read performance metrics.""" # Source performance metrics if not self.total_records_read or not self._file_logger: return - perf_metrics: dict[str, Any] = { - "job_description": { - "description": self.job_description, - } + log_dict = { + "job_type": "read", + "job_info": self._job_info, } - if self._source: - perf_metrics["job_description"]["source"] = self._source.name - if self._cache: - perf_metrics["job_description"]["cache"] = type(self._cache).__name__ - if self._destination: - perf_metrics["job_description"]["destination"] = self._destination.name + perf_metrics: dict[str, Any] = {} perf_metrics["records_read"] = self.total_records_read perf_metrics["read_time_seconds"] = self.elapsed_read_seconds perf_metrics["read_start_time"] = self.read_start_time @@ -486,8 +504,12 @@ def _log_read_metrics(self) -> None: stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4) perf_metrics["stream_metrics"] = stream_metrics + log_dict["performance_metrics"] = perf_metrics - self._file_logger.info(json.dumps({"read_performance_metrics": perf_metrics})) + self._file_logger.info(json.dumps(log_dict)) + + perf_logger: BoundLogger = logs.get_global_stats_logger() + perf_logger.info(**log_dict) @property def _unclosed_stream_names(self) -> list[str]: @@ -513,10 +535,7 @@ def log_success( f"Completed `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`." ) self._log_read_metrics() - send_telemetry( - source=self._source, - cache=self._cache, - destination=self._destination, + self._send_telemetry( state=EventState.SUCCEEDED, number_of_records=self.total_records_read, event_type=EventType.SYNC, @@ -532,11 +551,8 @@ def log_failure( self._print_info_message( f"Failed `{self.job_description}` sync at `{pendulum.now().format('HH:mm:ss')}`." ) - send_telemetry( + self._send_telemetry( state=EventState.FAILED, - source=self._source, - cache=self._cache, - destination=self._destination, number_of_records=self.total_records_read, exception=exception, event_type=EventType.SYNC, diff --git a/airbyte/shared/sql_processor.py b/airbyte/shared/sql_processor.py index 39cd9b51..0bfdfe1f 100644 --- a/airbyte/shared/sql_processor.py +++ b/airbyte/shared/sql_processor.py @@ -41,6 +41,7 @@ ) from airbyte import exceptions as exc +from airbyte._util.hashing import one_way_hash from airbyte._util.name_normalizers import LowerCaseNormalizer from airbyte.constants import ( AB_EXTRACTED_AT_COLUMN, @@ -49,6 +50,7 @@ DEBUG_MODE, ) from airbyte.records import StreamRecordHandler +from airbyte.secrets.base import SecretString from airbyte.shared.state_writers import StdOutStateWriter from airbyte.strategies import WriteMethod, WriteStrategy from airbyte.types import SQLTypeConverter @@ -66,7 +68,6 @@ from airbyte._batch_handles import BatchHandle from airbyte._writers.jsonl import FileWriterBase from airbyte.progress import ProgressTracker - from airbyte.secrets.base import SecretString from airbyte.shared.catalog_providers import CatalogProvider from airbyte.shared.state_writers import StateWriterBase @@ -101,6 +102,28 @@ def get_database_name(self) -> str: """Return the name of the database.""" ... + @property + def config_hash(self) -> str | None: + """Return a unique one-way hash of the configuration. + + The generic implementation uses the SQL Alchemy URL, schema name, and table prefix. Some + inputs may be redundant with the SQL Alchemy URL, but this does not hurt the hash + uniqueness. + + In most cases, subclasses do not need to override this method. + """ + return one_way_hash( + SecretString( + ":".join( + [ + str(self.get_sql_alchemy_url()), + self.schema_name or "", + self.table_prefix or "", + ] + ) + ) + ) + def get_sql_engine(self) -> Engine: """Return a new SQL engine to use.""" return create_engine( diff --git a/tests/unit_tests/test_anonymous_usage_stats.py b/tests/unit_tests/test_anonymous_usage_stats.py index fa8d960f..e0bf19ee 100644 --- a/tests/unit_tests/test_anonymous_usage_stats.py +++ b/tests/unit_tests/test_anonymous_usage_stats.py @@ -29,9 +29,9 @@ def test_telemetry_track(monkeypatch, source_test_registry): responses.add(responses.POST, "https://api.segment.io/v1/track", status=200) telemetry.send_telemetry( - source=source_test, + source=source_test._get_connector_runtime_info(), destination=None, - cache=cache, + cache=cache._get_writer_runtime_info(), state=telemetry.EventState.STARTED, number_of_records=0, event_type=telemetry.EventType.SYNC, @@ -89,9 +89,9 @@ def test_do_not_track( responses.add(responses.GET, re.compile(".*"), status=200) telemetry.send_telemetry( - source=source_test, + source=source_test._get_connector_runtime_info(), destination=None, - cache=cache, + cache=cache._get_writer_runtime_info(), state=telemetry.EventState.STARTED, number_of_records=0, event_type=telemetry.EventType.SYNC,