Skip to content

Commit

Permalink
feat(variant): add new endpoint to clear snapshots (#2135)
Browse files Browse the repository at this point in the history
Adding the new endpoint "clear-snapshots" will allow admin users to clear
old snapshots in order to easily gain disk usage.

---------

Signed-off-by: Sylvain Leclerc <[email protected]>
Co-authored-by: Mohamed Abdel Wedoud <[email protected]>
Co-authored-by: Laurent LAPORTE <[email protected]>
Co-authored-by: belthlemar <[email protected]>
Co-authored-by: Sylvain Leclerc <[email protected]>
Co-authored-by: Hatim Dinia <[email protected]>
Co-authored-by: Samir Kamal <[email protected]>
Co-authored-by: mabw-rte <[email protected]>
  • Loading branch information
8 people authored Sep 27, 2024
1 parent 298b39a commit 426a97a
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 8 deletions.
1 change: 1 addition & 0 deletions antarest/core/tasks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class TaskType(str, Enum):
SCAN = "SCAN"
UPGRADE_STUDY = "UPGRADE_STUDY"
THERMAL_CLUSTER_SERIES_GENERATION = "THERMAL_CLUSTER_SERIES_GENERATION"
SNAPSHOT_CLEARING = "SNAPSHOT_CLEARING"


class TaskStatus(Enum):
Expand Down
74 changes: 70 additions & 4 deletions antarest/study/storage/variantstudy/variant_study_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import re
import shutil
import typing as t
from datetime import datetime
from datetime import datetime, timedelta
from functools import reduce
from pathlib import Path
from uuid import uuid4
Expand Down Expand Up @@ -45,9 +45,11 @@
from antarest.core.serialization import to_json_string
from antarest.core.tasks.model import CustomTaskEventMessages, TaskDTO, TaskResult, TaskType
from antarest.core.tasks.service import DEFAULT_AWAIT_MAX_TIMEOUT, ITaskService, TaskUpdateNotifier, noop_notifier
from antarest.core.utils.fastapi_sqlalchemy import db
from antarest.core.utils.utils import assert_this, suppress_exception
from antarest.matrixstore.service import MatrixService
from antarest.study.model import RawStudy, Study, StudyAdditionalData, StudyMetadataDTO, StudySimResultDTO
from antarest.study.repository import AccessPermissions, StudyFilter
from antarest.study.storage.abstract_storage_service import AbstractStorageService
from antarest.study.storage.patch_service import PatchService
from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig, FileStudyTreeConfigDTO
Expand Down Expand Up @@ -625,9 +627,9 @@ def callback(notifier: TaskUpdateNotifier) -> TaskResult:
)
return TaskResult(
success=generate_result.success,
message=f"{study_id} generated successfully"
if generate_result.success
else f"{study_id} not generated",
message=(
f"{study_id} generated successfully" if generate_result.success else f"{study_id} not generated"
),
return_value=generate_result.model_dump_json(),
)

Expand Down Expand Up @@ -1053,3 +1055,67 @@ def initialize_additional_data(self, variant_study: VariantStudy) -> bool:
exc_info=e,
)
return False

def clear_all_snapshots(self, retention_hours: timedelta, params: t.Optional[RequestParameters] = None) -> str:
"""
Admin command that clear all variant snapshots older than `retention_hours` (in hours).
Only available for admin users.
Args:
retention_hours: number of retention hours
params: request parameters used to identify the user status
Returns: None
Raises:
UserHasNotPermissionError
"""
if params is None or (params.user and not params.user.is_site_admin() and not params.user.is_admin_token()):
raise UserHasNotPermissionError()

task_name = f"Cleaning all snapshot updated or accessed at least {retention_hours} hours ago."

snapshot_clearing_task_instance = SnapshotCleanerTask(
variant_study_service=self, retention_hours=retention_hours
)

return self.task_service.add_task(
snapshot_clearing_task_instance,
task_name,
task_type=TaskType.SNAPSHOT_CLEARING,
ref_id="SNAPSHOT_CLEANING",
custom_event_messages=None,
request_params=params,
)


class SnapshotCleanerTask:
def __init__(
self,
variant_study_service: VariantStudyService,
retention_hours: timedelta,
) -> None:
self._variant_study_service = variant_study_service
self._retention_hours = retention_hours

def _clear_all_snapshots(self) -> None:
with db():
variant_list = self._variant_study_service.repository.get_all(
study_filter=StudyFilter(
variant=True,
access_permissions=AccessPermissions(is_admin=True),
)
)
for variant in variant_list:
if variant.updated_at and variant.updated_at < datetime.utcnow() - self._retention_hours:
if variant.last_access and variant.last_access < datetime.utcnow() - self._retention_hours:
self._variant_study_service.clear_snapshot(variant)

def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult:
msg = f"Start cleaning all snapshots updated or accessed {self._retention_hours} hours ago."
notifier(msg)
self._clear_all_snapshots()
msg = f"All selected snapshots were successfully cleared."
notifier(msg)
return TaskResult(success=True, message=msg)

__call__ = run_task
31 changes: 30 additions & 1 deletion antarest/study/web/variant_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# SPDX-License-Identifier: MPL-2.0
#
# This file is part of the Antares project.

import datetime
import logging
from typing import List, Optional, Union

Expand Down Expand Up @@ -416,4 +416,33 @@ def create_from_variant(
params = RequestParameters(user=current_user)
raise NotImplementedError()

@bp.put(
"/studies/variants/clear-snapshots",
tags=[APITag.study_variant_management],
summary="Clear variant snapshots",
responses={
200: {
"description": "Delete snapshots older than a specific number of hours. By default, this number is 24."
}
},
)
def clear_variant_snapshots(
hours: int = 24,
current_user: JWTUser = Depends(auth.get_current_user),
) -> str:
"""
Endpoint that clear `limit` hours old and older variant snapshots.
Args: limit (int, optional): Number of hours to clear. Defaults to 24.
Returns: ID of the task running the snapshot clearing.
"""
retention_hours = datetime.timedelta(hours=hours)
logger.info(
f"Delete all variant snapshots older than {retention_hours} hours.",
extra={"user": current_user.id},
)
params = RequestParameters(user=current_user)
return variant_study_service.clear_all_snapshots(retention_hours, params)

return bp
106 changes: 106 additions & 0 deletions tests/integration/variant_blueprint/test_variant_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@
#
# This file is part of the Antares project.

import datetime
import io
import logging
import time
import typing as t
from pathlib import Path

import pytest
from starlette.testclient import TestClient

from antarest.core.tasks.model import TaskDTO, TaskStatus
from tests.integration.assets import ASSETS_DIR
from tests.integration.utils import wait_task_completion


@pytest.fixture(name="base_study_id")
Expand All @@ -45,6 +48,68 @@ def variant_id_fixture(
return t.cast(str, res.json())


@pytest.fixture(name="generate_snapshots")
def generate_snapshot_fixture(
client: TestClient,
admin_access_token: str,
base_study_id: str,
monkeypatch: pytest.MonkeyPatch,
caplog: t.Any,
) -> t.List[str]:
"""Generate some snapshots with different date of update and last access"""

class FakeDatetime:
"""
Class that handle fake timestamp creation/update of variant
"""

fake_time: datetime.datetime

@classmethod
def now(cls) -> datetime.datetime:
"""Method used to get the custom timestamp"""
return cls.fake_time

@classmethod
def utcnow(cls) -> datetime.datetime:
"""Method used while a variant is created"""
return cls.now()

# Initialize variant_ids list
variant_ids = []

admin_headers = {"Authorization": f"Bearer {admin_access_token}"}

with caplog.at_level(level=logging.WARNING):
# Generate three different timestamp
older_time = datetime.datetime.utcnow() - datetime.timedelta(
hours=25
) # older than the default value which is 24
old_time = datetime.datetime.utcnow() - datetime.timedelta(hours=8) # older than 6 hours
recent_time = datetime.datetime.utcnow() - datetime.timedelta(hours=2) # older than 0 hours

with monkeypatch.context() as m:
# Patch the datetime import instance of the variant_study_service package to hack
# the `created_at` and `updated_at` fields
# useful when a variant is created
m.setattr("antarest.study.storage.variantstudy.variant_study_service.datetime", FakeDatetime)
# useful when a study is accessed
m.setattr("antarest.study.service.datetime", FakeDatetime)

for index, different_time in enumerate([older_time, old_time, recent_time]):
FakeDatetime.fake_time = different_time
res = client.post(f"/v1/studies/{base_study_id}/variants?name=variant{index}", headers=admin_headers)
variant_ids.append(res.json())

# Generate snapshot for each variant
task_id = client.put(f"/v1/studies/{variant_ids[index]}/generate", headers=admin_headers)
wait_task_completion(
client, admin_access_token, task_id.json()
) # wait for the filesystem to be updated
client.get(f"v1/studies/{variant_ids[index]}", headers=admin_headers)
return t.cast(t.List[str], variant_ids)


def test_variant_manager(
client: TestClient,
admin_access_token: str,
Expand Down Expand Up @@ -330,3 +395,44 @@ def test_outputs(client: TestClient, admin_access_token: str, variant_id: str, t
assert res.status_code == 200, res.json()
outputs = res.json()
assert len(outputs) == 1


def test_clear_snapshots(
client: TestClient,
admin_access_token: str,
tmp_path: Path,
generate_snapshots: t.List[str],
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""
The `snapshot/` directory must not exist after a call to `clear-snapshot`.
"""
# Set up
admin_headers = {"Authorization": f"Bearer {admin_access_token}"}

older = Path(tmp_path).joinpath(f"internal_workspace", generate_snapshots[0], "snapshot")
old = Path(tmp_path).joinpath(f"internal_workspace", generate_snapshots[1], "snapshot")
recent = Path(tmp_path).joinpath(f"internal_workspace", generate_snapshots[2], "snapshot")

# Test
# Check initial data
assert older.exists() and old.exists() and recent.exists()

# Delete the older snapshot (default retention hours implicitly equals to 24 hours)
# and check if it was successfully deleted
response = client.put(f"v1/studies/variants/clear-snapshots", headers=admin_headers)
task = response.json()
wait_task_completion(client, admin_access_token, task)
assert (not older.exists()) and old.exists() and recent.exists()

# Delete the old snapshot and check if it was successfully deleted
response = client.put(f"v1/studies/variants/clear-snapshots?hours=6", headers=admin_headers)
task = response.json()
wait_task_completion(client, admin_access_token, task)
assert (not older.exists()) and (not old.exists()) and recent.exists()

# Delete the recent snapshot and check if it was successfully deleted
response = client.put(f"v1/studies/variants/clear-snapshots?hours=-1", headers=admin_headers)
task = response.json()
wait_task_completion(client, admin_access_token, task)
assert not (older.exists() and old.exists() and recent.exists())
Loading

0 comments on commit 426a97a

Please sign in to comment.