Skip to content

Commit

Permalink
feat(state): improved stateful mode, Redis and S3 are now supported (#9)
Browse files Browse the repository at this point in the history
* feat(state): improved stateful mode, Redis and S3 are now supported
  • Loading branch information
akimrx authored Apr 6, 2024
1 parent 9fd398f commit 0eae570
Show file tree
Hide file tree
Showing 17 changed files with 915 additions and 446 deletions.
334 changes: 193 additions & 141 deletions README.md

Large diffs are not rendered by default.

11 changes: 3 additions & 8 deletions examples/extended_model/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@

from tracker_exporter.models.issue import TrackerIssue
from tracker_exporter.utils.helpers import to_snake_case, validate_resource
from tracker_exporter import configure_sentry, run_etl

from yandex_tracker_client.collections import Issues


class CustomIssueFields:
class CustomIssueFieldsMixin:
"""
Additional custom fields for Yandex Tracker issue.
Must be created in the Clickhouse issue table.
Expand All @@ -18,20 +17,16 @@ def __init__(self, issue: Issues) -> None:
self.baz = True if "baz" in issue.tags else False


class ExtendedTrackerIssue(CustomIssueFieldsMixin, TrackerIssue):
    """Extended Yandex Tracker issue model with custom fields.

    The mixin is listed first so that cooperative ``super().__init__``
    dispatch (Python MRO) runs the mixin's field initialization before
    the base ``TrackerIssue`` constructor — no explicit second
    ``__init__`` call is needed.
    """

    def __init__(self, issue: Issues) -> None:
        # Single super() call covers both CustomIssueFieldsMixin and
        # TrackerIssue via the method resolution order.
        super().__init__(issue)


def main() -> None:
    """Entry point: run the ETL once with the extended issue model.

    The diff residue showed two invocations of ``run_etl``; only one
    call must remain, otherwise the ETL would execute twice per run.
    """
    run_etl(ignore_exceptions=False, issue_model=ExtendedTrackerIssue)


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
yandex_tracker_client==2.*
boto3==1.34.*
redis==5.0.*
datadog==0.47.*
APScheduler==3.10.*
requests==2.31.*
Expand Down
18 changes: 2 additions & 16 deletions tracker_exporter/__init__.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,17 @@
from tracker_exporter.main import (
run_etl,
configure_sentry,
configure_jsonfile_storage,
configure_state_service,
configure_state_manager,
)
from tracker_exporter.etl import YandexTrackerETL
from tracker_exporter.services.clickhouse import ClickhouseClient
from tracker_exporter.services.tracker import YandexTrackerClient
from tracker_exporter.services.state import (
StateKeeper,
JsonStateStorage,
RedisStateStorage,
S3FileStorageStrategy,
LocalFileStorageStrategy,
)

__all__ = [
"ClickhouseClient",
"YandexTrackerClient",
"YandexTrackerETL",
"StateKeeper",
"JsonStateStorage",
"RedisStateStorage",
"S3FileStorageStrategy",
"LocalFileStorageStrategy",
"run_etl",
"configure_sentry",
"configure_jsonfile_storage",
"configure_state_service",
"configure_state_manager",
]
2 changes: 1 addition & 1 deletion tracker_exporter/_meta.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "1.0.3"
version = "2.0.0"
url = "https://github.com/akimrx/yandex-tracker-exporter"
download_url = "https://pypi.org/project/tracker-exporter/"
appname = "yandex_tracker_exporter"
Expand Down
137 changes: 66 additions & 71 deletions tracker_exporter/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@
import logging

from functools import lru_cache
from typing import List
from typing import Literal, Optional, Union
from pydantic import validator, root_validator
from pydantic_settings import BaseSettings

from tracker_exporter.models.base import (
YandexTrackerLanguages,
LogLevels,
StateStorageTypes,
JsonStorageStrategies,
)
from tracker_exporter.models.base import YandexTrackerLanguages, LogLevels
from tracker_exporter.exceptions import ConfigurationError
from tracker_exporter.services.monitoring import DogStatsdClient

Expand All @@ -24,13 +19,13 @@
class MonitoringSettings(BaseSettings):
"""Observability settings."""

metrics_enabled: bool = False
metrics_host: str = "localhost"
metrics_port: int = 8125
metrics_base_prefix: str = "tracker_exporter"
metrics_base_labels: List[str] = []
sentry_enabled: bool = False
sentry_dsn: str | None = None
metrics_enabled: Optional[bool] = False
metrics_host: Optional[str] = "localhost"
metrics_port: Optional[int] = 8125
metrics_base_prefix: Optional[str] = "tracker_exporter"
metrics_base_labels: Optional[list[str]] = []
sentry_enabled: Optional[bool] = False
sentry_dsn: Optional[str] = None

@validator("sentry_dsn", pre=True, always=True)
def validate_sentry_dsn(cls, value: str | None, values: dict) -> str:
Expand All @@ -46,23 +41,23 @@ class Config:
class ClickhouseSettings(BaseSettings):
"""Settings for Clickhouse storage."""

enable_upload: bool = True
host: str = "localhost"
proto: str = "http"
port: int = 8123
cacert_path: str | None = None
enable_upload: Optional[bool] = True
host: Optional[str] = "localhost"
proto: Optional[str] = "http"
port: Optional[int] = 8123
cacert_path: Optional[str] = None
serverless_proxy_id: str | None = None
username: str = "default"
password: str | None = None
database: str = "agile"
issues_table: str = "issues"
issue_metrics_table: str = "issue_metrics"
issues_changelog_table: str = "issues_changelog"
auto_deduplicate: bool = True
backoff_base_delay: int | float = 0.5
backoff_expo_factor: int | float = 2.5
backoff_max_tries: int = 3
backoff_jitter: bool = True
username: Optional[str] = "default"
password: Optional[str] = None
database: Optional[str] = "agile"
issues_table: Optional[str] = "issues"
issue_metrics_table: Optional[str] = "issue_metrics"
issues_changelog_table: Optional[str] = "issues_changelog"
auto_deduplicate: Optional[bool] = True
backoff_base_delay: Optional[Union[int, float]] = 0.5
backoff_expo_factor: Optional[Union[int, float]] = 2.5
backoff_max_tries: Optional[int] = 3
backoff_jitter: Optional[bool] = True

@validator("serverless_proxy_id", pre=True, always=True)
def validate_serverless_proxy_id(cls, value: str | None, values: dict) -> str:
Expand All @@ -85,10 +80,10 @@ class Config:
class IssuesSearchSettings(BaseSettings):
"""Settings for search & export."""

query: str | None = None
range: str = "2h"
queues: str | List[str] | None = None
per_page_limit: int = 100
query: Optional[str] = None
range: Optional[str] = "2h"
queues: Optional[Union[str, list[str]]] = None
per_page_limit: Optional[int] = 100

@validator("queues", pre=True, always=True)
def validate_queues(cls, value: str) -> list:
Expand All @@ -108,15 +103,15 @@ class Config:
class TrackerSettings(BaseSettings):
"""Settings for Yandex.Tracker client."""

loglevel: LogLevels = LogLevels.warning
token: str | None = None
org_id: str | None = None
iam_token: str | None = None
cloud_org_id: str | None = None
timeout: int = 10
max_retries: int = 10
language: YandexTrackerLanguages = YandexTrackerLanguages.en
timezone: str = "Europe/Moscow"
loglevel: Optional[LogLevels] = LogLevels.warning
token: Optional[str] = None
org_id: Optional[str] = None
iam_token: Optional[str] = None
cloud_org_id: Optional[str] = None
timeout: Optional[int] = 10
max_retries: Optional[int] = 10
language: Optional[YandexTrackerLanguages] = YandexTrackerLanguages.en
timezone: Optional[str] = "Europe/Moscow"
search: IssuesSearchSettings = IssuesSearchSettings()

@root_validator(pre=True)
Expand Down Expand Up @@ -145,16 +140,16 @@ class Config:
class StateSettings(BaseSettings):
"""Settings for stateful mode."""

storage: StateStorageTypes | None = StateStorageTypes.jsonfile
redis_dsn: str = "redis://localhost:6379"
jsonfile_strategy: JsonStorageStrategies = JsonStorageStrategies.local
jsonfile_path: str = "./state.json"
jsonfile_s3_bucket: str | None = None
jsonfile_s3_region: str = "eu-east-1"
jsonfile_s3_endpoint: str | None = None
jsonfile_s3_access_key: str | None = None
jsonfile_s3_secret_key: str | None = None
custom_storage_params: dict = {}
storage: Optional[Literal["redis", "jsonfile", "custom"]] = "jsonfile"
redis_dsn: Optional[str] = "redis://localhost:6379"
jsonfile_strategy: Optional[Literal["s3", "local"]] = "local"
jsonfile_path: Optional[str] = "state.json"
jsonfile_s3_bucket: Optional[str] = None
jsonfile_s3_region: Optional[str] = "us-east-1"
jsonfile_s3_endpoint: Optional[str] = None
jsonfile_s3_access_key: Optional[str] = None
jsonfile_s3_secret_key: Optional[str] = None
custom_storage_params: Optional[dict] = {}

@root_validator(pre=True)
def validate_state(cls, values) -> str:
Expand All @@ -172,7 +167,7 @@ def validate_state(cls, values) -> str:
)
)

if jsonfile_strategy == JsonStorageStrategies.s3 and not s3_is_configured:
if jsonfile_strategy == "s3" and not s3_is_configured:
raise ConfigurationError("S3 must be configured for JSONFileStorage with S3 strategy.")

return values
Expand All @@ -188,23 +183,23 @@ class Settings(BaseSettings):
clickhouse: ClickhouseSettings = ClickhouseSettings()
tracker: TrackerSettings = TrackerSettings # TODO (akimrx): research, called class not see TOKEN's
state: StateSettings = StateSettings()
stateful: bool = False
stateful_initial_range: str = "1w"
changelog_export_enabled: bool = False
log_etl_stats: bool = True
log_etl_stats_each_n_iter: int = 100

loglevel: LogLevels = LogLevels.info
workdays: List[int] = [0, 1, 2, 3, 4]
business_hours_start: datetime.time = datetime.time(9)
business_hours_end: datetime.time = datetime.time(22)
datetime_response_format: str = "%Y-%m-%dT%H:%M:%S.%f%z"
datetime_query_format: str = "%Y-%m-%d %H:%M:%S"
datetime_clickhouse_format: str = "%Y-%m-%dT%H:%M:%S.%f"

etl_interval_minutes: int = 30
closed_issue_statuses: str | list = "closed,rejected,resolved,cancelled,released"
not_nullable_fields: tuple | list | str = (
stateful: Optional[bool] = False
stateful_initial_range: Optional[str] = "1w"
changelog_export_enabled: Optional[bool] = False
log_etl_stats: Optional[bool] = True
log_etl_stats_each_n_iter: Optional[int] = 100

loglevel: Optional[LogLevels] = LogLevels.info
workdays: Optional[list[int]] = [0, 1, 2, 3, 4]
business_hours_start: Optional[datetime.time] = datetime.time(9)
business_hours_end: Optional[datetime.time] = datetime.time(22)
datetime_response_format: Optional[str] = "%Y-%m-%dT%H:%M:%S.%f%z"
datetime_query_format: Optional[str] = "%Y-%m-%d %H:%M:%S"
datetime_clickhouse_format: Optional[str] = "%Y-%m-%dT%H:%M:%S.%f"

etl_interval_minutes: Optional[int] = 30
closed_issue_statuses: Optional[Union[str, list]] = "closed,rejected,resolved,cancelled,released"
not_nullable_fields: Optional[Union[tuple, list, str]] = (
"created_at",
"resolved_at",
"closed_at",
Expand Down
13 changes: 7 additions & 6 deletions tracker_exporter/etl.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import time
import logging
from datetime import datetime, timedelta
from typing import Tuple, List
from typing import Tuple, List, Optional
from yandex_tracker_client.collections import Issues
from yandex_tracker_client.objects import SeekablePaginatedList
from yandex_tracker_client.exceptions import Forbidden

from tracker_exporter.config import config, monitoring
from tracker_exporter.models.issue import TrackerIssue
from tracker_exporter.models.base import ClickhousePayload
from tracker_exporter.services.state import StateKeeper
from tracker_exporter.state.managers import AbstractStateManager
from tracker_exporter.services.tracker import YandexTrackerClient
from tracker_exporter.services.clickhouse import ClickhouseClient
from tracker_exporter.exceptions import ConfigurationError, UploadError, ExportOrTransformError
Expand All @@ -31,7 +31,7 @@ def __init__(
*,
tracker_client: YandexTrackerClient,
clickhouse_client: ClickhouseClient,
statekeeper: StateKeeper | None = None,
state_manager: Optional[AbstractStateManager] = None,
issue_model: TrackerIssue = TrackerIssue,
database: str = config.clickhouse.database,
issues_table: str = config.clickhouse.issues_table,
Expand All @@ -42,7 +42,7 @@ def __init__(
) -> None:
self.tracker = tracker_client
self.clickhouse = clickhouse_client
self.state = statekeeper
self.state = state_manager
self.issue_model = issue_model
self.database = database
self.issues_table = issues_table
Expand Down Expand Up @@ -142,7 +142,7 @@ def _export_and_transform(
found_issues = self.tracker.search_issues(query=query, filter=filter, order=order, limit=limit)
if len(found_issues) == 0:
logger.info("Nothing to export. Skipping ETL")
return
return issues, changelog_events, metrics, possible_new_state

if isinstance(found_issues, SeekablePaginatedList):
pagination = True
Expand Down Expand Up @@ -216,7 +216,7 @@ def run(
try:
issues, changelogs, metrics, possible_new_state = self._export_and_transform(**query, limit=limit)
if stateful and possible_new_state is not None:
logger.info(f"Possible new state: {possible_new_state}")
logger.info(f"Stateful mode enabled, fetching possible new state: {possible_new_state}")
last_saved_state = self.state.get(self.state_key)
if last_saved_state == possible_new_state and len(issues) <= 1 and len(metrics) <= 1:
logger.info("Data already is up-to-date, skipping upload stage")
Expand Down Expand Up @@ -247,6 +247,7 @@ def run(
raise UploadError(str(exc))
else:
if all((stateful, self.state, possible_new_state)):
logger.info(f"Saving last ETL timestamp {possible_new_state}")
self.state.set(self.state_key, possible_new_state)
else:
logger.info(
Expand Down
4 changes: 4 additions & 0 deletions tracker_exporter/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ class JsonFileNotFound(Exception):

class InvalidJsonFormat(Exception):
    """Raised when JSON content does not have the expected format.

    NOTE(review): presumably raised by the JSON file state storage when a
    state file exists but cannot be parsed — confirm against callers.
    """

    pass


class SerializerError(Exception):
    """Raised when serialization or deserialization fails.

    NOTE(review): appears to be used by the state storage layer when
    encoding/decoding stored state — confirm against callers.
    """

    pass
Loading

0 comments on commit 0eae570

Please sign in to comment.