Skip to content

Commit

Permalink
Feat: Add performance stats and logging (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Sep 12, 2024
1 parent ddbf6f4 commit c5fea25
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 157 deletions.
33 changes: 32 additions & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
)

from airbyte import exceptions as exc
from airbyte._util.connector_info import ConnectorRuntimeInfo
from airbyte._util.hashing import one_way_hash
from airbyte._util.telemetry import (
EventState,
log_config_validation_result,
Expand Down Expand Up @@ -76,6 +78,15 @@ def name(self) -> str:
"""Get the name of the connector."""
return self._name

def _get_connector_runtime_info(self) -> ConnectorRuntimeInfo:
    """Build the runtime metadata record used for telemetry and performance logging.

    The record carries the connector's name, its detected version, the class
    name of its executor, and the hash of its current config.
    """
    # Fields are gathered in a dict first so each property is read exactly once,
    # in a fixed order, before the record is constructed.
    info_fields = {
        "name": self.name,
        "version": self.connector_version,
        "executor_type": type(self.executor).__name__,
        "config_hash": self.config_hash,
    }
    return ConnectorRuntimeInfo(**info_fields)

def _print_info_message(
self,
message: str,
Expand Down Expand Up @@ -124,6 +135,22 @@ def _config(self) -> dict[str, Any]:
)
return self._config_dict

@property
def config_hash(self) -> str | None:
    """Return a one-way hash of the current config, or None if unavailable.

    None is returned both when no config has been set and when hashing the
    config raises.
    """
    if self._config_dict is None:
        return None

    try:
        digest = one_way_hash(self._config_dict)
    except Exception:
        # Hashing can fail if the config holds unhashable values or
        # unexpected data types. Report "no hash" rather than propagating.
        digest = None

    return digest

def validate_config(self, config: dict[str, Any] | None = None) -> None:
"""Validate the config against the spec.
Expand Down Expand Up @@ -262,7 +289,11 @@ def connector_version(self) -> str | None:
Returns None if the version cannot be determined.
"""
return self.executor.get_installed_version()
try:
return self.executor.get_installed_version()
except Exception:
# Version not detected, so return None.
return None

def check(self) -> None:
"""Call check on the connector.
Expand Down
10 changes: 10 additions & 0 deletions airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ def __init__(

self.reported_version: str | None = self._manifest_dict.get("version", None)

def get_installed_version(
    self,
    *,
    raise_on_error: bool = False,
    recheck: bool = False,
) -> str | None:
    """Return the connector version stored in `self.reported_version`.

    Both keyword arguments are accepted but ignored: no detection is performed
    here, the stored value is returned as-is (and may be None).
    """
    del raise_on_error, recheck  # Accepted but intentionally unused.
    return self.reported_version

def _validate_manifest(self, manifest_dict: dict) -> None:
"""Validate the manifest."""
manifest_text = yaml.safe_dump(manifest_dict)
Expand Down
25 changes: 24 additions & 1 deletion airbyte/_message_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@
from collections.abc import Iterator
from typing import IO, TYPE_CHECKING, cast

import pendulum
import pydantic
from typing_extensions import final

from airbyte_protocol.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStreamStatus,
AirbyteStreamStatusTraceMessage,
AirbyteTraceMessage,
StreamDescriptor,
TraceType,
Type,
)

from airbyte.constants import AB_EXTRACTED_AT_COLUMN
from airbyte.progress import _new_stream_success_message


if TYPE_CHECKING:
Expand All @@ -28,6 +33,24 @@
from airbyte.results import ReadResult


def _new_stream_success_message(stream_name: str) -> AirbyteMessage:
    """Build a TRACE message that marks the named stream as COMPLETE.

    The message is a STREAM_STATUS trace stamped with the current time.
    """
    # Capture the timestamp before building the nested status payload.
    emitted_at = pendulum.now().float_timestamp
    complete_status = AirbyteStreamStatusTraceMessage(
        stream_descriptor=StreamDescriptor(
            name=stream_name,
        ),
        status=AirbyteStreamStatus.COMPLETE,
    )
    trace_message = AirbyteTraceMessage(
        type=TraceType.STREAM_STATUS,
        stream=stream_name,
        emitted_at=emitted_at,
        stream_status=complete_status,
    )
    return AirbyteMessage(type=Type.TRACE, trace=trace_message)


class AirbyteMessageIterator:
"""Abstract base class for Airbyte message iterables.
Expand Down
30 changes: 30 additions & 0 deletions airbyte/_util/connector_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Connector info classes for PyAirbyte.
Used for telemetry and logging.
"""

from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class RuntimeInfoBase:
    """Base class for runtime info records that can be serialized to a dict."""

    def to_dict(self) -> dict[str, Any]:
        """Return the dataclass fields as a dict, omitting fields whose value is None."""
        serialized = asdict(self)
        # Collect the keys first so the dict is not mutated while being iterated.
        unset_fields = [key for key, value in serialized.items() if value is None]
        for key in unset_fields:
            del serialized[key]
        return serialized


@dataclass
class WriterRuntimeInfo(RuntimeInfoBase):
    """Runtime info record describing a writer."""

    # Required: identifier for the kind of writer.
    type: str
    # Optional: hash of the writer's config; None when not available.
    config_hash: str | None = None


@dataclass(kw_only=True)
class ConnectorRuntimeInfo(RuntimeInfoBase):
    """Runtime info record describing a connector.

    All fields must be passed by keyword; only `name` is required.
    """

    # Required: the connector's name.
    name: str
    # Optional: name of the executor type; None when unknown.
    executor_type: str | None = None
    # Optional: connector version; None when unknown.
    version: str | None = None
    # Optional: hash of the connector's config; None when not available.
    config_hash: str | None = None
35 changes: 35 additions & 0 deletions airbyte/_util/hashing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Hashing utils for PyAirbyte."""

from __future__ import annotations

import hashlib
from collections.abc import Mapping


HASH_SEED = "PyAirbyte:"
"""Additional seed for randomizing one-way hashed strings."""


def one_way_hash(
    obj: Mapping | list | object,
    /,
) -> str:
    """Return a one-way SHA-256 hex digest of the given object.

    Hashing rules:

    - Mappings are hashed independently of key order: every value is hashed
      recursively and the `(key, value_hash)` pairs are sorted before hashing.
    - Lists are hashed element by element, preserving order.
    - Any other object is hashed via its `str()` representation.

    To ensure a unique domain of hashes, `HASH_SEED` is prepended to the string
    before hashing.

    Args:
        obj: The object to hash (positional-only).

    Returns:
        The hex digest, as a 64-character string.
    """
    string_to_hash: str
    if isinstance(obj, Mapping):
        # Hash values first so nested mappings/lists are normalized recursively.
        pairs = [(key, one_way_hash(value)) for key, value in obj.items()]
        try:
            ordered_pairs = sorted(pairs)
        except TypeError:
            # Keys of mixed or unorderable types (e.g. `1` and `"a"`, or `None`)
            # cannot be compared with each other. Fall back to a deterministic
            # order based on the key's type name and string form. Inputs whose
            # keys sort naturally never reach this branch, so their hashes are
            # unaffected.
            ordered_pairs = sorted(
                pairs,
                key=lambda pair: (type(pair[0]).__name__, str(pair[0]), pair[1]),
            )
        string_to_hash = str(ordered_pairs)

    elif isinstance(obj, list):
        # Recursively hash elements of the list
        string_to_hash = str([one_way_hash(item) for item in obj])

    else:
        # Convert the object to a string
        string_to_hash = str(obj)

    return hashlib.sha256((HASH_SEED + string_to_hash).encode()).hexdigest()
123 changes: 21 additions & 102 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,39 +32,31 @@
from __future__ import annotations

import datetime
import hashlib
import os
from contextlib import suppress
from dataclasses import asdict, dataclass
from enum import Enum
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import Any, cast

import requests
import ulid
import yaml

from airbyte import exceptions as exc
from airbyte._util import meta
from airbyte._util.connector_info import (
ConnectorRuntimeInfo,
WriterRuntimeInfo,
)
from airbyte._util.hashing import one_way_hash
from airbyte.version import get_version


if TYPE_CHECKING:
from airbyte._writers.base import AirbyteWriterInterface
from airbyte.caches.base import CacheBase
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source


DEBUG = True
"""Enable debug mode for telemetry code."""


HASH_SEED = "PyAirbyte:"
"""Additional seed for randomizing one-way hashed strings."""


PYAIRBYTE_APP_TRACKING_KEY = (
os.environ.get("AIRBYTE_TRACKING_KEY", "") or "cukeSffc0G6gFQehKDhhzSurDzVSZ2OP"
)
Expand Down Expand Up @@ -185,83 +177,6 @@ class EventType(str, Enum):
CHECK = "check"


@dataclass
class CacheTelemetryInfo:
    """Telemetry record identifying the kind of cache in use."""

    type: str

    @classmethod
    def from_cache(cls, cache: CacheBase | None) -> CacheTelemetryInfo:
        """Build the record from a cache object.

        A missing (falsy) cache is reported as "streaming"; otherwise the
        class name of the cache object is used.
        """
        cache_kind = type(cache).__name__ if cache else "streaming"
        return cls(type=cache_kind)


@dataclass
class SourceTelemetryInfo:
    """Telemetry record describing a source connector."""

    name: str
    executor_type: str
    version: str | None

    @classmethod
    def from_source(cls, source: Source | str) -> SourceTelemetryInfo:
        """Build the record from a source object or from a bare source name.

        When only a name string is given, the executor type and version are
        reported as `UNKNOWN`.
        """
        if not isinstance(source, str):
            # `source` should be a `Source` object at this point.
            return cls(
                name=source.name,
                executor_type=type(source.executor).__name__,
                version=source.executor.reported_version,
            )

        return cls(
            name=str(source),
            executor_type=UNKNOWN,
            version=UNKNOWN,
        )


@dataclass
class DestinationTelemetryInfo:
    """Telemetry record describing a destination."""

    name: str
    executor_type: str
    version: str | None

    @classmethod
    def from_destination(
        cls,
        destination: Destination | AirbyteWriterInterface | str | None,
    ) -> DestinationTelemetryInfo:
        """Build the record from a destination object, a name, or nothing.

        Only objects exposing an `executor` attribute yield a real executor
        type and version. In every other case those two fields are `UNKNOWN`
        and the name is `UNKNOWN` (falsy input), the string itself, or the
        object's `repr()`.
        """
        if not destination:
            resolved_name = UNKNOWN
        elif isinstance(destination, str):
            resolved_name = destination
        elif hasattr(destination, "executor"):
            return cls(
                name=destination.name,
                executor_type=type(destination.executor).__name__,
                version=destination.executor.reported_version,
            )
        else:
            resolved_name = repr(destination)

        return cls(name=resolved_name, executor_type=UNKNOWN, version=UNKNOWN)


def one_way_hash(
    string_to_hash: Any,  # noqa: ANN401  # Allow Any type
    /,
) -> str:
    """Return a one-way SHA-256 hex digest of the `str()` form of the given value.

    To ensure a unique domain of hashes, `HASH_SEED` is prepended to the string
    before hashing.
    """
    seeded_text = HASH_SEED + str(string_to_hash)
    digest = hashlib.sha256(seeded_text.encode())
    return digest.hexdigest()


@lru_cache
def get_env_flags() -> dict[str, Any]:
flags: dict[str, bool | str] = {
Expand All @@ -283,9 +198,9 @@ def get_env_flags() -> dict[str, Any]:

def send_telemetry(
*,
source: Source | str | None,
destination: Destination | AirbyteWriterInterface | str | None,
cache: CacheBase | None,
source: ConnectorRuntimeInfo | None,
destination: ConnectorRuntimeInfo | None,
cache: WriterRuntimeInfo | None,
state: EventState,
event_type: EventType,
number_of_records: int | None = None,
Expand All @@ -297,8 +212,6 @@ def send_telemetry(

payload_props: dict[str, str | int | dict] = {
"session_id": PYAIRBYTE_SESSION_ID,
"cache": asdict(CacheTelemetryInfo.from_cache(cache)),
"destination": asdict(DestinationTelemetryInfo.from_destination(destination)),
"state": state,
"version": get_version(),
"python_version": meta.get_python_version(),
Expand All @@ -308,7 +221,13 @@ def send_telemetry(
}

if source:
payload_props["source"] = asdict(SourceTelemetryInfo.from_source(source))
payload_props["source"] = source.to_dict()

if destination:
payload_props["destination"] = destination.to_dict()

if cache:
payload_props["cache"] = cache.to_dict()

if exception:
if isinstance(exception, exc.AirbyteError):
Expand Down Expand Up @@ -345,8 +264,8 @@ def log_config_validation_result(
treated as a source name.
"""
send_telemetry(
source=name if not name.startswith("destination-") else None,
destination=name if name.startswith("destination-") else None,
source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None,
destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None,
cache=None,
state=state,
event_type=EventType.VALIDATE,
Expand All @@ -365,8 +284,8 @@ def log_connector_check_result(
treated as a source name.
"""
send_telemetry(
source=name if not name.startswith("destination-") else None,
destination=name if name.startswith("destination-") else None,
source=ConnectorRuntimeInfo(name=name) if not name.startswith("destination-") else None,
destination=ConnectorRuntimeInfo(name=name) if name.startswith("destination-") else None,
cache=None,
state=state,
event_type=EventType.CHECK,
Expand All @@ -381,7 +300,7 @@ def log_install_state(
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
source=ConnectorRuntimeInfo(name=name),
destination=None,
cache=None,
state=state,
Expand Down
Loading

0 comments on commit c5fea25

Please sign in to comment.