diff --git a/README.md b/README.md index e7d9543..45b8b3b 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,26 @@ [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/tracker-exporter.svg)](https://pypi.org/project/tracker-exporter/) [![PyPi Package](https://img.shields.io/pypi/v/tracker-exporter.svg)](https://pypi.org/project/tracker-exporter/) -[![Codecov](https://codecov.io/gh/akimrx/yandex-tracker-exporter/branch/master/graph/badge.svg)](https://app.codecov.io/gh/akimrx/yandex-tracker-exporter) [![Tests](https://github.com/akimrx/yandex-tracker-exporter/workflows/Tests/badge.svg)](https://github.com/akimrx/yandex-tracker-exporter) - # Yandex.Tracker ETL -Export issue metadata & agile metrics, transform and load to OLAP data storage. Metrics based on issue changelog. +Export issue metadata & agile metrics, transform and load them to OLAP data storage. Metrics are based on the issue changelog. ⚠️ **Important** **Versions 1.x.x are incompatible with 0.1.x. New versions work only on Python >= 3.10** > You can fork this repository and refine the tool the way you want. Or use it as it is - this will allow you to build basic analytics on the tasks from Yandex.Tracker. -Require: -* Python `>=3.10.*` -* Clickhouse + specific [tables](/migrations/clickhouse/) (how to run [migration](#migration)) +**Requirements:** + +- Python `>=3.10.*` +- Clickhouse + specific [tables](/migrations/clickhouse/) (how to run a [migration](#migration)) + +**Collects:** + +- Issue metadata (i.e. title, author, assignee, components, tags, status, etc.) +- Issue changelog (i.e. the history of all the events that occurred with the task) +- Calculated issue metrics by status (i.e. the time spent in a particular status), like Cycle & Lead time ## Datalens Demo @@ -34,17 +39,69 @@ Sometimes he has to go to a lot of endpoints to collect what needs to be taken t By default, the exporter processes only those tasks that were changed during the sliding window specified in the `EXPORTER_TRACKER__SEARCH__RANGE` parameter. So, all tasks that have activity (changes) will be uploaded to the storage. Something like eventual consistency. -If you need to upload historical data that will never be updated again, you can flexibly control behavior through the [environment variables described below](#general-settings). +If you need to upload historical data that will never be updated again, you can flexibly control the behavior through the [environment variables described below](#general-settings). Here are some recipes for a one-shot export: + 1. Launch the exporter with the `EXPORTER_TRACKER__SEARCH__RANGE` parameter set, for example, to a year ago 2. More specifically: describe the query in the tracker's QL format using the `EXPORTER_TRACKER__SEARCH__QUERY` environment variable. This way you can export specific bundles of tasks and bypass the [Tracker's strict limit of 10,000 tasks](https://github.com/yandex/yandex_tracker_client/issues/13). +Finally, run the exporter with the `--run-once` flag. -**Collects:** -- Issue metadata (i.e. title, author, assignee, components, tags, status, etc) -- Issue changelog (i.e the history of all the events that occurred with the task) -- Calculated issue metrics by status (i.e. the time spent in a particular status) like Cycle & Lead time +### Stateful mode + +By default, the exporter does not store state and, as described above, works within the sliding window. This is not optimal, because the exporter repeatedly reprocesses previously exported tasks.
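For intuition, the difference between the stateless and stateful modes comes down to how the lower bound of the issue search window is chosen on each run. The sketch below is purely illustrative (the function and its parameters are not part of the exporter's API) and assumes the saved state holds the timestamp of the last successful export:

```python
from datetime import datetime, timedelta
from typing import Optional


def search_window_start(
    last_exported_at: Optional[datetime],           # hypothetical value read from the state backend
    stateful: bool = False,                         # mirrors EXPORTER_STATEFUL
    sliding_range: timedelta = timedelta(hours=2),  # mirrors EXPORTER_TRACKER__SEARCH__RANGE (2h)
    initial_range: timedelta = timedelta(days=7),   # mirrors EXPORTER_STATEFUL_INITIAL_RANGE (7d)
) -> datetime:
    """Pick the lower bound for the issue search query (illustrative only)."""
    now = datetime.utcnow()
    if not stateful:
        # Stateless mode: always look back by the sliding window, so issues
        # changed inside that window are re-processed on every run.
        return now - sliding_range
    if last_exported_at is None:
        # Stateful mode, first run: no saved state yet, fall back to the initial range.
        return now - initial_range
    # Stateful mode: resume from the last successfully exported timestamp.
    return last_exported_at
```

With a saved state, the exporter only needs to look at issues changed since the previous run, which is what removes the repeated processing.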
+ + The behavior can be changed by enabling stateful mode, which supports 3 backends: + + - Local JSON file + - Remote JSON file (S3 object storage) + - Redis + + #### Local JSON file + + ```ini + EXPORTER_STATEFUL=true + + # used for the first run to capture historical issues + # when the previous state does not exist. + EXPORTER_STATEFUL_INITIAL_RANGE=7d # this is the default value + + EXPORTER_STATE__STORAGE=jsonfile # this is the default value + EXPORTER_STATE__JSONFILE_STRATEGY=local # this is the default value + + ... + ``` + + #### Remote JSON file (S3) + + ```ini + EXPORTER_STATEFUL=true + EXPORTER_STATEFUL_INITIAL_RANGE=7d + + EXPORTER_STATE__STORAGE=jsonfile + EXPORTER_STATE__JSONFILE_STRATEGY=s3 + + EXPORTER_STATE__JSONFILE_S3_BUCKET=tracker-exporter-state + EXPORTER_STATE__JSONFILE_S3_ACCESS_KEY=YCAxxxxxxxx + EXPORTER_STATE__JSONFILE_S3_SECRET_KEY=YCxxx-xxxxxxxxxxxxxxx + EXPORTER_STATE__JSONFILE_S3_ENDPOINT=https://storage.yandexcloud.net + + ... + ``` + + #### Redis + + ```ini + + EXPORTER_STATEFUL=true + EXPORTER_STATEFUL_INITIAL_RANGE=7d + + EXPORTER_STATE__STORAGE=redis + EXPORTER_STATE__REDIS_DSN=redis://localhost:6379 + + ... + ``` ### Cycle time calculation algorithm @@ -63,12 +120,13 @@ Employees start working on the task, the history of the task and the actions of 1. A new task has been created with the initial status `Open`, metrics are not counted. 2. The developer has taken the task to work, the transition is `Open -> In progress`, the metric for the `Open` status has been calculated, while the current status `In progress` is not yet considered. 3. The developer has submitted the task to testing, the transition `In progress -> Testing`, the metric for the status `In progress` has been calculated, while the current status `Testing` is not yet considered. -4. QA Engineer returned the task for revision, the transition `Testing -> In progress`, the time in the status `Testing` has been calculated, the status `In progress` has the previous metric and has not changed yet. +4. QA Engineer returned the task for revision, the transition `Testing -> In progress`, the time in the status `Testing` has been calculated, the status `In progress` has the previous metric and has not changed yet. 5. The task has been finalized, re-submitted to testing, the transition `In progress -> Testing`, the delta of this transition is added incrementally to the previous value of the metric `In progress`, but `Testing` has not changed yet. -6 The task has been tested and submitted for release, the transition `Testing -> Ready for release`, the delta of this transition is incrementally added to the previous value of the metric `Testing`, the `Ready for Release` status is not considered yet. -7. The release is completed, the task is closed, the transition `Ready for release -> Closed`, the metric for the `Ready for Release` status is considered. **The metric of the final status of this task (`Closed`) will not be (re)calculated.** +6. The task has been tested and submitted for release, the transition `Testing -> Ready for release`, the delta of this transition is incrementally added to the previous value of the metric `Testing`, the `Ready for Release` status is not considered yet. +7. The release is completed, the task is closed, the transition `Ready for release -> Closed`, the metric for the `Ready for Release` status is considered.
**The metric of the final status of this task (`Closed`) will not be (re)calculated.** #### Planned improvements + Consider the status metric if a transition has been made to it, even if such a status is current and the next transition has not yet been made from it. Exclude the final statuses from the innovation. ## Tech stats @@ -90,7 +148,7 @@ The processing speed of one issue depends on how many changes there are in the i ## Extend exported issue data by your custom fields -Just declare your `main.py` module in which extended the [TrackerIssue](tracker_exporter/models/issue.py#L65) model using multiple inheritance like +Just declare your `main.py` module in which extended the [TrackerIssue](tracker_exporter/models/issue.py#L65) model like: ```python @@ -99,24 +157,21 @@ from tracker_exporter.utils.helpers import validate_resource from tracker_exporter import run_etl -class CustomIssueFields: +class ExtendedTrackerIssue(TrackerIssue): def __init__(self, issue: Issues) -> None: + super().__init__(issue) + self.foo_custom_field = validate_resource(issue, "fooCustomField") self.bar_custom_field = validate_resource(issue, "barCustomField") -class ExtendedTrackerIssue(TrackerIssue, CustomIssueFields): - def __init__(self, issue: Issues) -> None: - super().__init__(issue) - CustomIssueFields.__init__(self, issue) - - run_etl(issue_model=ExtendedTrackerIssue) ``` -See full example [here](examples/extended_model/main.py) +**Don't forget about adding fields to the Clickhouse migration.** +See full example with mixin [here](examples/extended_model/main.py) ## Usage @@ -159,7 +214,6 @@ Read about the settings [here](#environment-variables-settings) tracker-exporter --env-file /home/akimrx/tracker/.settings ``` - ### Docker ```bash @@ -185,6 +239,7 @@ Edit the inventory file `ansible/inventory/hosts.yml` and just run ansible-playb > For the role to work correctly, docker must be installed on the target server. 
Example Clickhouse installation: + ```bash git clone https://github.com/akimrx/yandex-tracker-exporter.git cd yandex-tracker-exporter @@ -197,7 +252,6 @@ ansible-playbook -i inventory/hosts.yml playbooks/clickhouse.yml --limit agile Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansible-clickhouse-role) - ## Yandex.Cloud – Cloud Functions ![](/docs/images/agile_metrics_cloud.png) @@ -206,23 +260,24 @@ Also, you can use [this extended Clickhouse role](https://github.com/akimrx/ansi > How to: https://cloud.yandex.com/en/docs/managed-clickhouse/operations/cluster-create -* Set user for exporter, example: `agile` -* Set a database name, example: `agile` -* Enable `Serverless access` flag -* For testing enable host public access -* Enable `Access from the management console` flag -* Run migration or manual create tables (see migration block [here](#migration), see [sql](/migrations/clickhouse/)) +- Set user for exporter, example: `agile` +- Set a database name, example: `agile` +- Enable `Serverless access` flag +- For testing enable host public access +- Enable `Access from the management console` flag +- Run migration or manual create tables (see migration block [here](#migration), see [sql](/migrations/clickhouse/)) ### Create Cloud Function > How to: https://cloud.yandex.com/en/docs/functions/quickstart/create-function/python-function-quickstart -* Use Python >= 3.10 -* Copy/paste example content from `examples/serverless` ([code](/examples/serverless/)) -* Set entrypoint: `main.handler` (for code from examples) -* Set function timeout to `600`, because the launch can be long if there are a lot of updated issues during the collection period -* Set memory to `512MB` or more -* Add environment variables (see variables block [here](#configuration-via-environment-variables)) +- Use Python >= 3.10 +- Copy/paste example content from `examples/serverless` ([code](/examples/serverless/)) +- Set entrypoint: `main.handler` (for code from examples) +- Set function timeout to `600`, because the launch can be long if there are a lot of updated issues during the collection period +- Set memory to `512MB` or more +- Add environment variables (see variables block [here](#configuration-via-environment-variables)) + ```ini EXPORTER_TRACKER__TOKEN=XXXXXXXXXXXXXXXX EXPORTER_TRACKER__CLOUD_ORG_ID=123456 @@ -237,14 +292,14 @@ EXPORTER_CLICKHOUSE__PASSWORD=xxxx EXPORTER_CHANGELOG_EXPORT_ENABLED="false" ``` -* Release function -* Run test -* See logs +- Release function +- Run test +- See logs ![](/docs/images/logs.png) - ##### Serverless database connection without public access + If you don't want to enable clickhouse public access, use service account with such permissions - `serverless.mdbProxies.user` and set environment variables below: ```bash @@ -254,27 +309,24 @@ EXPORTER_CLICKHOUSE__SERVERLESS_PROXY_ID=akfd3bhqk3xxxxxxxxxxxxx > How to create database connection: https://cloud.yandex.com/en/docs/functions/operations/database-connection -Also, the `EXPORTER_CLICKHOUSE__PASSWORD` variable with service account must be replaced by IAM-token. Keep this in mind. +Also, the `EXPORTER_CLICKHOUSE__PASSWORD` variable with service account must be replaced by IAM-token. Keep this in mind. Probably, you should get it in the function code, because the IAM-token works for a limited period of time. ### Create Trigger > How to: https://cloud.yandex.com/en/docs/functions/quickstart/create-trigger/timer-quickstart -* Create new trigger -* Choose type `Timer` -* Set interval every hour: `0 * ? 
* * *` -* Select your function -* Create serverless service account or use an existing one -* Save trigger - - - - +- Create new trigger +- Choose type `Timer` +- Set interval every hour: `0 * ? * * *` +- Select your function +- Create serverless service account or use an existing one +- Save trigger # Visualization You can use any BI/observability tool for visualization, for example: + - Yandex DataLens (btw, this is [opensource](https://github.com/datalens-tech/datalens)). Also see [demo set](https://datalens.yandex.ru/marketplace/f2ejcgrg2h910r7cc93u) - Apache Superset - PowerBI @@ -282,7 +334,6 @@ You can use any BI/observability tool for visualization, for example: ![](/docs/images/datalens_example.png) - # Migration Based on [go-migrate](https://github.com/golang-migrate/migrate) tool. @@ -290,6 +341,7 @@ Based on [go-migrate](https://github.com/golang-migrate/migrate) tool. ## Download and install go-migrate tool ### macOS + ```shell wget https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.darwin-amd64.tar.gz -O migrate.tar.gz @@ -298,6 +350,7 @@ mv migrate ~/bin ``` ### Linux + ```shell wget https://github.com/golang-migrate/migrate/releases/download/v4.15.2/migrate.linux-amd64.tar.gz -O migrate.tar.gz @@ -351,113 +404,112 @@ See config declaration [here](/tracker_exporter/config.py) ## General settings -| variable | description | -|----------|-------------| -| `EXPORTER_STATEFUL` | Enable stateful mode. Required `EXPORTER_STATE__*` params. Default is `False` | -| `EXPORTER_STATEFUL_INITIAL_RANGE` | Initial search range when unknown last state. Default: `1w` | -| `EXPORTER_CHANGELOG_EXPORT_ENABLED` | Enable export all issues changelog to Clickhouse. **Can greatly slow down exports** (x5 - x10). Default is `False` | -| `EXPORTER_LOGLEVEL` | ETL log level. Default: `info` | -| `EXPORTER_LOG_ETL_STATS` | Enable logging transform stats every N iteration. Default is `True` | -| `EXPORTER_LOG_ETL_STATS_EACH_N_ITER` | How many iterations must pass to log stats. Default is `100` | -| `EXPORTER_WORKDAYS` | Workdays for calculate business time. 0 - mon, 6 - sun. Default: `[0,1,2,3,4]` | -| `EXPORTER_BUSINESS_HOURS_START` | Business hours start for calculate business time. Default: `09:00:00` | -| `EXPORTER_BUSINESS_HOURS_END` | Business hours end for calculate business time. Default: `22:00:00` | -| `EXPORTER_DATETIME_RESPONSE_FORMAT` | Yandex.Tracker datetime format in responses. Default: `%Y-%m-%dT%H:%M:%S.%f%z` | -| `EXPORTER_DATETIME_QUERY_FORMAT` | Datetime format for search queries. Default: `%Y-%m-%d %H:%M:%S` | -| `EXPORTER_DATETIME_CLICKHOUSE_FORMAT` | Datetime format for Clickhouse. Default: `%Y-%m-%dT%H:%M:%S.%f` | -| `EXPORTER_ETL_INTERVAL_MINUTES` | Interval between run ETL. Default: `30` (minutes) | -| `EXPORTER_CLOSED_ISSUE_STATUSES` | Statuses for mark issue as closed. Default: `closed,rejected,resolved,cancelled,released` | -| `EXPORTER_NOT_NULLABLE_FIELDS` | Fields that should never be null (e.g. dates). Default: all datetime fields | +| variable | description | +| ------------------------------------- | ------------------------------------------------------------------------------------------------------------------ | +| `EXPORTER_STATEFUL` | Enable stateful mode. Required `EXPORTER_STATE__*` params. Default is `False` | +| `EXPORTER_STATEFUL_INITIAL_RANGE` | Initial search range when unknown last state. Default: `1w` | +| `EXPORTER_CHANGELOG_EXPORT_ENABLED` | Enable export all issues changelog to Clickhouse. 
**Can greatly slow down exports** (x5 - x10). Default is `False` | +| `EXPORTER_LOGLEVEL` | ETL log level. Default: `info` | +| `EXPORTER_LOG_ETL_STATS` | Enable logging transform stats every N iteration. Default is `True` | +| `EXPORTER_LOG_ETL_STATS_EACH_N_ITER` | How many iterations must pass to log stats. Default is `100` | +| `EXPORTER_WORKDAYS` | Workdays for calculate business time. 0 - mon, 6 - sun. Default: `[0,1,2,3,4]` | +| `EXPORTER_BUSINESS_HOURS_START` | Business hours start for calculate business time. Default: `09:00:00` | +| `EXPORTER_BUSINESS_HOURS_END` | Business hours end for calculate business time. Default: `22:00:00` | +| `EXPORTER_DATETIME_RESPONSE_FORMAT` | Yandex.Tracker datetime format in responses. Default: `%Y-%m-%dT%H:%M:%S.%f%z` | +| `EXPORTER_DATETIME_QUERY_FORMAT` | Datetime format for search queries. Default: `%Y-%m-%d %H:%M:%S` | +| `EXPORTER_DATETIME_CLICKHOUSE_FORMAT` | Datetime format for Clickhouse. Default: `%Y-%m-%dT%H:%M:%S.%f` | +| `EXPORTER_ETL_INTERVAL_MINUTES` | Interval between run ETL. Default: `30` (minutes) | +| `EXPORTER_CLOSED_ISSUE_STATUSES` | Statuses for mark issue as closed. Default: `closed,rejected,resolved,cancelled,released` | +| `EXPORTER_NOT_NULLABLE_FIELDS` | Fields that should never be null (e.g. dates). Default: all datetime fields | ## Tracker settings -| variable | description | -|----------|-------------| -| `EXPORTER_TRACKER__LOGLEVEL` | Log level for Yandex.Tracker SDK. Default: `warning` | -| `EXPORTER_TRACKER__TOKEN` | OAuth2 token. Required if `EXPORTER_TRACKER__IAM_TOKEN` is not passed | -| `EXPORTER_TRACKER__ORG_ID` | Yandex360 organization ID. Required if `EXPORTER_TRACKER__CLOUD_ORG_ID` is not passed | -| `EXPORTER_TRACKER__IAM_TOKEN` | Yandex.Cloud IAM token. Required if `EXPORTER_TRACKER__TOKEN` is not passed | -| `EXPORTER_TRACKER__CLOUD_ORG_ID` | Yandex.Cloud organization ID. Required if `EXPORTER_TRACKER__ORG_ID` is not passed | -| `EXPORTER_TRACKER__TIMEOUT` | Yandex.Tracker HTTP requests timeout. Default: `10` (sec) | -| `EXPORTER_TRACKER__MAX_RETRIES` | Yandex.Tracker HTTP requests max retries. Default: `10` | -| `EXPORTER_TRACKER__LANGUAGE` | Yandex.Tracker language. Default: `en` | -| `EXPORTER_TRACKER__TIMEZONE` | Yandex.Tracker timezone. Default: `Europe/Moscow` | -| `EXPORTER_TRACKER__SEARCH__QUERY` | Custom query for search issues. This variable has the highest priority and overrides other search parameters. Default is empty | -| `EXPORTER_TRACKER__SEARCH__RANGE` | Search issues window. Has no effect in stateful mode. Default: `2h` | -| `EXPORTER_TRACKER__SEARCH__QUEUES` | Include or exclude queues in search. Example: `DEV,SRE,!TEST,!TRASH` Default is empty (i.e. all queues) | -| `EXPORTER_TRACKER__SEARCH__PER_PAGE_LIMIT` | Search results per page. Default: `100` | +| variable | description | +| ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------ | +| `EXPORTER_TRACKER__LOGLEVEL` | Log level for Yandex.Tracker SDK. Default: `warning` | +| `EXPORTER_TRACKER__TOKEN` | OAuth2 token. Required if `EXPORTER_TRACKER__IAM_TOKEN` is not passed | +| `EXPORTER_TRACKER__ORG_ID` | Yandex360 organization ID. Required if `EXPORTER_TRACKER__CLOUD_ORG_ID` is not passed | +| `EXPORTER_TRACKER__IAM_TOKEN` | Yandex.Cloud IAM token. Required if `EXPORTER_TRACKER__TOKEN` is not passed | +| `EXPORTER_TRACKER__CLOUD_ORG_ID` | Yandex.Cloud organization ID. 
Required if `EXPORTER_TRACKER__ORG_ID` is not passed | +| `EXPORTER_TRACKER__TIMEOUT` | Yandex.Tracker HTTP requests timeout. Default: `10` (sec) | +| `EXPORTER_TRACKER__MAX_RETRIES` | Yandex.Tracker HTTP requests max retries. Default: `10` | +| `EXPORTER_TRACKER__LANGUAGE` | Yandex.Tracker language. Default: `en` | +| `EXPORTER_TRACKER__TIMEZONE` | Yandex.Tracker timezone. Default: `Europe/Moscow` | +| `EXPORTER_TRACKER__SEARCH__QUERY` | Custom query for search issues. This variable has the highest priority and overrides other search parameters. Default is empty | +| `EXPORTER_TRACKER__SEARCH__RANGE` | Search issues window. Has no effect in stateful mode. Default: `2h` | +| `EXPORTER_TRACKER__SEARCH__QUEUES` | Include or exclude queues in search. Example: `DEV,SRE,!TEST,!TRASH` Default is empty (i.e. all queues) | +| `EXPORTER_TRACKER__SEARCH__PER_PAGE_LIMIT` | Search results per page. Default: `100` | ## Clickhouse settings -| variable | description | -|----------|-------------| -| `EXPORTER_CLICKHOUSE__ENABLE_UPLOAD` | Enable upload data to Clickhouse. Default is `True` | -| `EXPORTER_CLICKHOUSE__HOST` | Clickhouse host. Default: `localhost` | -| `EXPORTER_CLICKHOUSE__PROTO` | Clickhouse protocol: http or https. Default: `http` | -| `EXPORTER_CLICKHOUSE__PORT` | Clickhouse HTTP(S) port. Default: `8123` -| `EXPORTER_CLICKHOUSE__CACERT_PATH` | Path to CA cert. Only for HTTPS proto. Default is empty | -| `EXPORTER_CLICKHOUSE__SERVERLESS_PROXY_ID` | Yandex Cloud Functions proxy ID. Default is empty | -| `EXPORTER_CLICKHOUSE__USERNAME` | Clickhouse username. Default: `default` | -| `EXPORTER_CLICKHOUSE__PASSWORD` | Clickhouse password. Can be empty. Default is empty | -| `EXPORTER_CLICKHOUSE__DATABASE` | Clickhouse database. Default: `agile` | -| `EXPORTER_CLICKHOUSE__ISSUES_TABLE` | Clickhouse table for issues metadata. Default: `issues` | -| `EXPORTER_CLICKHOUSE__ISSUE_METRICS_TABLE` | Clickhouse table for issue metrics. Default: `issue_metrics` | +| variable | description | +| --------------------------------------------- | ------------------------------------------------------------------ | +| `EXPORTER_CLICKHOUSE__ENABLE_UPLOAD` | Enable upload data to Clickhouse. Default is `True` | +| `EXPORTER_CLICKHOUSE__HOST` | Clickhouse host. Default: `localhost` | +| `EXPORTER_CLICKHOUSE__PROTO` | Clickhouse protocol: http or https. Default: `http` | +| `EXPORTER_CLICKHOUSE__PORT` | Clickhouse HTTP(S) port. Default: `8123` | +| `EXPORTER_CLICKHOUSE__CACERT_PATH` | Path to CA cert. Only for HTTPS proto. Default is empty | +| `EXPORTER_CLICKHOUSE__SERVERLESS_PROXY_ID` | Yandex Cloud Functions proxy ID. Default is empty | +| `EXPORTER_CLICKHOUSE__USERNAME` | Clickhouse username. Default: `default` | +| `EXPORTER_CLICKHOUSE__PASSWORD` | Clickhouse password. Can be empty. Default is empty | +| `EXPORTER_CLICKHOUSE__DATABASE` | Clickhouse database. Default: `agile` | +| `EXPORTER_CLICKHOUSE__ISSUES_TABLE` | Clickhouse table for issues metadata. Default: `issues` | +| `EXPORTER_CLICKHOUSE__ISSUE_METRICS_TABLE` | Clickhouse table for issue metrics. Default: `issue_metrics` | | `EXPORTER_CLICKHOUSE__ISSUES_CHANGELOG_TABLE` | Clickhouse table for issues changelog. Default: `issues_changelog` | -| `EXPORTER_CLICKHOUSE__AUTO_DEDUPLICATE` | Execute `OPTIMIZE` after each `INSERT`. Default is `True` | -| `EXPORTER_CLICKHOUSE__BACKOFF_BASE_DELAY` | Base delay for backoff strategy. Default: `0.5` (sec) | -| `EXPORTER_CLICKHOUSE__BACKOFF_EXPO_FACTOR` | Exponential factor for multiply every try. 
Default: `2.5` (sec) | -| `EXPORTER_CLICKHOUSE__BACKOFF_MAX_TRIES` | Max tries for backoff strategy. Default: `3` | -| `EXPORTER_CLICKHOUSE__BACKOFF_JITTER` | Enable jitter (randomize delay) for retries. Default: `True` | +| `EXPORTER_CLICKHOUSE__AUTO_DEDUPLICATE` | Execute `OPTIMIZE` after each `INSERT`. Default is `True` | +| `EXPORTER_CLICKHOUSE__BACKOFF_BASE_DELAY` | Base delay for backoff strategy. Default: `0.5` (sec) | +| `EXPORTER_CLICKHOUSE__BACKOFF_EXPO_FACTOR` | Exponential factor for multiply every try. Default: `2.5` (sec) | +| `EXPORTER_CLICKHOUSE__BACKOFF_MAX_TRIES` | Max tries for backoff strategy. Default: `3` | +| `EXPORTER_CLICKHOUSE__BACKOFF_JITTER` | Enable jitter (randomize delay) for retries. Default: `True` | ## State settings -| variable | description | -|----------|-------------| -| `EXPORTER_STATE__STORAGE` | Storage type for StateKeeper. Can be: `jsonfile`, `redis`, `custom`. Default: `jsonfile` | -| `EXPORTER_STATE__REDIS_DSN` | Connection string for Redis state storage when storage type is `redis`. Default is empty. | -| `EXPORTER_STATE__JSONFILE_STRATEGY` | File store strategy for `jsonfile` storage type. Can be `s3` or `local`. Default: `local` | -| `EXPORTER_STATE__JSONFILE_PATH` | Path to JSON state file. Default: `./state.json` | -| `EXPORTER_STATE__JSONFILE_S3_BUCKET` | Bucket for `s3` strategy. Default is empty | -| `EXPORTER_STATE__JSONFILE_S3_REGION` | Region for `s3` strategy. Default is `eu-east-1` | -| `EXPORTER_STATE__JSONFILE_S3_ENDPOINT` | Endpoint URL for `s3` strategy. Default is empty | -| `EXPORTER_STATE__JSONFILE_S3_ACCESS_KEY` | AWS access key id for `s3` strategy. Default is empty | -| `EXPORTER_STATE__JSONFILE_S3_SECRET_KEY` | AWS secret key for `s3` strategy. Default is empty | -| `EXPORTER_STATE__CUSTOM_STORAGE_PARAMS` | Settings for custom storage params as `dict`. Default: `{}` | +| variable | description | +| ---------------------------------------- | ----------------------------------------------------------------------------------------- | +| `EXPORTER_STATE__STORAGE` | Storage type for StateKeeper. Can be: `jsonfile`, `redis`, `custom`. Default: `jsonfile` | +| `EXPORTER_STATE__REDIS_DSN` | Connection string for Redis state storage when storage type is `redis`. Default is empty. | +| `EXPORTER_STATE__JSONFILE_STRATEGY` | File store strategy for `jsonfile` storage type. Can be `s3` or `local`. Default: `local` | +| `EXPORTER_STATE__JSONFILE_PATH` | Path to JSON state file. Default: `./state.json` | +| `EXPORTER_STATE__JSONFILE_S3_BUCKET` | Bucket for `s3` strategy. Default is empty | +| `EXPORTER_STATE__JSONFILE_S3_REGION` | Region for `s3` strategy. Default is `us-east-1` | +| `EXPORTER_STATE__JSONFILE_S3_ENDPOINT` | Endpoint URL for `s3` strategy. Default is empty | +| `EXPORTER_STATE__JSONFILE_S3_ACCESS_KEY` | AWS access key id for `s3` strategy. Default is empty | +| `EXPORTER_STATE__JSONFILE_S3_SECRET_KEY` | AWS secret key for `s3` strategy. Default is empty | +| `EXPORTER_STATE__CUSTOM_STORAGE_PARAMS` | Settings for custom storage params as `dict`. Default: `{}` | ## Observability settings -| variable | description | -|----------|-------------| -| `EXPORTER_MONITORING__METRICS_ENABLED` | Enable send statsd tagged metrics. Default is `False` | -| `EXPORTER_MONITORING__METRICS_HOST` | DogStatsD / statsd host. Default: `localhost` | -| `EXPORTER_MONITORING__METRICS_PORT` | DogStatsD / statsd port. Default: `8125` | -| `EXPORTER_MONITORING__METRICS_BASE_PREFIX` | Prefix for metrics name. 
Default: `tracker_exporter` | +| variable | description | +| ------------------------------------------ | ---------------------------------------------------------------------------- | +| `EXPORTER_MONITORING__METRICS_ENABLED` | Enable send statsd tagged metrics. Default is `False` | +| `EXPORTER_MONITORING__METRICS_HOST` | DogStatsD / statsd host. Default: `localhost` | +| `EXPORTER_MONITORING__METRICS_PORT` | DogStatsD / statsd port. Default: `8125` | +| `EXPORTER_MONITORING__METRICS_BASE_PREFIX` | Prefix for metrics name. Default: `tracker_exporter` | | `EXPORTER_MONITORING__METRICS_BASE_LABELS` | List of tags for metrics. Example: `["project:internal",]`. Default is empty | -| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable send exception stacktrace to Sentry. Default is `False` | -| `EXPORTER_MONITORING__SENTRY_DSN` | Sentry DSN. Default is empty | - +| `EXPORTER_MONITORING__SENTRY_ENABLED` | Enable send exception stacktrace to Sentry. Default is `False` | +| `EXPORTER_MONITORING__SENTRY_DSN` | Sentry DSN. Default is empty | # Monitoring Based on DogStatsD tagged format. VictoriaMetrics compatible. -| Metric name | Metric type | Labels | Description | -|-------------|-------------|--------|-------------| -| `tracker_exporter_issue_transform_time_seconds` | time | - | Duration of transform per task (data packing to the model) | -| `tracker_exporter_issues_total_processed_count` | count | - | Total issues processed | -| `tracker_exporter_issues_search_time_seconds` | time | - | Yandex.Tracker search duration time in seconds | -| `tracker_exporter_issues_without_metrics` | count | - | Issues with empty metrics (no changelog) | -| `tracker_exporter_issue_prefetch_seconds` | time | - | Pre-transform data duration in seconds | -| `tracker_exporter_comments_fetch_seconds` | time | - | Comments fetch duration in seconds | -| `tracker_exporter_etl_duration_seconds` | time | - | ETL full pipeline duration in seconds | -| `tracker_exporter_etl_upload_status` | gauge | - | Last upload status, 1 - success, 2 - fail | -| `tracker_exporter_export_and_transform_time_seconds` | time | - | Overall export and transform duration in seconds | -| `tracker_exporter_upload_to_storage_time_seconds` | time | - | Overall insert duration time in seconds | -| `tracker_exporter_last_update_timestamp` | gauge | - | Last data update timestamp | -| `tracker_exporter_clickhouse_insert_time_seconds` | time | database, table | Insert per table duration time in seconds | -| `tracker_exporter_clickhouse_inserted_rows` | count | database, table | Inserted rows per table | -| `tracker_exporter_clickhouse_deduplicate_time_seconds` | time | database, table | Optimize execute time duration in seconds | - +| Metric name | Metric type | Labels | Description | +| ------------------------------------------------------ | ----------- | --------------- | ---------------------------------------------------------- | +| `tracker_exporter_issue_transform_time_seconds` | time | - | Duration of transform per task (data packing to the model) | +| `tracker_exporter_issues_total_processed_count` | count | - | Total issues processed | +| `tracker_exporter_issues_search_time_seconds` | time | - | Yandex.Tracker search duration time in seconds | +| `tracker_exporter_issues_without_metrics` | count | - | Issues with empty metrics (no changelog) | +| `tracker_exporter_issue_prefetch_seconds` | time | - | Pre-transform data duration in seconds | +| `tracker_exporter_comments_fetch_seconds` | time | - | Comments fetch duration in seconds | +| 
`tracker_exporter_etl_duration_seconds` | time | - | ETL full pipeline duration in seconds | +| `tracker_exporter_etl_upload_status` | gauge | - | Last upload status, 1 - success, 2 - fail | +| `tracker_exporter_export_and_transform_time_seconds` | time | - | Overall export and transform duration in seconds | +| `tracker_exporter_upload_to_storage_time_seconds` | time | - | Overall insert duration time in seconds | +| `tracker_exporter_last_update_timestamp` | gauge | - | Last data update timestamp | +| `tracker_exporter_clickhouse_insert_time_seconds` | time | database, table | Insert per table duration time in seconds | +| `tracker_exporter_clickhouse_inserted_rows` | count | database, table | Inserted rows per table | +| `tracker_exporter_clickhouse_deduplicate_time_seconds` | time | database, table | Optimize execute time duration in seconds | ### Metrics on dashboard demo + ![](/docs/images/etl_metrics.jpeg) diff --git a/examples/extended_model/main.py b/examples/extended_model/main.py index 66940d5..ad92483 100644 --- a/examples/extended_model/main.py +++ b/examples/extended_model/main.py @@ -1,4 +1,3 @@ - from tracker_exporter.models.issue import TrackerIssue from tracker_exporter.utils.helpers import to_snake_case, validate_resource from tracker_exporter import configure_sentry, run_etl @@ -6,7 +5,7 @@ from yandex_tracker_client.collections import Issues -class CustomIssueFields: +class CustomIssueFieldsMixin: """ Additional custom fields for Yandex Tracker issue. Must be created in the Clickhouse issue table. @@ -18,20 +17,16 @@ def __init__(self, issue: Issues) -> None: self.baz = True if "baz" in issue.tags else False -class ExtendedTrackerIssue(TrackerIssue, CustomIssueFields): +class ExtendedTrackerIssue(CustomIssueFieldsMixin, TrackerIssue): """Extended Yandex Tracker issue model with custom fields.""" def __init__(self, issue: Issues) -> None: super().__init__(issue) - CustomIssueFields.__init__(self, issue) def main() -> None: """Entry point.""" - run_etl( - ignore_exceptions=False, - issue_model=ExtendedTrackerIssue - ) + run_etl(ignore_exceptions=False, issue_model=ExtendedTrackerIssue) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index 4f52f28..a7b2a21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ yandex_tracker_client==2.* +boto3==1.34.* +redis==5.0.* datadog==0.47.* APScheduler==3.10.* requests==2.31.* diff --git a/tracker_exporter/__init__.py b/tracker_exporter/__init__.py index d39f829..e769864 100644 --- a/tracker_exporter/__init__.py +++ b/tracker_exporter/__init__.py @@ -1,31 +1,17 @@ from tracker_exporter.main import ( run_etl, configure_sentry, - configure_jsonfile_storage, - configure_state_service, + configure_state_manager, ) from tracker_exporter.etl import YandexTrackerETL from tracker_exporter.services.clickhouse import ClickhouseClient from tracker_exporter.services.tracker import YandexTrackerClient -from tracker_exporter.services.state import ( - StateKeeper, - JsonStateStorage, - RedisStateStorage, - S3FileStorageStrategy, - LocalFileStorageStrategy, -) __all__ = [ "ClickhouseClient", "YandexTrackerClient", "YandexTrackerETL", - "StateKeeper", - "JsonStateStorage", - "RedisStateStorage", - "S3FileStorageStrategy", - "LocalFileStorageStrategy", "run_etl", "configure_sentry", - "configure_jsonfile_storage", - "configure_state_service", + "configure_state_manager", ] diff --git a/tracker_exporter/_meta.py b/tracker_exporter/_meta.py index ef5d0a2..191ee02 100644 --- a/tracker_exporter/_meta.py +++ 
b/tracker_exporter/_meta.py @@ -1,4 +1,4 @@ -version = "1.0.3" +version = "2.0.0" url = "https://github.com/akimrx/yandex-tracker-exporter" download_url = "https://pypi.org/project/tracker-exporter/" appname = "yandex_tracker_exporter" diff --git a/tracker_exporter/config.py b/tracker_exporter/config.py index 0dc2a22..cb7b1a8 100644 --- a/tracker_exporter/config.py +++ b/tracker_exporter/config.py @@ -2,16 +2,11 @@ import logging from functools import lru_cache -from typing import List +from typing import Literal, Optional, Union from pydantic import validator, root_validator from pydantic_settings import BaseSettings -from tracker_exporter.models.base import ( - YandexTrackerLanguages, - LogLevels, - StateStorageTypes, - JsonStorageStrategies, -) +from tracker_exporter.models.base import YandexTrackerLanguages, LogLevels from tracker_exporter.exceptions import ConfigurationError from tracker_exporter.services.monitoring import DogStatsdClient @@ -24,13 +19,13 @@ class MonitoringSettings(BaseSettings): """Observability settings.""" - metrics_enabled: bool = False - metrics_host: str = "localhost" - metrics_port: int = 8125 - metrics_base_prefix: str = "tracker_exporter" - metrics_base_labels: List[str] = [] - sentry_enabled: bool = False - sentry_dsn: str | None = None + metrics_enabled: Optional[bool] = False + metrics_host: Optional[str] = "localhost" + metrics_port: Optional[int] = 8125 + metrics_base_prefix: Optional[str] = "tracker_exporter" + metrics_base_labels: Optional[list[str]] = [] + sentry_enabled: Optional[bool] = False + sentry_dsn: Optional[str] = None @validator("sentry_dsn", pre=True, always=True) def validate_sentry_dsn(cls, value: str | None, values: dict) -> str: @@ -46,23 +41,23 @@ class Config: class ClickhouseSettings(BaseSettings): """Settings for Clickhouse storage.""" - enable_upload: bool = True - host: str = "localhost" - proto: str = "http" - port: int = 8123 - cacert_path: str | None = None + enable_upload: Optional[bool] = True + host: Optional[str] = "localhost" + proto: Optional[str] = "http" + port: Optional[int] = 8123 + cacert_path: Optional[str] = None serverless_proxy_id: str | None = None - username: str = "default" - password: str | None = None - database: str = "agile" - issues_table: str = "issues" - issue_metrics_table: str = "issue_metrics" - issues_changelog_table: str = "issues_changelog" - auto_deduplicate: bool = True - backoff_base_delay: int | float = 0.5 - backoff_expo_factor: int | float = 2.5 - backoff_max_tries: int = 3 - backoff_jitter: bool = True + username: Optional[str] = "default" + password: Optional[str] = None + database: Optional[str] = "agile" + issues_table: Optional[str] = "issues" + issue_metrics_table: Optional[str] = "issue_metrics" + issues_changelog_table: Optional[str] = "issues_changelog" + auto_deduplicate: Optional[bool] = True + backoff_base_delay: Optional[Union[int, float]] = 0.5 + backoff_expo_factor: Optional[Union[int, float]] = 2.5 + backoff_max_tries: Optional[int] = 3 + backoff_jitter: Optional[bool] = True @validator("serverless_proxy_id", pre=True, always=True) def validate_serverless_proxy_id(cls, value: str | None, values: dict) -> str: @@ -85,10 +80,10 @@ class Config: class IssuesSearchSettings(BaseSettings): """Settings for search & export.""" - query: str | None = None - range: str = "2h" - queues: str | List[str] | None = None - per_page_limit: int = 100 + query: Optional[str] = None + range: Optional[str] = "2h" + queues: Optional[Union[str, list[str]]] = None + per_page_limit: Optional[int] = 
100 @validator("queues", pre=True, always=True) def validate_queues(cls, value: str) -> list: @@ -108,15 +103,15 @@ class Config: class TrackerSettings(BaseSettings): """Settings for Yandex.Tracker client.""" - loglevel: LogLevels = LogLevels.warning - token: str | None = None - org_id: str | None = None - iam_token: str | None = None - cloud_org_id: str | None = None - timeout: int = 10 - max_retries: int = 10 - language: YandexTrackerLanguages = YandexTrackerLanguages.en - timezone: str = "Europe/Moscow" + loglevel: Optional[LogLevels] = LogLevels.warning + token: Optional[str] = None + org_id: Optional[str] = None + iam_token: Optional[str] = None + cloud_org_id: Optional[str] = None + timeout: Optional[int] = 10 + max_retries: Optional[int] = 10 + language: Optional[YandexTrackerLanguages] = YandexTrackerLanguages.en + timezone: Optional[str] = "Europe/Moscow" search: IssuesSearchSettings = IssuesSearchSettings() @root_validator(pre=True) @@ -145,16 +140,16 @@ class Config: class StateSettings(BaseSettings): """Settings for stateful mode.""" - storage: StateStorageTypes | None = StateStorageTypes.jsonfile - redis_dsn: str = "redis://localhost:6379" - jsonfile_strategy: JsonStorageStrategies = JsonStorageStrategies.local - jsonfile_path: str = "./state.json" - jsonfile_s3_bucket: str | None = None - jsonfile_s3_region: str = "eu-east-1" - jsonfile_s3_endpoint: str | None = None - jsonfile_s3_access_key: str | None = None - jsonfile_s3_secret_key: str | None = None - custom_storage_params: dict = {} + storage: Optional[Literal["redis", "jsonfile", "custom"]] = "jsonfile" + redis_dsn: Optional[str] = "redis://localhost:6379" + jsonfile_strategy: Optional[Literal["s3", "local"]] = "local" + jsonfile_path: Optional[str] = "state.json" + jsonfile_s3_bucket: Optional[str] = None + jsonfile_s3_region: Optional[str] = "us-east-1" + jsonfile_s3_endpoint: Optional[str] = None + jsonfile_s3_access_key: Optional[str] = None + jsonfile_s3_secret_key: Optional[str] = None + custom_storage_params: Optional[dict] = {} @root_validator(pre=True) def validate_state(cls, values) -> str: @@ -172,7 +167,7 @@ def validate_state(cls, values) -> str: ) ) - if jsonfile_strategy == JsonStorageStrategies.s3 and not s3_is_configured: + if jsonfile_strategy == "s3" and not s3_is_configured: raise ConfigurationError("S3 must be configured for JSONFileStorage with S3 strategy.") return values @@ -188,23 +183,23 @@ class Settings(BaseSettings): clickhouse: ClickhouseSettings = ClickhouseSettings() tracker: TrackerSettings = TrackerSettings # TODO (akimrx): research, called class not see TOKEN's state: StateSettings = StateSettings() - stateful: bool = False - stateful_initial_range: str = "1w" - changelog_export_enabled: bool = False - log_etl_stats: bool = True - log_etl_stats_each_n_iter: int = 100 - - loglevel: LogLevels = LogLevels.info - workdays: List[int] = [0, 1, 2, 3, 4] - business_hours_start: datetime.time = datetime.time(9) - business_hours_end: datetime.time = datetime.time(22) - datetime_response_format: str = "%Y-%m-%dT%H:%M:%S.%f%z" - datetime_query_format: str = "%Y-%m-%d %H:%M:%S" - datetime_clickhouse_format: str = "%Y-%m-%dT%H:%M:%S.%f" - - etl_interval_minutes: int = 30 - closed_issue_statuses: str | list = "closed,rejected,resolved,cancelled,released" - not_nullable_fields: tuple | list | str = ( + stateful: Optional[bool] = False + stateful_initial_range: Optional[str] = "1w" + changelog_export_enabled: Optional[bool] = False + log_etl_stats: Optional[bool] = True + log_etl_stats_each_n_iter: 
Optional[int] = 100 + + loglevel: Optional[LogLevels] = LogLevels.info + workdays: Optional[list[int]] = [0, 1, 2, 3, 4] + business_hours_start: Optional[datetime.time] = datetime.time(9) + business_hours_end: Optional[datetime.time] = datetime.time(22) + datetime_response_format: Optional[str] = "%Y-%m-%dT%H:%M:%S.%f%z" + datetime_query_format: Optional[str] = "%Y-%m-%d %H:%M:%S" + datetime_clickhouse_format: Optional[str] = "%Y-%m-%dT%H:%M:%S.%f" + + etl_interval_minutes: Optional[int] = 30 + closed_issue_statuses: Optional[Union[str, list]] = "closed,rejected,resolved,cancelled,released" + not_nullable_fields: Optional[Union[tuple, list, str]] = ( "created_at", "resolved_at", "closed_at", diff --git a/tracker_exporter/etl.py b/tracker_exporter/etl.py index 231e3ce..59f5779 100644 --- a/tracker_exporter/etl.py +++ b/tracker_exporter/etl.py @@ -1,7 +1,7 @@ import time import logging from datetime import datetime, timedelta -from typing import Tuple, List +from typing import Tuple, List, Optional from yandex_tracker_client.collections import Issues from yandex_tracker_client.objects import SeekablePaginatedList from yandex_tracker_client.exceptions import Forbidden @@ -9,7 +9,7 @@ from tracker_exporter.config import config, monitoring from tracker_exporter.models.issue import TrackerIssue from tracker_exporter.models.base import ClickhousePayload -from tracker_exporter.services.state import StateKeeper +from tracker_exporter.state.managers import AbstractStateManager from tracker_exporter.services.tracker import YandexTrackerClient from tracker_exporter.services.clickhouse import ClickhouseClient from tracker_exporter.exceptions import ConfigurationError, UploadError, ExportOrTransformError @@ -31,7 +31,7 @@ def __init__( *, tracker_client: YandexTrackerClient, clickhouse_client: ClickhouseClient, - statekeeper: StateKeeper | None = None, + state_manager: Optional[AbstractStateManager] = None, issue_model: TrackerIssue = TrackerIssue, database: str = config.clickhouse.database, issues_table: str = config.clickhouse.issues_table, @@ -42,7 +42,7 @@ def __init__( ) -> None: self.tracker = tracker_client self.clickhouse = clickhouse_client - self.state = statekeeper + self.state = state_manager self.issue_model = issue_model self.database = database self.issues_table = issues_table @@ -142,7 +142,7 @@ def _export_and_transform( found_issues = self.tracker.search_issues(query=query, filter=filter, order=order, limit=limit) if len(found_issues) == 0: logger.info("Nothing to export. 
Skipping ETL") - return + return issues, changelog_events, metrics, possible_new_state if isinstance(found_issues, SeekablePaginatedList): pagination = True @@ -216,7 +216,7 @@ def run( try: issues, changelogs, metrics, possible_new_state = self._export_and_transform(**query, limit=limit) if stateful and possible_new_state is not None: - logger.info(f"Possible new state: {possible_new_state}") + logger.info(f"Stateful mode enabled, fetching possible new state: {possible_new_state}") last_saved_state = self.state.get(self.state_key) if last_saved_state == possible_new_state and len(issues) <= 1 and len(metrics) <= 1: logger.info("Data already is up-to-date, skipping upload stage") @@ -247,6 +247,7 @@ def run( raise UploadError(str(exc)) else: if all((stateful, self.state, possible_new_state)): + logger.info(f"Saving last ETL timestamp {possible_new_state}") self.state.set(self.state_key, possible_new_state) else: logger.info( diff --git a/tracker_exporter/exceptions.py b/tracker_exporter/exceptions.py index e77b59f..1b32f6f 100644 --- a/tracker_exporter/exceptions.py +++ b/tracker_exporter/exceptions.py @@ -28,3 +28,7 @@ class JsonFileNotFound(Exception): class InvalidJsonFormat(Exception): pass + + +class SerializerError(Exception): + pass diff --git a/tracker_exporter/main.py b/tracker_exporter/main.py index ef48948..535b5ca 100644 --- a/tracker_exporter/main.py +++ b/tracker_exporter/main.py @@ -23,6 +23,7 @@ required=False, help="Path to .env file", ) +parser.add_argument("--run-once", dest="run_once", action="store_true", help="Run ETL once.") args, _ = parser.parse_known_args() warnings.filterwarnings("ignore") @@ -33,8 +34,8 @@ # pylint: disable=C0413 from tracker_exporter.services.monitoring import sentry_events_filter -from tracker_exporter.services.state import StateKeeper, LocalFileStorageStrategy, JsonStateStorage -from tracker_exporter.models.base import StateStorageTypes, JsonStorageStrategies +from tracker_exporter.state.managers import AbstractStateManager +from tracker_exporter.state.factory import StateManagerFactory, IObjectStorageProps from tracker_exporter.models.issue import TrackerIssue from tracker_exporter.etl import YandexTrackerETL from tracker_exporter.services.tracker import YandexTrackerClient @@ -79,31 +80,29 @@ def configure_sentry() -> None: logger.info(f"Sentry send traces is {'enabled' if config.monitoring.sentry_enabled else 'disabled'}") -def configure_jsonfile_storage() -> JsonStateStorage: - """Configure and returns storage for StateKeeper.""" - match config.state.jsonfile_strategy: - case JsonStorageStrategies.local: - storage_strategy = LocalFileStorageStrategy(config.state.jsonfile_path) - case JsonStorageStrategies.s3: - raise NotImplementedError - case _: - raise ValueError - return JsonStateStorage(storage_strategy) - - -def configure_state_service() -> StateKeeper | None: +def configure_state_manager() -> AbstractStateManager | None: """Configure StateKeeper for ETL stateful mode.""" if not config.stateful: return match config.state.storage: - case StateStorageTypes.jsonfile: - storage = configure_jsonfile_storage() - case StateStorageTypes.redis: + case "jsonfile": + s3_props: IObjectStorageProps = IObjectStorageProps( + bucket_name=config.state.jsonfile_s3_bucket, + access_key_id=config.state.jsonfile_s3_access_key, + secret_key=config.state.jsonfile_s3_secret_key, + endpoint_url=config.state.jsonfile_s3_endpoint, + region=config.state.jsonfile_s3_region, + ) + return StateManagerFactory.create_file_state_manager( + 
strategy=config.state.jsonfile_strategy, filename=config.state.jsonfile_path, **s3_props + ) + case "redis": + return StateManagerFactory.create_redis_state_manager(config.state.redis_dsn) + case "custom": raise NotImplementedError case _: raise ValueError - return StateKeeper(storage) def run_etl(ignore_exceptions: bool = False, issue_model: TrackerIssue = TrackerIssue) -> None: @@ -111,7 +110,7 @@ def run_etl(ignore_exceptions: bool = False, issue_model: TrackerIssue = Tracker etl = YandexTrackerETL( tracker_client=YandexTrackerClient(), clickhouse_client=ClickhouseClient(), - statekeeper=configure_state_service(), + state_manager=configure_state_manager(), issue_model=issue_model, ) etl.run( @@ -128,6 +127,12 @@ def run_etl(ignore_exceptions: bool = False, issue_model: TrackerIssue = Tracker def main() -> None: """Entry point for CLI command.""" configure_sentry() + + if args.run_once: + logger.info("A one-time launch command is received, the scheduler setting will be skipped") + run_etl() + sys.exit(0) + signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) scheduler.start() diff --git a/tracker_exporter/models/base.py b/tracker_exporter/models/base.py index 4bc5f2a..1b4b260 100644 --- a/tracker_exporter/models/base.py +++ b/tracker_exporter/models/base.py @@ -1,5 +1,5 @@ import json -from abc import ABCMeta, ABC, abstractmethod +from abc import ABCMeta from enum import Enum from typing import Any @@ -12,17 +12,6 @@ class ClickhousePayload(BaseModel): metrics: list -class StateStorageTypes(str, Enum): - redis = "redis" - jsonfile = "jsonfile" - custom = "custom" - - -class JsonStorageStrategies(str, Enum): - local = "local" - s3 = "s3" - - class LogLevels(str, Enum): debug = "debug" info = "info" @@ -100,41 +89,3 @@ def parse(val): data = self.__dict__.copy() return parse(data) - - -class BaseStateStorage(ABC): - """Abstract class for state storage. - Allows you to save, receive, delete and flush the state. 
- - """ - - @abstractmethod - def set(self, key: str, value: Any) -> None: - """Save key:value pair to storage.""" - - @abstractmethod - def get(self, key: str) -> Any: - """Get value by key from storage.""" - - @abstractmethod - def delete(self, key: str) -> None: - """Delete value by key from storage.""" - - @abstractmethod - def flush(self) -> None: - """Flush (drop) state from storage.""" - - -class JSONFileStorageStrategy(ABC): - """Abstract strategy for store content via file.""" - - def __init__(self, file_path: str) -> None: - self.file_path = file_path - - @abstractmethod - def read(self) -> Any: - """Read content from file.""" - - @abstractmethod - def save(self) -> Any: - """Save content to file.""" diff --git a/tracker_exporter/services/__init__.py b/tracker_exporter/services/__init__.py index 8202b65..6522e79 100644 --- a/tracker_exporter/services/__init__.py +++ b/tracker_exporter/services/__init__.py @@ -1,21 +1,9 @@ from tracker_exporter.services.clickhouse import ClickhouseClient from tracker_exporter.services.monitoring import DogStatsdClient from tracker_exporter.services.tracker import YandexTrackerClient -from tracker_exporter.services.state import ( - S3FileStorageStrategy, - LocalFileStorageStrategy, - JsonStateStorage, - RedisStateStorage, - StateKeeper, -) __all__ = [ "ClickhouseClient", "DogStatsdClient", "YandexTrackerClient", - "S3FileStorageStrategy", - "LocalFileStorageStrategy", - "JsonStateStorage", - "RedisStateStorage", - "StateKeeper", ] diff --git a/tracker_exporter/services/state.py b/tracker_exporter/services/state.py deleted file mode 100644 index c3caba7..0000000 --- a/tracker_exporter/services/state.py +++ /dev/null @@ -1,121 +0,0 @@ -import os -import json -import logging - -from typing import Any, Dict -from tracker_exporter.models.base import BaseStateStorage, JSONFileStorageStrategy -from tracker_exporter.exceptions import JsonFileNotFound, InvalidJsonFormat - -logger = logging.getLogger(__name__) - - -class S3FileStorageStrategy(JSONFileStorageStrategy): - """Strategy for storing a JSON file in the remote object storage (S3).""" - - def __init__(self, file_path: str): - self.file_path = file_path - raise NotImplementedError # TODO (akimrx): implement - - -class LocalFileStorageStrategy(JSONFileStorageStrategy): - """Strategy for storing a JSON file in the local file system.""" - - def __init__(self, file_path: str, raise_if_not_exists: bool = False): - self.file_path = file_path - self.raise_if_not_exists = raise_if_not_exists - - if not file_path.endswith(".json"): - self.file_path = f"{file_path}.json" - else: - self.file_path = file_path - - def save(self, content: Any) -> None: - """Save content to JSON file.""" - with open(self.file_path, "w", encoding="utf-8") as json_file: - logger.debug(f"Dumping state to file {self.file_path} ...") - json.dump(content, json_file, ensure_ascii=False, indent=2) - logger.info(f"State successfuly saved to file: {self.file_path}") - - def read(self) -> Dict[str, Any]: - """Read content from JSON file.""" - logger.info(f"Trying reading state from file {self.file_path}") - if not all((os.path.isfile(self.file_path), os.path.exists(self.file_path))): - if self.raise_if_not_exists: - raise JsonFileNotFound("JSON file %s not found", self.file_path) - logger.warning(f"State file with name '{self.file_path}' not found") - return {} - - with open(self.file_path, "r", encoding="utf-8") as json_file: - try: - logger.debug(f"Trying opening file: {self.file_path}") - content = json.load(json_file) - except 
json.JSONDecodeError as exc: - logger.exception(f"Invalid state file format: {exc}") - raise InvalidJsonFormat(self.file_path) - - if not content or content is None: - return {} - return content - - -class JsonStateStorage(BaseStateStorage): - """File storage backend based on JSON.""" - - def __init__(self, strategy: JSONFileStorageStrategy) -> None: - self.file_storage = strategy - self.state = {} - - def get(self, key: str) -> Any: - """Get state by key.""" - self.state = self.file_storage.read() - return self.state.get(key) - - def set(self, key: str, value: str) -> None: - """Set state as key=value.""" - self.state = self.file_storage.read() - self.state[key] = value - self.file_storage.save(self.state) - - def delete(self, key: str) -> None: - """Delete state by key.""" - self.state = self.file_storage.read() - if self.state.get(key) is not None: - del self.state[key] - self.file_storage.save(self.state) - - def flush(self): - """Drop all states.""" - self.state = {} - self.file_storage.save(self.state) - - -class RedisStateStorage(BaseStateStorage): - """Redis storage backend. Supports retries with exponential backoff.""" - - def __init__(self, host: str, port: str): - self.host = host - self.port = port - raise NotImplementedError # TODO (akimrx): implement - - -class StateKeeper: - """Class for operations with state.""" - - def __init__(self, storage: BaseStateStorage) -> None: - self.storage = storage - - def get(self, key: str) -> Any: - """Get state by key.""" - return self.storage.get(key) - - def set(self, key: str, value: Any) -> None: - """Set state by key:value pair.""" - return self.storage.set(key, value) - - def delete(self, key: str) -> None: - """Delete state by key.""" - return self.storage.delete(key) - - def flush(self) -> None: - """Flush all keys in the state storage.""" - return self.storage.flush() diff --git a/tracker_exporter/state/__init__.py b/tracker_exporter/state/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tracker_exporter/state/backends.py b/tracker_exporter/state/backends.py new file mode 100644 index 0000000..4e57496 --- /dev/null +++ b/tracker_exporter/state/backends.py @@ -0,0 +1,256 @@ +import os +import logging + +from abc import ABC, abstractmethod +from typing import Any, ContextManager + +import boto3 + +from tracker_exporter.state.serializers import AbstractSerializer, JsonSerializer + +logger = logging.getLogger(__name__) + + +class AbstractFileStorageBackend(ABC): + """ + An abstract base class for file storage systems, enforcing a common interface for file operations. + + :param serializer: The serializer instance used for serializing and deserializing data. + :param raise_if_not_exists: Raise :exc:`FileNotFound` if file not exists. Defaults to True. + :param auto_sub_ext_by_serializer: Automatically substitute the file extension based on the serializer. Defaults is ``False``. 
+ + """ + + def __init__( + self, + serializer: AbstractSerializer, + raise_if_not_exists: bool = True, + auto_sub_ext_by_serializer: bool = False, + ) -> None: + self.serializer = serializer if hasattr(serializer, "is_initialized") else serializer() + self.raise_if_not_exists = raise_if_not_exists + self.auto_sub_ext_by_serializer = auto_sub_ext_by_serializer + + def path_with_ext(self, path: str) -> str: + """Appends the file extension from the serializer if not present in the path.""" + if not path.endswith(f".{self.serializer.ext}"): + return f"{path}.{self.serializer.ext}" + return path + + @abstractmethod + def read(self, path: str, deserialize: bool = False) -> Any: + """Abstract method for reading data from a given file path.""" + + @abstractmethod + def write(self, path: str, data: Any) -> None: + """Abstract method for writing data to a given file path.""" + + +class AbstractKeyValueStorageBackend(ABC): + """An abstract base class for key value storage backends like Redis, Consul, etc.""" + + @abstractmethod + def client(self, *args, **kwargs) -> ContextManager: + """An abstract method that returns client context manager.""" + + @abstractmethod + def get(self, key: str | list, *args, **kwargs) -> Any: + """An abstract method for get value(s) by key from storage.""" + + @abstractmethod + def set(self, key: str, value: Any, *args, **kwargs) -> None: + """An abstract method for save key:value pair to storage.""" + + @abstractmethod + def delete(self, key: str | list, *args, **kwargs) -> None: + """An abstract method for deletes key(s) from storage.""" + + +class LocalFileStorageBackend(AbstractFileStorageBackend): + """ + A concrete synchronous implementation of AbstractFileStorage for local file storage operations. + Overrides the read and write asynchronous methods for file operations using the aiofiles package. + + :param serializer: The serializer instance used for serializing and deserializing data. + :param raise_if_not_exists: Raise :exc:`FileNotFound` if file not exists. Defaults to True. + :param auto_sub_ext_by_serializer: Automatically substitute the file extension based on the serializer. Defaults is ``False``. + + Default serializer: :class:`JsonSerializer` + + Usage:: + + storage = LocalFileStorage() + + storage.write("myfile.json", data={"foo": "bar"}) + r = storage.read("myfile.json", deserialize=True) + + print(r) # {"foo": "bar"} + + """ + + def __init__( + self, + serializer: AbstractSerializer | None = None, + raise_if_not_exists: bool = True, + auto_sub_ext_by_serializer: bool = False, + ) -> None: + super().__init__( + serializer or JsonSerializer, + raise_if_not_exists=raise_if_not_exists, + auto_sub_ext_by_serializer=auto_sub_ext_by_serializer, + ) + + def read(self, path: str, deserialize: bool = False) -> Any: + """ + Reads data from a local file, deserializes it using the provided serializer, + and returns the deserialized data. + + :param path: A local file path for read content from. + :param deserialize: Deserialize readed file content via serializer. 
+
+        """
+        if self.auto_sub_ext_by_serializer:
+            path = self.path_with_ext(path)
+
+        if not os.path.isfile(path) and not os.path.exists(path):
+            if self.raise_if_not_exists:
+                raise FileNotFoundError(f"File with name {path} not found")
+            logger.debug(f"File with name '{path}' not found")
+            return {}
+
+        with open(path, "r") as file:
+            data = file.read()
+
+        if deserialize:
+            return self.serializer.deserialize(data)
+        return data
+
+    def write(self, path: str, data: Any) -> None:
+        """
+        Serializes the given data using the provided serializer and writes it to a local file.
+
+        :param path: A local file path to write content to.
+        :param data: Content that will be written to the file.
+
+        """
+
+        if self.auto_sub_ext_by_serializer:
+            path = self.path_with_ext(path)
+
+        with open(path, "w") as file:
+            file.write(self.serializer.serialize(data))
+
+
+class S3FileStorageBackend(AbstractFileStorageBackend):
+    """
+    A concrete synchronous implementation of :class:`AbstractFileStorageBackend` for S3 object storage operations.
+    Initializes a boto3 session and provides read and write operations for objects stored in an S3 bucket.
+
+    Default serializer: :class:`JsonSerializer`
+
+    :param bucket_name: The name of the S3 bucket.
+    :param access_key_id: Service account ID; if empty, the ``AWS_ACCESS_KEY_ID`` environment variable is used.
+    :param secret_key: Secret key for the service account; if empty, the ``AWS_SECRET_ACCESS_KEY`` environment variable is used.
+    :param endpoint_url: S3 endpoint for use with Yandex.Cloud, Minio and other providers.
+    :param region: S3 region. Default: ``us-east1``
+    :param serializer: The serializer instance used for serializing and deserializing data.
+    :param raise_if_not_exists: Raise :exc:`FileNotFoundError` if the object does not exist. Defaults to ``True``.
+    :param auto_sub_ext_by_serializer: Automatically substitute the file extension based on the serializer. Defaults to ``False``.
+
+    Usage::
+
+        storage = S3FileStorageBackend(
+            bucket_name="my-bucket",
+            access_key_id="XXXX",
+            secret_key="XXXX",
+            endpoint_url="https://storage.yandexcloud.net",
+            region="ru-central1"
+        )
+
+        storage.write("myfile.json", data={"foo": "bar"})
+        r = storage.read("myfile.json", deserialize=True)
+
+        print(r)  # {"foo": "bar"}
+
+    """
+
+    def __init__(
+        self,
+        bucket_name: str,
+        serializer: AbstractSerializer | None = None,
+        raise_if_not_exists: bool = True,
+        auto_sub_ext_by_serializer: bool = False,
+        access_key_id: str | None = None,
+        secret_key: str | None = None,
+        region: str | None = None,
+        endpoint_url: str | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(
+            serializer or JsonSerializer,
+            raise_if_not_exists=raise_if_not_exists,
+            auto_sub_ext_by_serializer=auto_sub_ext_by_serializer,
+        )
+        self.bucket_name = bucket_name
+        self.endpoint_url = endpoint_url
+        self.session = boto3.Session(
+            aws_access_key_id=access_key_id,
+            aws_secret_access_key=secret_key,
+            region_name=region or "us-east1",
+            **kwargs,
+        )
+
+    @property
+    def client(self):
+        """Returns a boto3 client for S3 operations."""
+        return self.session.client("s3", endpoint_url=self.endpoint_url)
+
+    def read(self, path: str, deserialize: bool = False) -> Any:
+        """
+        Reads data from an S3 object and, if requested, deserializes it
+        using the provided serializer before returning it.
+
+        :param path: Object key (path) in the bucket to read content from.
+        :param deserialize: Deserialize the object content using the serializer.
+
+        """
+        if self.auto_sub_ext_by_serializer:
+            path = self.path_with_ext(path)
+
+        try:
+            response = self.client.get_object(Bucket=self.bucket_name, Key=path)
+        except Exception as exc:
+            error_msg = f"Exception while reading file '{path}', it probably does not exist. Error: {exc}"
+
+            if self.raise_if_not_exists:
+                raise FileNotFoundError(error_msg) from exc
+
+            logger.debug(error_msg)
+            return {}
+
+        with response["Body"] as stream:
+            data = stream.read()
+
+        if deserialize:
+            return self.serializer.deserialize(data.decode())
+        return data.decode()
+
+    def write(self, path: str, data: Any) -> None:
+        """
+        Serializes the given data using the provided serializer and writes it to an S3 object.
+
+        :param path: Object key (path) in the bucket to write content to.
+        :param data: Content that will be written to the object.
+
+        """
+        if self.auto_sub_ext_by_serializer:
+            path = self.path_with_ext(path)
+
+        self.client.put_object(Bucket=self.bucket_name, Key=path, Body=self.serializer.serialize(data).encode())
+
+
+__all__ = [
+    "AbstractFileStorageBackend",
+    "LocalFileStorageBackend",
+    "S3FileStorageBackend",
+]
diff --git a/tracker_exporter/state/factory.py b/tracker_exporter/state/factory.py
new file mode 100644
index 0000000..6e93bb1
--- /dev/null
+++ b/tracker_exporter/state/factory.py
@@ -0,0 +1,50 @@
+from typing import Literal, Type, TypedDict, Optional
+
+from redis import Redis
+
+from tracker_exporter.state.serializers import AbstractSerializer, JsonSerializer
+from tracker_exporter.state.backends import S3FileStorageBackend, LocalFileStorageBackend
+from tracker_exporter.state.managers import FileStateManager, RedisStateManager
+
+
+class IObjectStorageProps(TypedDict):
+    bucket_name: str
+    access_key_id: str
+    secret_key: str
+    region: Optional[str]
+    endpoint_url: Optional[str]
+
+
+class StateManagerFactory:
+    """Factory that provides an easy way to create a state manager."""
+
+    @staticmethod
+    def create_file_state_manager(
+        strategy: Literal["local", "s3"],
+        filename: str = "state.json",
+        serializer: Type[AbstractSerializer] = JsonSerializer,
+        **s3_props: Optional[IObjectStorageProps],
+    ) -> FileStateManager:
+        match strategy:
+            case "local":
+                backend = LocalFileStorageBackend(serializer=serializer, raise_if_not_exists=False)
+            case "s3":
+                bucket_name = s3_props["bucket_name"]
+                del s3_props["bucket_name"]
+
+                backend = S3FileStorageBackend(
+                    bucket_name, serializer=serializer, raise_if_not_exists=False, **s3_props
+                )
+            case _:
+                raise ValueError("Invalid jsonfile strategy, allowed: s3, local")
+
+        return FileStateManager(backend, state_file_name=filename)
+
+    @staticmethod
+    def create_redis_state_manager(
+        url: str,
+        namespace: str = "tracker_exporter_default",
+        serializer: Type[AbstractSerializer] = JsonSerializer,
+    ) -> RedisStateManager:
+        backend = Redis.from_url(url, decode_responses=True)
+        return RedisStateManager(backend, namespace=namespace, serializer=serializer)
diff --git a/tracker_exporter/state/managers.py b/tracker_exporter/state/managers.py
new file mode 100644
index 0000000..5fba8ff
--- /dev/null
+++ b/tracker_exporter/state/managers.py
@@ -0,0 +1,223 @@
+from abc import ABC, abstractmethod
+from contextlib import suppress
+from typing import Any, Type
+
+from tracker_exporter.state.backends import AbstractFileStorageBackend, AbstractKeyValueStorageBackend
+from tracker_exporter.state.serializers import AbstractSerializer, JsonSerializer
+from tracker_exporter.exceptions import SerializerError
+
+
+class AbstractStateManager(ABC):
+    """
+    Abstract class for state storage.
+
+    Allows the user to save, retrieve, delete and flush the state.
+    """
+
+    @abstractmethod
+    def set(self, key: str, value: Any) -> None:
+        """Abstract method for saving a key:value pair to the storage."""
+
+    @abstractmethod
+    def get(self, key: str, default: Any = None) -> Any:
+        """Abstract method for getting a value by key from the storage."""
+
+    @abstractmethod
+    def delete(self, key: str) -> None:
+        """Abstract method for deleting a value by key from the storage."""
+
+    @abstractmethod
+    def flush(self) -> None:
+        """Abstract method for flushing (dropping) the state from the storage."""
+
+
+class FileStateManager(AbstractStateManager):
+    """
+    A state manager for handling state persistence in file storage (local, S3 or other).
+
+    This class provides an abstraction for managing application state data stored within a file.
+    It supports basic CRUD operations such as setting, getting, and deleting state information,
+    utilizing an abstract file storage mechanism.
+
+    :param storage: The file storage provider for persisting state data.
+    :param state_file_name: The name of the file where state data is stored. Defaults to ``state``.
+
+    Usage::
+
+        from datetime import datetime
+
+        storage_backend = LocalFileStorageBackend()  # also, you can use S3FileStorageBackend
+        state = FileStateManager(storage_backend, state_file_name="my_state")
+
+
+        def my_function() -> None:
+            ...
+            last_state = state.get("my_function", default={})
+
+            if last_state.get("last_run") is None:
+                new_state = {"last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
+                state.set("my_function", new_state)
+
+            ...
+
+    .. note::
+        The state data is managed as a dictionary (JSON-compatible), allowing for key-value pair manipulation.
+        Other data formats are NOT SUPPORTED.
+
+    """
+
+    def __init__(self, storage: AbstractFileStorageBackend, state_file_name: str = "state") -> None:
+        self.storage = storage
+        self.state_file_name = state_file_name
+        self.state = {}
+
+        self.storage.auto_sub_ext_by_serializer = True
+        self.storage.raise_if_not_exists = False
+
+    def get(self, key: str, default: Any = None) -> Any:
+        """
+        Get a state value by key.
+
+        :param key: State key.
+        :param default: Default value if the specified key is not found.
+
+        """
+        self.state = self.storage.read(self.state_file_name, deserialize=True)
+        return self.state.get(key, default)
+
+    def set(self, key: str, value: Any) -> None:
+        """
+        Set a value for the state key.
+
+        :param key: State key.
+        :param value: Value to be saved, associated with the key.
+
+        """
+        self.state = self.storage.read(self.state_file_name, deserialize=True)
+        self.state[key] = value
+        self.storage.write(self.state_file_name, self.state)
+
+    def delete(self, key: str) -> None:
+        """
+        Delete the state (value) by key.
+
+        :param key: State key to be deleted.
+        """
+        self.state = self.storage.read(self.state_file_name, deserialize=True)
+        if self.state.get(key) is not None:
+            del self.state[key]
+        self.storage.write(self.state_file_name, self.state)
+
+    def flush(self):
+        """Drop all data from the state."""
+        self.state = {}
+        self.storage.write(self.state_file_name, self.state)
+
+
+class RedisStateManager(AbstractStateManager):
+    """
+    A state manager for handling state persistence in Redis.
+
+    This class provides an abstraction layer over a Redis storage mechanism, allowing
+    for easy setting, getting, and deletion of state information with optional serialization
+    support. It uses an underlying key-value storage provider and supports namespacing to
+    segregate different state data.
+
+    It is recommended to use a JSON-compatible state format, such as a dict, to maintain portability
+    across other state managers.
+
+    :param storage: The storage provider for persisting state data.
+    :param serializer: An optional serializer for converting
+        data to and from the storage format. Defaults to JsonSerializer if not provided.
+    :param namespace: A namespace prefix for all keys managed by this instance.
+        Helps in avoiding key collisions. Defaults to ``tracker_exporter_default``.
+
+    Usage::
+
+        from datetime import datetime
+        from redis import Redis
+
+        redis = Redis.from_url("redis://localhost:6379", decode_responses=True)
+        state = RedisStateManager(redis, namespace="my_namespace")
+
+
+        def my_function() -> None:
+            ...
+            last_state = state.get("my_function", default={})
+
+            if last_state.get("last_run") is None:
+                new_state = {"last_run": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
+                state.set("my_function", new_state)
+
+            ...
+
+    """
+
+    def __init__(
+        self,
+        storage: AbstractKeyValueStorageBackend,
+        serializer: Type[AbstractSerializer] | None = None,
+        namespace: str = "tracker_exporter_default",
+    ) -> None:
+        self.storage = storage
+        self.serializer = (serializer or JsonSerializer)()
+        self.namespace = namespace
+
+    def _rkey(self, key: str) -> str:
+        """Resolve the full key path with the namespace."""
+        return f"{self.namespace}:{key}"
+
+    def set(self, key: str, value: Any) -> None:
+        """
+        Set a value for the state key.
+
+        :param key: State key.
+        :param value: Value to be saved, associated with the key.
+
+        """
+        if isinstance(value, dict):
+            value = self.serializer.serialize(value)
+
+        with self.storage.client() as session:
+            session.set(self._rkey(key), value)
+
+    def get(self, key: str, default: Any = None) -> Any:
+        """
+        Get a state value by key from Redis.
+
+        :param key: State key.
+        :param default: Default value if the specified key is not found.
+
+        """
+        with self.storage.client() as session:
+            value = session.get(self._rkey(key))
+
+        if value is None:
+            return default
+
+        with suppress(SerializerError):
+            value = self.serializer.deserialize(value)
+        return value
+
+    def delete(self, key: str) -> None:
+        """
+        Delete the state (value) by key if it exists.
+
+        :param key: State key to be deleted.
+        """
+        with self.storage.client() as session:
+            session.delete(self._rkey(key))
+
+    def flush(self) -> None:
+        """Flush all data in the namespace. Not implemented yet."""
+        raise NotImplementedError
+
+    def execute(self, cmd: str, *args, **kwargs) -> Any:
+        """
+        Common method for executing any supported Redis command.
+
+        :param cmd: Redis command to execute.
+        """
+        with self.storage.client() as session:
+            return session.execute_command(cmd, *args, **kwargs)
+
+
+__all__ = ["AbstractStateManager", "FileStateManager", "RedisStateManager"]
diff --git a/tracker_exporter/state/serializers.py b/tracker_exporter/state/serializers.py
new file mode 100644
index 0000000..45c6d5c
--- /dev/null
+++ b/tracker_exporter/state/serializers.py
@@ -0,0 +1,82 @@
+"""This module contains content serializers."""
+
+import json
+import yaml
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+from tracker_exporter.exceptions import SerializerError
+
+
+class AbstractSerializer(ABC):
+    """
+    An abstract serializer like JSON, YAML, etc.
+
+    All (de)serialization errors must raise :exc:`SerializerError`.
+
+    """
+
+    def __init__(self) -> None:
+        self.is_initialized = True
+
+    @property
+    @abstractmethod
+    def ext(self) -> str:
+        """Abstract property that returns the serializer's file extension."""
+
+    @abstractmethod
+    def serialize(self, data: Any, *args, **kwargs) -> str:
+        """Abstract method for serializing data."""
+
+    @abstractmethod
+    def deserialize(self, data: str, **kwargs) -> Any:
+        """Abstract method for deserializing data."""
+
+
+class JsonSerializer(AbstractSerializer):
+    """
+    Serializer for converting between JSON and Python objects.
+
+    This serializer handles serialization (Python object to JSON format)
+    and deserialization (JSON format to Python object) processes,
+    ensuring that data is correctly transformed for JSON storage or
+    retrieval while maintaining the Python object's structure.
+
+    :raises SerializerError: If an error occurs during the JSON (de)serialization process.
+    """
+
+    @property
+    def ext(self) -> str:
+        return "json"
+
+    def serialize(self, data: Any, ensure_ascii: bool = False, indent: int = 2, **kwargs) -> str:
+        """
+        Serialize data to JSON format (str).
+
+        :param data: Data that will be serialized to JSON.
+        :param ensure_ascii: If ``False``, the return value can contain non-ASCII characters if they appear in strings contained in ``data``.
+            Otherwise, all such characters are escaped in JSON strings.
+        :param indent: Indentation width in spaces. Defaults to ``2``.
+
+        :raises SerializerError: If an error occurs during the JSON serialization process.
+        """
+        try:
+            return json.dumps(data, ensure_ascii=ensure_ascii, indent=indent, **kwargs)
+        except (TypeError, ValueError) as exc:
+            raise SerializerError(exc) from exc
+
+    def deserialize(self, data: str, **kwargs) -> Any:
+        """
+        Deserialize JSON data to a Python object.
+
+        :param data: Data that will be deserialized from JSON.
+
+        :raises SerializerError: If an error occurs during the JSON deserialization process.
+        """
+        try:
+            return json.loads(data, **kwargs)
+        except (json.JSONDecodeError, TypeError) as exc:
+            raise SerializerError(exc) from exc
+
+
+__all__ = ["AbstractSerializer", "JsonSerializer"]
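
Taken together, the new `tracker_exporter/state` package splits responsibilities between serializers, storage backends and state managers, with `StateManagerFactory` as the convenience entry point. The sketch below shows how the pieces fit together; it is not part of the patch, and the state key, timestamp value and S3 credentials are illustrative placeholders.

```python
# Sketch only: wiring the new state modules together.
from tracker_exporter.state.factory import StateManagerFactory

# Local JSON file backend; "state.json" is an illustrative filename.
state = StateManagerFactory.create_file_state_manager(strategy="local", filename="state.json")

last_run = state.get("issues_last_export", default=None)  # None on the very first run
# ... export issues changed since `last_run` ...
state.set("issues_last_export", "2024-01-01T00:00:00")  # values should be JSON-compatible

# S3-backed variant (bucket and credentials are placeholders):
# state = StateManagerFactory.create_file_state_manager(
#     strategy="s3",
#     filename="state.json",
#     bucket_name="tracker-exporter-state",
#     access_key_id="YCAxxxxxxxx",
#     secret_key="YCxxx-xxxxxxxxxxxxxxx",
#     endpoint_url="https://storage.yandexcloud.net",
# )

# Redis-backed variant:
# state = StateManagerFactory.create_redis_state_manager("redis://localhost:6379")
```

Keeping serializer, backend and manager as separate layers means a new storage target only needs a backend that implements `read`/`write` (or the key-value interface); the managers and the factory stay unchanged.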
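The `serializers` module imports `yaml` although only `JsonSerializer` ships in this patch, so a YAML serializer is presumably planned. Any custom serializer only has to implement `ext`, `serialize` and `deserialize` and wrap failures in `SerializerError`. A hypothetical sketch, not part of the patch and assuming PyYAML is installed:

```python
import yaml  # PyYAML; the same module already imported by tracker_exporter/state/serializers.py

from tracker_exporter.exceptions import SerializerError
from tracker_exporter.state.serializers import AbstractSerializer


class YamlSerializer(AbstractSerializer):
    """Hypothetical YAML serializer following the same contract as JsonSerializer."""

    @property
    def ext(self) -> str:
        return "yaml"

    def serialize(self, data, **kwargs) -> str:
        try:
            # safe_dump returns the YAML document as a string when no stream is given
            return yaml.safe_dump(data, **kwargs)
        except yaml.YAMLError as exc:
            raise SerializerError(exc) from exc

    def deserialize(self, data: str, **kwargs):
        try:
            return yaml.safe_load(data)
        except yaml.YAMLError as exc:
            raise SerializerError(exc) from exc
```

Because the factory and the file backends accept any `AbstractSerializer` subclass, such a serializer could be plugged in with `StateManagerFactory.create_file_state_manager(strategy="local", serializer=YamlSerializer)`, and `auto_sub_ext_by_serializer` would then produce a `.yaml` state file.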