Skip to content

Commit

Permalink
Merge branch 'dev' into add-arrow-support-in-backend
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed Aug 21, 2024
2 parents 123419e + bc4f026 commit 59f8beb
Show file tree
Hide file tree
Showing 61 changed files with 36,180 additions and 1,001 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ jobs:
- name: Test with pytest
run: |
pytest --cov antarest --cov-report xml
- name: Fix code coverage paths
if: matrix.os == 'ubuntu-20.04'
run: |
sed -i 's/\/home\/runner\/work\/AntaREST\/AntaREST/\/github\/workspace/g' coverage.xml
- name: Archive code coverage results
if: matrix.os == 'ubuntu-20.04'
uses: actions/upload-artifact@v4
Expand Down
7 changes: 2 additions & 5 deletions antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,8 @@ def __str__(self) -> str:


class UnsupportedStudyVersion(HTTPException):
def __init__(self, version: str) -> None:
super().__init__(
HTTPStatus.BAD_REQUEST,
f"Study version {version} is not supported",
)
def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.BAD_REQUEST, message)


class UnsupportedOperationOnArchivedStudy(HTTPException):
Expand Down
2 changes: 1 addition & 1 deletion antarest/core/tasks/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class TaskType(str, Enum):
ARCHIVE = "ARCHIVE"
UNARCHIVE = "UNARCHIVE"
SCAN = "SCAN"
WORKER_TASK = "WORKER_TASK"
UPGRADE_STUDY = "UPGRADE_STUDY"
THERMAL_CLUSTER_SERIES_GENERATION = "THERMAL_CLUSTER_SERIES_GENERATION"


class TaskStatus(Enum):
Expand Down
4 changes: 2 additions & 2 deletions antarest/study/business/adequacy_patch_management.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Dict, List, Optional

from pydantic.types import StrictBool, confloat
from pydantic.types import StrictBool, confloat, conint

from antarest.study.business.enum_ignore_case import EnumIgnoreCase
from antarest.study.business.utils import GENERAL_DATA_PATH, FieldInfo, FormFieldsBaseModel, execute_or_add_commands
Expand Down Expand Up @@ -28,7 +28,7 @@ class AdequacyPatchFormFields(FormFieldsBaseModel):
check_csr_cost_function: Optional[StrictBool]
threshold_initiate_curtailment_sharing_rule: Optional[ThresholdType] # type: ignore
threshold_display_local_matching_rule_violations: Optional[ThresholdType] # type: ignore
threshold_csr_variable_bounds_relaxation: Optional[ThresholdType] # type: ignore
threshold_csr_variable_bounds_relaxation: Optional[conint(ge=0, strict=True)] # type: ignore


ADEQUACY_PATCH_PATH = f"{GENERAL_DATA_PATH}/adequacy patch"
Expand Down
108 changes: 82 additions & 26 deletions antarest/study/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
StudyVariantUpgradeError,
TaskAlreadyRunning,
UnsupportedOperationOnArchivedStudy,
UnsupportedStudyVersion,
)
from antarest.core.filetransfer.model import FileDownloadTaskDTO
from antarest.core.filetransfer.service import FileTransferManager
Expand Down Expand Up @@ -118,14 +117,12 @@
from antarest.study.storage.rawstudy.raw_study_service import RawStudyService
from antarest.study.storage.storage_service import StudyStorageService
from antarest.study.storage.study_download_utils import StudyDownloader, get_output_variables_information
from antarest.study.storage.study_upgrader import (
find_next_version,
get_current_version,
should_study_be_denormalized,
upgrade_study,
)
from antarest.study.storage.study_upgrader import StudyUpgrader, check_versions_coherence, find_next_version
from antarest.study.storage.utils import assert_permission, get_start_date, is_managed, remove_from_cache
from antarest.study.storage.variantstudy.business.utils import transform_command_to_dto
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import (
GenerateThermalClusterTimeSeries,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.replace_matrix import ReplaceMatrix
from antarest.study.storage.variantstudy.model.command.update_comments import UpdateComments
Expand All @@ -135,7 +132,6 @@
from antarest.study.storage.variantstudy.model.model import CommandDTO
from antarest.study.storage.variantstudy.variant_study_service import VariantStudyService
from antarest.worker.archive_worker import ArchiveTaskArgs
from antarest.worker.simulator_worker import GenerateTimeseriesTaskArgs

logger = logging.getLogger(__name__)

Expand All @@ -157,6 +153,51 @@ def get_disk_usage(path: t.Union[str, Path]) -> int:
return total_size


class ThermalClusterTimeSeriesGeneratorTask:
"""
Task to generate thermal clusters time series
"""

def __init__(
self,
_study_id: str,
repository: StudyMetadataRepository,
storage_service: StudyStorageService,
event_bus: IEventBus,
):
self._study_id = _study_id
self.repository = repository
self.storage_service = storage_service
self.event_bus = event_bus

def _generate_timeseries(self) -> None:
"""Run the task (lock the database)."""
command_context = self.storage_service.variant_study_service.command_factory.command_context
command = GenerateThermalClusterTimeSeries(command_context=command_context)
with db():
study = self.repository.one(self._study_id)
file_study = self.storage_service.get_storage(study).get_raw(study)
execute_or_add_commands(study, file_study, [command], self.storage_service)
self.event_bus.push(
Event(
type=EventType.STUDY_EDITED,
payload=study.to_json_summary(),
permissions=PermissionInfo.from_study(study),
)
)

def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult:
msg = f"Generating thermal timeseries for study '{self._study_id}'"
notifier(msg)
self._generate_timeseries()
msg = f"Successfully generated thermal timeseries for study '{self._study_id}'"
notifier(msg)
return TaskResult(success=True, message=msg)

# Make `ThermalClusterTimeSeriesGeneratorTask` object callable
__call__ = run_task


class StudyUpgraderTask:
"""
Task to perform a study upgrade.
Expand Down Expand Up @@ -194,13 +235,13 @@ def _upgrade_study(self) -> None:
self.storage_service.variant_study_service.clear_snapshot(study_to_upgrade)
else:
study_path = Path(study_to_upgrade.path)
current_version = get_current_version(study_path)
if is_managed(study_to_upgrade) and should_study_be_denormalized(current_version, target_version):
study_upgrader = StudyUpgrader(study_path, target_version)
if is_managed(study_to_upgrade) and study_upgrader.should_denormalize_study():
# We have to denormalize the study because the upgrade impacts study matrices
file_study = self.storage_service.get_storage(study_to_upgrade).get_raw(study_to_upgrade)
file_study.tree.denormalize()
is_study_denormalized = True
upgrade_study(study_path, target_version)
study_upgrader.upgrade()
remove_from_cache(self.cache_service, study_to_upgrade.id)
study_to_upgrade.version = target_version
self.repository.save(study_to_upgrade)
Expand Down Expand Up @@ -2392,19 +2433,32 @@ def unarchive_output_task(

return task_id

def generate_timeseries(self, study: Study, params: RequestParameters) -> None:
self._assert_study_unarchived(study)
self.task_service.add_worker_task(
TaskType.WORKER_TASK,
"generate-timeseries",
GenerateTimeseriesTaskArgs(
study_id=study.id,
managed=is_managed(study),
study_path=str(study.path),
study_version=str(study.version),
).dict(),
name=f"Generate timeseries for study {study.id}",
def generate_timeseries(self, study: Study, params: RequestParameters) -> str:
task_name = f"Generating thermal timeseries for study {study.name} ({study.id})"
study_tasks = self.task_service.list_tasks(
TaskListFilter(
ref_id=study.id,
type=[TaskType.THERMAL_CLUSTER_SERIES_GENERATION],
status=[TaskStatus.RUNNING, TaskStatus.PENDING],
),
RequestParameters(user=DEFAULT_ADMIN_USER),
)
if len(study_tasks) > 0:
raise TaskAlreadyRunning()

thermal_cluster_timeseries_generation_task = ThermalClusterTimeSeriesGeneratorTask(
study.id,
repository=self.repository,
storage_service=self.storage_service,
event_bus=self.event_bus,
)

return self.task_service.add_task(
thermal_cluster_timeseries_generation_task,
task_name,
task_type=TaskType.THERMAL_CLUSTER_SERIES_GENERATION,
ref_id=study.id,
custom_event_messages=None,
request_params=params,
)

Expand All @@ -2427,13 +2481,15 @@ def upgrade_study(
# First check if the study is a variant study, if so throw an error
if isinstance(study, VariantStudy):
raise StudyVariantUpgradeError(True)
# If the study is a parent raw study, throw an error
# If the study is a parent raw study and has variants, throw an error
elif self.repository.has_children(study_id):
raise StudyVariantUpgradeError(False)

target_version = target_version or find_next_version(study.version)
# Checks versions coherence before launching the task
if not target_version:
raise UnsupportedStudyVersion(study.version)
target_version = find_next_version(study.version)
else:
check_versions_coherence(study.version, target_version)

task_name = f"Upgrade study {study.name} ({study.id}) to version {target_version}"
study_tasks = self.task_service.list_tasks(
Expand Down
24 changes: 14 additions & 10 deletions antarest/study/storage/rawstudy/model/filesystem/matrix/matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,19 @@ class MatrixFrequency(str, Enum):
HOURLY = "hourly"


def dump_dataframe(df: pd.DataFrame, path: Path, float_format: Optional[str] = "%.6f") -> None:
if df.empty:
path.write_bytes(b"")
else:
df.to_csv(
path,
sep="\t",
header=False,
index=False,
float_format=float_format,
)


class MatrixNode(LazyNode[Union[bytes, JSON], Union[bytes, JSON], JSON], ABC):
def __init__(
self,
Expand Down Expand Up @@ -143,13 +156,4 @@ def dump(
self.config.path.write_bytes(data)
else:
df = pd.DataFrame(**data)
if df.empty:
self.config.path.write_bytes(b"")
else:
df.to_csv(
self.config.path,
sep="\t",
header=False,
index=False,
float_format="%.6f",
)
dump_dataframe(df, self.config.path)
Loading

0 comments on commit 59f8beb

Please sign in to comment.