Skip to content

Commit

Permalink
Chore: Add logging for config validation, source check, and successfu…
Browse files Browse the repository at this point in the history
…l installs (#135)
  • Loading branch information
aaronsteers authored Mar 19, 2024
1 parent e606157 commit 2d90641
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 21 deletions.
2 changes: 2 additions & 0 deletions airbyte/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from airbyte import exceptions as exc
from airbyte._util.meta import is_windows
from airbyte._util.telemetry import EventState, log_install_state
from airbyte.sources.registry import ConnectorMetadata


Expand Down Expand Up @@ -238,6 +239,7 @@ def install(self) -> None:

# Assuming the installation succeeded, store the installed version
self.reported_version = self._get_installed_version(raise_on_error=False, recheck=True)
log_install_state(self.name, state=EventState.SUCCEEDED)
print(
f"Connector '{self.name}' installed successfully!\n"
f"For more information, see the {self.name} documentation:\n"
Expand Down
47 changes: 47 additions & 0 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ class EventState(str, Enum):
class EventType(str, Enum):
INSTALL = "install"
SYNC = "sync"
VALIDATE = "validate"
CHECK = "check"


@dataclass
Expand Down Expand Up @@ -293,3 +295,48 @@ def send_telemetry(
"timestamp": datetime.datetime.utcnow().isoformat(), # noqa: DTZ003
},
)


def log_config_validation_result(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log a config validation event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.VALIDATE,
exception=exception,
)


def log_source_check_result(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log a source `check` result."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.CHECK,
exception=exception,
)


def log_install_state(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.INSTALL,
exception=exception,
)
30 changes: 27 additions & 3 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@
from airbyte import exceptions as exc
from airbyte._util import protocol_util
from airbyte._util.name_normalizers import normalize_records
from airbyte._util.telemetry import EventState, EventType, send_telemetry
from airbyte._util.telemetry import (
EventState,
EventType,
log_config_validation_result,
log_source_check_result,
send_telemetry,
)
from airbyte.caches.util import get_default_cache
from airbyte.datasets._lazy import LazyDataset
from airbyte.progress import progress
Expand Down Expand Up @@ -200,16 +206,26 @@ def validate_config(self, config: dict[str, Any] | None = None) -> None:
config = self._config if config is None else config
try:
jsonschema.validate(config, spec.connectionSpecification)
log_config_validation_result(
name=self.name,
state=EventState.SUCCEEDED,
)
except jsonschema.ValidationError as ex:
raise exc.AirbyteConnectorValidationFailedError(
validation_ex = exc.AirbyteConnectorValidationFailedError(
message="The provided config is not valid.",
context={
"error_message": ex.message,
"error_path": ex.path,
"error_instance": ex.instance,
"error_schema": ex.schema,
},
) from ex
)
log_config_validation_result(
name=self.name,
state=EventState.FAILED,
exception=validation_ex,
)
raise validation_ex from ex

def get_available_streams(self) -> list[str]:
"""Get the available streams from the spec."""
Expand Down Expand Up @@ -398,8 +414,16 @@ def check(self) -> None:
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:
if msg.connectionStatus.status != Status.FAILED:
print(f"Connection check succeeded for `{self.name}`.")
log_source_check_result(
name=self.name,
state=EventState.SUCCEEDED,
)
return

log_source_check_result(
name=self.name,
state=EventState.FAILED,
)
raise exc.AirbyteConnectorCheckFailedError(
help_url=self.docs_url,
context={
Expand Down
21 changes: 3 additions & 18 deletions airbyte/sources/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from airbyte import exceptions as exc
from airbyte._executor import PathExecutor, VenvExecutor
from airbyte._util.telemetry import EventState, EventType, send_telemetry
from airbyte._util.telemetry import EventState, log_install_state
from airbyte.sources.base import Source
from airbyte.sources.registry import ConnectorMetadata, get_connector_metadata

Expand Down Expand Up @@ -122,7 +122,7 @@ def get_source(
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError as ex:
if not pip_url:
_log_install_state(name, state=EventState.FAILED, exception=ex)
log_install_state(name, state=EventState.FAILED, exception=ex)
# We don't have a pip url or registry entry, so we can't install the connector
raise

Expand All @@ -143,25 +143,10 @@ def get_source(
executor=executor,
)
except Exception as e:
_log_install_state(name, state=EventState.FAILED, exception=e)
log_install_state(name, state=EventState.FAILED, exception=e)
raise


__all__ = [
"get_source",
]


def _log_install_state(
name: str,
state: EventState,
exception: Exception | None = None,
) -> None:
"""Log an install event."""
send_telemetry(
source=name,
cache=None,
state=state,
event_type=EventType.INSTALL,
exception=exception,
)

0 comments on commit 2d90641

Please sign in to comment.