From 1f58a9f32667d01a3be80da2109cd56093c1c0fb Mon Sep 17 00:00:00 2001 From: Paul Bui-Quang Date: Thu, 9 Dec 2021 10:56:19 +0100 Subject: [PATCH 1/3] Update version Signed-off-by: Paul Bui-Quang --- antarest/__init__.py | 2 +- setup.py | 2 +- sonar-project.properties | 2 +- webapp/package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/antarest/__init__.py b/antarest/__init__.py index 514bcf172c..19d92fe27e 100644 --- a/antarest/__init__.py +++ b/antarest/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.1.4" +__version__ = "2.1.5" from pathlib import Path diff --git a/setup.py b/setup.py index 229655022b..d3a4ecf83d 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="AntaREST", - version="2.1.4", + version="2.1.5", description="Antares Server", long_description=long_description, long_description_content_type="text/markdown", diff --git a/sonar-project.properties b/sonar-project.properties index b5e2ed34e0..cb15f8f5f3 100644 --- a/sonar-project.properties +++ b/sonar-project.properties @@ -4,4 +4,4 @@ sonar.sources=antarest sonar.language=python sonar.exclusions=antarest/gui.py,antarest/main.py sonar.python.coverage.reportPaths=coverage.xml -sonar.projectVersion=2.1.4 \ No newline at end of file +sonar.projectVersion=2.1.5 \ No newline at end of file diff --git a/webapp/package.json b/webapp/package.json index 6059b4fb7d..f2a75be817 100644 --- a/webapp/package.json +++ b/webapp/package.json @@ -1,6 +1,6 @@ { "name": "antares-web", - "version": "2.1.4", + "version": "2.1.5", "private": true, "dependencies": { "@fortawesome/fontawesome-svg-core": "^1.2.36", From 93b8aa783405d20f6030d7bc9ebe167054c0fccd Mon Sep 17 00:00:00 2001 From: Wintxer <47366828+Wintxer@users.noreply.github.com> Date: Thu, 9 Dec 2021 16:16:22 +0100 Subject: [PATCH 2/3] Added empty placeholder in the data modal matrix view (#661) --- webapp/public/locales/en/data.json | 2 +- webapp/public/locales/fr/data.json | 2 +- webapp/src/components/Data/MatrixModal.tsx | 21 +++++++++++++++++---- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/webapp/public/locales/en/data.json b/webapp/public/locales/en/data.json index 2529443ece..d4a482a3ce 100644 --- a/webapp/public/locales/en/data.json +++ b/webapp/public/locales/en/data.json @@ -33,5 +33,5 @@ "jsonFormat": "JSON Format", "graphSelector": "Columns", "monotonicView": "Monotonic", - "matrixEmpty": "No data" + "matrixEmpty": "Empty matrix" } \ No newline at end of file diff --git a/webapp/public/locales/fr/data.json b/webapp/public/locales/fr/data.json index bf6170116e..ab112175a7 100644 --- a/webapp/public/locales/fr/data.json +++ b/webapp/public/locales/fr/data.json @@ -33,5 +33,5 @@ "jsonFormat": "Format JSON", "graphSelector": "Colonnes", "monotonicView": "Monotone", - "matrixEmpty": "Pas de données" + "matrixEmpty": "Matrice vide" } \ No newline at end of file diff --git a/webapp/src/components/Data/MatrixModal.tsx b/webapp/src/components/Data/MatrixModal.tsx index 4179015a65..025e78763e 100644 --- a/webapp/src/components/Data/MatrixModal.tsx +++ b/webapp/src/components/Data/MatrixModal.tsx @@ -9,6 +9,7 @@ import MatrixView from '../ui/MatrixView/index'; import { getMatrix } from '../../services/api/matrix'; import { CopyIcon } from './utils'; import enqueueErrorSnackbar from '../ui/ErrorSnackBar'; +import NoContent from '../ui/NoContent'; import SimpleLoader from '../ui/loaders/SimpleLoader'; const useStyles = makeStyles((theme: Theme) => @@ -19,6 +20,11 @@ const useStyles = makeStyles((theme: Theme) => padding: theme.spacing(2), position: 'relative', }, + matrixEmpty: { + width: '100%', + height: '100%', + paddingTop: '3%', + }, })); interface PropTypes { @@ -91,10 +97,17 @@ const MatrixModal = (props: PropTypes) => { { loading && } - + {matrix.columns.length > 0 ? ( + + ) : !loading && ( +
+ +
+ )} + ); From 06eea06415bd5bd69aeb8eca35f18e8e02e670c7 Mon Sep 17 00:00:00 2001 From: Paul Bui-Quang Date: Thu, 9 Dec 2021 16:25:26 +0100 Subject: [PATCH 3/3] Fix hangging tasks (#660) --- antarest/core/cache/business/redis_cache.py | 1 + antarest/core/core_blueprint.py | 16 ++++++- antarest/core/interfaces/eventbus.py | 1 + antarest/core/tasks/service.py | 42 ++++++++++++++++++- antarest/core/tasks/web.py | 8 ++++ antarest/main.py | 2 - antarest/study/service.py | 5 ++- .../study/storage/abstract_storage_service.py | 6 ++- .../storage/rawstudy/raw_study_service.py | 6 ++- .../variantstudy/variant_study_service.py | 40 +++++++----------- tests/core/test_tasks.py | 41 +++++++++++++++++- 11 files changed, 135 insertions(+), 33 deletions(-) diff --git a/antarest/core/cache/business/redis_cache.py b/antarest/core/cache/business/redis_cache.py index c8f187590b..3faddd4d3f 100644 --- a/antarest/core/cache/business/redis_cache.py +++ b/antarest/core/cache/business/redis_cache.py @@ -49,6 +49,7 @@ def get( else refresh_timeout, ) return redis_element.data + logger.info(f"Cache key {id} not found") return None def invalidate(self, id: str) -> None: diff --git a/antarest/core/core_blueprint.py b/antarest/core/core_blueprint.py index 99f6d6b3b5..a435bb59a3 100644 --- a/antarest/core/core_blueprint.py +++ b/antarest/core/core_blueprint.py @@ -1,13 +1,17 @@ +import logging import subprocess from pathlib import Path from typing import Any, Optional -from fastapi import APIRouter +from fastapi import APIRouter, Depends from pydantic import BaseModel from antarest import __version__ from antarest.core.config import Config +from antarest.core.jwt import JWTUser +from antarest.core.requests import UserHasNotPermissionError from antarest.core.utils.web import APITag +from antarest.login.auth import Auth def get_commit_id(path_resources: Path) -> Optional[str]: @@ -54,6 +58,7 @@ def create_utils_routes(config: Config) -> APIRouter: """ bp = APIRouter() + auth = Auth(config) @bp.get("/health", tags=[APITag.misc], response_model=StatusDTO) def health() -> Any: @@ -70,4 +75,13 @@ def version() -> Any: version=__version__, gitcommit=get_commit_id(config.resources_path) ) + @bp.get("/kill", include_in_schema=False) + def kill_worker( + current_user: JWTUser = Depends(auth.get_current_user), + ) -> Any: + if not current_user.is_site_admin(): + raise UserHasNotPermissionError() + logging.getLogger(__name__).warning("Killing the worker") + exit(1) + return bp diff --git a/antarest/core/interfaces/eventbus.py b/antarest/core/interfaces/eventbus.py index 6cc5b5efa8..dfa204e0bc 100644 --- a/antarest/core/interfaces/eventbus.py +++ b/antarest/core/interfaces/eventbus.py @@ -22,6 +22,7 @@ class EventType: TASK_RUNNING = "TASK_RUNNING" TASK_COMPLETED = "TASK_COMPLETED" TASK_FAILED = "TASK_FAILED" + TASK_CANCEL_REQUEST = "TASK_CANCEL_REQUEST" DOWNLOAD_CREATED = "DOWNLOAD_CREATED" DOWNLOAD_READY = "DOWNLOAD_READY" DOWNLOAD_EXPIRED = "DOWNLOAD_EXPIRED" diff --git a/antarest/core/tasks/service.py b/antarest/core/tasks/service.py index 17de69e93f..b38cf3d396 100644 --- a/antarest/core/tasks/service.py +++ b/antarest/core/tasks/service.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor, Future from http import HTTPStatus -from typing import Callable, Optional, List, Dict +from typing import Callable, Optional, List, Dict, Awaitable from fastapi import HTTPException @@ -21,6 +21,7 @@ from antarest.core.requests import ( RequestParameters, MustBeAuthenticatedError, + UserHasNotPermissionError, ) from antarest.core.tasks.model import ( TaskDTO, @@ -96,6 +97,7 @@ def __init__( self.threadpool = ThreadPoolExecutor( max_workers=config.tasks.max_workers, thread_name_prefix="taskjob_" ) + self.event_bus.add_listener(self.create_task_event_callback()) # set the status of previously running job to FAILED due to server restart self._fix_running_status() @@ -135,6 +137,36 @@ def add_task( self.tasks[task.id] = future return str(task.id) + def create_task_event_callback(self) -> Callable[[Event], Awaitable[None]]: + async def task_event_callback(event: Event) -> None: + if event.type == EventType.TASK_CANCEL_REQUEST: + self._cancel_task(str(event.payload), dispatch=False) + + return task_event_callback + + def cancel_task( + self, task_id: str, params: RequestParameters, dispatch: bool = False + ) -> None: + task = self.repo.get_or_raise(task_id) + if params.user and ( + params.user.is_site_admin() + or task.owner_id == params.user.impersonator + ): + self._cancel_task(task_id, dispatch) + else: + raise UserHasNotPermissionError() + + def _cancel_task(self, task_id: str, dispatch: bool = False) -> None: + task = self.repo.get_or_raise(task_id) + if task_id in self.tasks: + self.tasks[task_id].cancel() + task.status = TaskStatus.CANCELLED + self.repo.save(task) + elif dispatch: + self.event_bus.push( + Event(type=EventType.TASK_CANCEL_REQUEST, payload=task_id) + ) + def status_task( self, task_id: str, @@ -175,8 +207,11 @@ def list_db_tasks( def await_task( self, task_id: str, timeout_sec: Optional[int] = None ) -> None: + logger.info(f"Awaiting task {task_id}") if task_id in self.tasks: - self.tasks[task_id].result(timeout_sec) + self.tasks[task_id].result( + timeout_sec or DEFAULT_AWAIT_MAX_TIMEOUT + ) else: logger.warning( f"Task {task_id} not handled by this worker, will poll for task completion from db" @@ -211,11 +246,14 @@ def _run_task( ) with db(): + logger.info(f"Starting task {task_id}") task = retry(lambda: self.repo.get_or_raise(task_id)) task.status = TaskStatus.RUNNING.value self.repo.save(task) + logger.info(f"Task {task_id} set to RUNNING") try: result = callback(self._task_logger(task_id)) + logger.info(f"Task {task_id} ended") self._update_task_status( task_id, TaskStatus.COMPLETED diff --git a/antarest/core/tasks/web.py b/antarest/core/tasks/web.py index 70f5a97a1f..607bded735 100644 --- a/antarest/core/tasks/web.py +++ b/antarest/core/tasks/web.py @@ -48,4 +48,12 @@ def get_task( service.await_task(task_id) return service.status_task(task_id, request_params, with_logs) + @bp.put("/tasks/{task_id}/cancel") + def cancel_task( + task_id: str, + current_user: JWTUser = Depends(auth.get_current_user), + ) -> Any: + request_params = RequestParameters(user=current_user) + return service.cancel_task(task_id, request_params, dispatch=True) + return bp diff --git a/antarest/main.py b/antarest/main.py index e35fa3fd0e..1dbff6abb9 100644 --- a/antarest/main.py +++ b/antarest/main.py @@ -8,7 +8,6 @@ import sqlalchemy.ext.baked # type: ignore import uvicorn # type: ignore -from dateutil import tz from fastapi import FastAPI, HTTPException from fastapi_jwt_auth import AuthJWT # type: ignore from sqlalchemy import create_engine, text @@ -23,7 +22,6 @@ from antarest.core.config import Config from antarest.core.core_blueprint import create_utils_routes from antarest.core.filetransfer.main import build_filetransfer_service -from antarest.core.filetransfer.web import create_file_transfer_api from antarest.core.logging.utils import configure_logger, LoggingMiddleware from antarest.core.persistence import upgrade_db from antarest.core.swagger import customize_openapi diff --git a/antarest/study/service.py b/antarest/study/service.py index 786d4cae23..7920e553e4 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -283,7 +283,10 @@ def get_studies_information( for k in cached_studies: studies[k] = StudyMetadataDTO.parse_obj(cached_studies[k]) else: - for study in self.repository.get_all(): + logger.info("Retrieving all studies") + all_studies = self.repository.get_all() + logger.info("Studies retrieved") + for study in all_studies: if not managed or is_managed(study): study_metadata = self._try_get_studies_information( study, summary diff --git a/antarest/study/storage/abstract_storage_service.py b/antarest/study/storage/abstract_storage_service.py index 6f81d55b40..c53391fab1 100644 --- a/antarest/study/storage/abstract_storage_service.py +++ b/antarest/study/storage/abstract_storage_service.py @@ -351,7 +351,11 @@ def export_output(self, metadata: T, output_id: str, target: Path) -> Path: @abstractmethod def export_study_flat( - self, metadata: T, dest: Path, outputs: bool = True + self, + metadata: T, + dest: Path, + outputs: bool = True, + denormalize: bool = True, ) -> None: raise NotImplementedError() diff --git a/antarest/study/storage/rawstudy/raw_study_service.py b/antarest/study/storage/rawstudy/raw_study_service.py index fc80b8e7fd..716e8e6a12 100644 --- a/antarest/study/storage/rawstudy/raw_study_service.py +++ b/antarest/study/storage/rawstudy/raw_study_service.py @@ -250,7 +250,11 @@ def import_study(self, metadata: RawStudy, stream: IO[bytes]) -> Study: return metadata def export_study_flat( - self, metadata: RawStudy, dest: Path, outputs: bool = True + self, + metadata: RawStudy, + dest: Path, + outputs: bool = True, + denormalize: bool = True, ) -> None: path_study = Path(metadata.path) start_time = time.time() diff --git a/antarest/study/storage/variantstudy/variant_study_service.py b/antarest/study/storage/variantstudy/variant_study_service.py index aeae1e8442..a771d42cf4 100644 --- a/antarest/study/storage/variantstudy/variant_study_service.py +++ b/antarest/study/storage/variantstudy/variant_study_service.py @@ -456,23 +456,6 @@ def _get_variants_parents( return output_list - def get_study_information( - self, study: VariantStudy, summary: bool = False - ) -> StudyMetadataDTO: - """ - Get information present in study.antares file - Args: - study: study - summary: if true, only retrieve basic info from database - - Returns: study metadata - - """ - return super().get_study_information( - study, - summary, - ) - def get( self, metadata: VariantStudy, @@ -493,7 +476,7 @@ def get( Returns: study data formatted in json """ - self._safe_generation(metadata, timeout=30) + self._safe_generation(metadata, timeout=60) self.repository.refresh(metadata) return super().get( metadata=metadata, @@ -567,6 +550,7 @@ def generate_task( ) ): logger.info(f"Starting variant study {metadata.id} generation") + self.repository.refresh(metadata) if metadata.generation_task: try: previous_task = self.task_service.status_task( @@ -670,7 +654,10 @@ def _generate( ) else: self.raw_study_service.export_study_flat( - metadata=parent_study, dest=dest_path, outputs=False + metadata=parent_study, + dest=dest_path, + outputs=False, + denormalize=False, ) results = self._generate_snapshot(variant_study, notifier) @@ -863,7 +850,7 @@ def get_study_sim_result( study: study Returns: study output data """ - self._safe_generation(study, timeout=30) + self._safe_generation(study, timeout=60) return super().get_study_sim_result(study=study) def set_reference_output( @@ -917,7 +904,11 @@ def get_study_path(self, metadata: Study) -> Path: return Path(metadata.path) / SNAPSHOT_RELATIVE_PATH def export_study_flat( - self, metadata: VariantStudy, dest: Path, outputs: bool = True + self, + metadata: VariantStudy, + dest: Path, + outputs: bool = True, + denormalize: bool = True, ) -> None: self._safe_generation(metadata) @@ -938,6 +929,7 @@ def export_study_flat( duration = "{:.3f}".format(stop_time - start_time) logger.info(f"Study {path_study} exported (flat mode) in {duration}s") _, study = self.study_factory.create_from_fs(dest, "", use_cache=False) - study.denormalize() - duration = "{:.3f}".format(time.time() - stop_time) - logger.info(f"Study {path_study} denormalized in {duration}s") + if denormalize: + study.denormalize() + duration = "{:.3f}".format(time.time() - stop_time) + logger.info(f"Study {path_study} denormalized in {duration}s") diff --git a/tests/core/test_tasks.py b/tests/core/test_tasks.py index da8cccb049..d808c88ea6 100644 --- a/tests/core/test_tasks.py +++ b/tests/core/test_tasks.py @@ -2,12 +2,14 @@ from typing import Callable from unittest.mock import Mock, ANY, call +import pytest from sqlalchemy import create_engine from antarest.core.config import Config +from antarest.core.interfaces.eventbus import EventType, Event from antarest.core.jwt import DEFAULT_ADMIN_USER from antarest.core.persistence import Base -from antarest.core.requests import RequestParameters +from antarest.core.requests import RequestParameters, UserHasNotPermissionError from antarest.core.tasks.model import ( TaskJob, TaskStatus, @@ -344,3 +346,40 @@ def test_repository(): == 0 ) assert task_repository.get(new_task.id) is None + + +def test_cancel(): + engine = create_engine("sqlite:///:memory:", echo=True) + Base.metadata.create_all(engine) + DBSessionMiddleware( + Mock(), + custom_engine=engine, + session_args={"autocommit": False, "autoflush": False}, + ) + + repo_mock = Mock(spec=TaskJobRepository) + repo_mock.list.return_value = [] + service = TaskJobService( + config=Config(), repository=repo_mock, event_bus=Mock() + ) + + with pytest.raises(UserHasNotPermissionError): + service.cancel_task("a", RequestParameters()) + + service.cancel_task( + "b", RequestParameters(user=DEFAULT_ADMIN_USER), dispatch=True + ) + service.event_bus.push.assert_called_with( + Event(type=EventType.TASK_CANCEL_REQUEST, payload="b") + ) + + creation_date = datetime.datetime.utcnow() + task = TaskJob(id="a", name="b", status=2, creation_date=creation_date) + repo_mock.list.return_value = [task] + repo_mock.get_or_raise.return_value = task + service.tasks["a"] = Mock() + service.cancel_task( + "a", RequestParameters(user=DEFAULT_ADMIN_USER), dispatch=True + ) + task.status = TaskStatus.CANCELLED.value + repo_mock.save.assert_called_with(task)