feat(ts-gen): add timeseries generation for thermal clusters (#2112)
Uses antares-timeseries-generation to perform thermal cluster
timeseries generation.

The generation is performed as a background task on file studies,
since it can take several seconds or more.

Note that the generated matrices are not "inner matrices" of the
command, since they are regenerated on each command execution.
Generated matrices that are later stored in the matrix store will be
spared from garbage collection, like all other snapshot matrices.
MartinBelthle authored Aug 21, 2024
1 parent 9f739e1 commit bc4f026
Showing 17 changed files with 35,861 additions and 55 deletions.
2 changes: 1 addition & 1 deletion antarest/core/tasks/model.py
@@ -23,8 +23,8 @@ class TaskType(str, Enum):
ARCHIVE = "ARCHIVE"
UNARCHIVE = "UNARCHIVE"
SCAN = "SCAN"
WORKER_TASK = "WORKER_TASK"
UPGRADE_STUDY = "UPGRADE_STUDY"
THERMAL_CLUSTER_SERIES_GENERATION = "THERMAL_CLUSTER_SERIES_GENERATION"


class TaskStatus(Enum):
86 changes: 73 additions & 13 deletions antarest/study/service.py
@@ -118,6 +118,9 @@
from antarest.study.storage.study_upgrader import StudyUpgrader, check_versions_coherence, find_next_version
from antarest.study.storage.utils import assert_permission, get_start_date, is_managed, remove_from_cache
from antarest.study.storage.variantstudy.business.utils import transform_command_to_dto
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import (
GenerateThermalClusterTimeSeries,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.replace_matrix import ReplaceMatrix
from antarest.study.storage.variantstudy.model.command.update_comments import UpdateComments
@@ -127,7 +130,6 @@
from antarest.study.storage.variantstudy.model.model import CommandDTO
from antarest.study.storage.variantstudy.variant_study_service import VariantStudyService
from antarest.worker.archive_worker import ArchiveTaskArgs
from antarest.worker.simulator_worker import GenerateTimeseriesTaskArgs

logger = logging.getLogger(__name__)

@@ -149,6 +151,51 @@ def get_disk_usage(path: t.Union[str, Path]) -> int:
return total_size


class ThermalClusterTimeSeriesGeneratorTask:
"""
Task to generate thermal cluster time series.
"""

def __init__(
self,
_study_id: str,
repository: StudyMetadataRepository,
storage_service: StudyStorageService,
event_bus: IEventBus,
):
self._study_id = _study_id
self.repository = repository
self.storage_service = storage_service
self.event_bus = event_bus

def _generate_timeseries(self) -> None:
"""Run the task (lock the database)."""
command_context = self.storage_service.variant_study_service.command_factory.command_context
command = GenerateThermalClusterTimeSeries(command_context=command_context)
with db():
study = self.repository.one(self._study_id)
file_study = self.storage_service.get_storage(study).get_raw(study)
execute_or_add_commands(study, file_study, [command], self.storage_service)
self.event_bus.push(
Event(
type=EventType.STUDY_EDITED,
payload=study.to_json_summary(),
permissions=PermissionInfo.from_study(study),
)
)

def run_task(self, notifier: TaskUpdateNotifier) -> TaskResult:
msg = f"Generating thermal timeseries for study '{self._study_id}'"
notifier(msg)
self._generate_timeseries()
msg = f"Successfully generated thermal timeseries for study '{self._study_id}'"
notifier(msg)
return TaskResult(success=True, message=msg)

# Make `ThermalClusterTimeSeriesGeneratorTask` object callable
__call__ = run_task


class StudyUpgraderTask:
"""
Task to perform a study upgrade.
@@ -2385,19 +2432,32 @@ def unarchive_output_task(

return task_id

def generate_timeseries(self, study: Study, params: RequestParameters) -> None:
self._assert_study_unarchived(study)
self.task_service.add_worker_task(
TaskType.WORKER_TASK,
"generate-timeseries",
GenerateTimeseriesTaskArgs(
study_id=study.id,
managed=is_managed(study),
study_path=str(study.path),
study_version=str(study.version),
).dict(),
name=f"Generate timeseries for study {study.id}",
def generate_timeseries(self, study: Study, params: RequestParameters) -> str:
task_name = f"Generating thermal timeseries for study {study.name} ({study.id})"
study_tasks = self.task_service.list_tasks(
TaskListFilter(
ref_id=study.id,
type=[TaskType.THERMAL_CLUSTER_SERIES_GENERATION],
status=[TaskStatus.RUNNING, TaskStatus.PENDING],
),
RequestParameters(user=DEFAULT_ADMIN_USER),
)
if len(study_tasks) > 0:
raise TaskAlreadyRunning()

thermal_cluster_timeseries_generation_task = ThermalClusterTimeSeriesGeneratorTask(
study.id,
repository=self.repository,
storage_service=self.storage_service,
event_bus=self.event_bus,
)

return self.task_service.add_task(
thermal_cluster_timeseries_generation_task,
task_name,
task_type=TaskType.THERMAL_CLUSTER_SERIES_GENERATION,
ref_id=study.id,
custom_event_messages=None,
request_params=params,
)

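As a side note, here is a minimal sketch of how a caller is expected to use the reworked service method, which now returns the id of a background task instead of queuing a worker task. The import path of `TaskAlreadyRunning` and the helper name are assumptions for illustration, not taken from this diff:

```python
import logging

# Sketch only: the exception's module is assumed, it is not shown in this diff.
from antarest.core.exceptions import TaskAlreadyRunning


def launch_thermal_ts_generation(study_service, study, params) -> str:
    """Queue thermal timeseries generation and return the background task id."""
    try:
        # Returns immediately; the generation itself runs as a background task
        # of type TaskType.THERMAL_CLUSTER_SERIES_GENERATION.
        return study_service.generate_timeseries(study, params)
    except TaskAlreadyRunning:
        # A generation is already PENDING or RUNNING for this study;
        # the service refuses to queue a second one.
        logging.getLogger(__name__).warning("Generation already running for %s", study.id)
        raise
```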
24 changes: 14 additions & 10 deletions antarest/study/storage/rawstudy/model/filesystem/matrix/matrix.py
@@ -29,6 +29,19 @@ class MatrixFrequency(str, Enum):
HOURLY = "hourly"


def dump_dataframe(df: pd.DataFrame, path: Path, float_format: Optional[str] = "%.6f") -> None:
if df.empty:
path.write_bytes(b"")
else:
df.to_csv(
path,
sep="\t",
header=False,
index=False,
float_format=float_format,
)


class MatrixNode(LazyNode[Union[bytes, JSON], Union[bytes, JSON], JSON], ABC):
def __init__(
self,
@@ -142,13 +155,4 @@ def dump(
self.config.path.write_bytes(data)
else:
df = pd.DataFrame(**data)
if df.empty:
self.config.path.write_bytes(b"")
else:
df.to_csv(
self.config.path,
sep="\t",
header=False,
index=False,
float_format="%.6f",
)
dump_dataframe(df, self.config.path)
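For illustration, a small usage sketch of the extracted helper (not part of the diff): `dump_dataframe` writes a tab-separated matrix with no header and no index, applies the optional float format, and writes an empty file when the frame is empty.

```python
from pathlib import Path

import pandas as pd

from antarest.study.storage.rawstudy.model.filesystem.matrix.matrix import dump_dataframe

df = pd.DataFrame([[1.5, 2.0], [3.25, 4.0]])
dump_dataframe(df, Path("series.txt"))             # default float_format="%.6f"
dump_dataframe(df, Path("series_raw.txt"), None)   # no float formatting (as the ts-gen command does)
dump_dataframe(pd.DataFrame(), Path("empty.txt"))  # empty frame -> empty file
```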
@@ -16,6 +16,9 @@
from antarest.study.storage.variantstudy.model.command.create_link import CreateLink
from antarest.study.storage.variantstudy.model.command.create_renewables_cluster import CreateRenewablesCluster
from antarest.study.storage.variantstudy.model.command.create_st_storage import CreateSTStorage
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import (
GenerateThermalClusterTimeSeries,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.remove_area import RemoveArea
from antarest.study.storage.variantstudy.model.command.remove_binding_constraint import RemoveBindingConstraint
@@ -317,6 +320,12 @@ def _revert_update_district(
extractor = base_command.get_command_extractor()
return [extractor.generate_update_district(base, base_command.id)]

@staticmethod
def _revert_generate_thermal_cluster_timeseries(
base_command: GenerateThermalClusterTimeSeries, history: t.List["ICommand"], base: FileStudy
) -> t.List[ICommand]:
raise NotImplementedError("The revert function for GenerateThermalClusterTimeSeries is not available")

def revert(
self,
base_command: ICommand,
4 changes: 4 additions & 0 deletions antarest/study/storage/variantstudy/command_factory.py
@@ -12,6 +12,9 @@
from antarest.study.storage.variantstudy.model.command.create_link import CreateLink
from antarest.study.storage.variantstudy.model.command.create_renewables_cluster import CreateRenewablesCluster
from antarest.study.storage.variantstudy.model.command.create_st_storage import CreateSTStorage
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import (
GenerateThermalClusterTimeSeries,
)
from antarest.study.storage.variantstudy.model.command.icommand import ICommand
from antarest.study.storage.variantstudy.model.command.remove_area import RemoveArea
from antarest.study.storage.variantstudy.model.command.remove_binding_constraint import RemoveBindingConstraint
@@ -54,6 +57,7 @@
CommandName.UPDATE_DISTRICT.value: UpdateDistrict,
CommandName.UPDATE_PLAYLIST.value: UpdatePlaylist,
CommandName.UPDATE_SCENARIO_BUILDER.value: UpdateScenarioBuilder,
CommandName.GENERATE_THERMAL_CLUSTER_TIMESERIES.value: GenerateThermalClusterTimeSeries,
}


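A short round-trip sketch of what the new mapping entry enables (the `command_context` object is assumed to be available from a command factory; it is not constructed here):

```python
from antarest.study.storage.variantstudy.model.command.common import CommandName
from antarest.study.storage.variantstudy.model.command.generate_thermal_cluster_timeseries import (
    GenerateThermalClusterTimeSeries,
)
from antarest.study.storage.variantstudy.model.model import CommandDTO

# The command takes no arguments, so its DTO only carries the action name.
cmd = GenerateThermalClusterTimeSeries(command_context=command_context)  # command_context assumed available
dto: CommandDTO = cmd.to_dto()
assert dto.action == CommandName.GENERATE_THERMAL_CLUSTER_TIMESERIES.value  # "generate_thermal_cluster_timeseries"
assert dto.args == {}
# The factory resolves dto.action back to GenerateThermalClusterTimeSeries
# through the mapping entry added above.
```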
@@ -36,3 +36,4 @@ class CommandName(Enum):
UPDATE_DISTRICT = "update_district"
UPDATE_PLAYLIST = "update_playlist"
UPDATE_SCENARIO_BUILDER = "update_scenario_builder"
GENERATE_THERMAL_CLUSTER_TIMESERIES = "generate_thermal_cluster_timeseries"
@@ -0,0 +1,141 @@
import logging
import shutil
import tempfile
import typing as t
from pathlib import Path

import numpy as np
import pandas as pd
from antares.tsgen.duration_generator import ProbabilityLaw
from antares.tsgen.random_generator import MersenneTwisterRNG
from antares.tsgen.ts_generator import ThermalCluster, ThermalDataGenerator

from antarest.study.storage.rawstudy.model.filesystem.config.model import Area, FileStudyTreeConfig
from antarest.study.storage.rawstudy.model.filesystem.config.thermal import LocalTSGenerationBehavior
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.rawstudy.model.filesystem.matrix.matrix import dump_dataframe
from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput
from antarest.study.storage.variantstudy.model.command.icommand import ICommand, OutputTuple
from antarest.study.storage.variantstudy.model.model import CommandDTO

logger = logging.getLogger(__name__)


MODULATION_CAPACITY_COLUMN = 2
FO_RATE_COLUMN = 2
PO_RATE_COLUMN = 3


class GenerateThermalClusterTimeSeries(ICommand):
"""
Command used to generate thermal cluster timeseries for an entire study
"""

command_name = CommandName.GENERATE_THERMAL_CLUSTER_TIMESERIES
version = 1

def _apply_config(self, study_data: FileStudyTreeConfig) -> OutputTuple:
return CommandOutput(status=True, message="Nothing to do"), {}

def _apply(self, study_data: FileStudy) -> CommandOutput:
study_path = study_data.config.study_path
with tempfile.TemporaryDirectory(
suffix=".thermal_timeseries_gen.tmp", prefix="~", dir=study_path.parent
) as path:
tmp_dir = Path(path)
try:
shutil.copytree(study_path / "input" / "thermal" / "series", tmp_dir, dirs_exist_ok=True)
self._build_timeseries(study_data, tmp_dir)
except Exception as e:
logger.error(f"Unhandled exception when trying to generate thermal timeseries: {e}", exc_info=True)
raise
else:
self._replace_safely_original_files(study_path, tmp_dir)
return CommandOutput(status=True, message="All time series were generated successfully")

def _build_timeseries(self, study_data: FileStudy, tmp_path: Path) -> None:
# 1- Get the seed and nb_years to generate
# NB: Default seed in IHM Legacy: 5489, default seed in web: 3005489.
general_data = study_data.tree.get(["settings", "generaldata"], depth=3)
thermal_seed = general_data["seeds - Mersenne Twister"]["seed-tsgen-thermal"]
nb_years = general_data["general"]["nbtimeseriesthermal"]
# 2 - Build the generator
rng = MersenneTwisterRNG(seed=thermal_seed)
generator = ThermalDataGenerator(rng=rng, days=365)
# 3- Loop through areas in alphabetical order
areas: t.Dict[str, Area] = study_data.config.areas
sorted_areas = {k: areas[k] for k in sorted(areas)}
for area_id, area in sorted_areas.items():
# 4- Loop through thermal clusters in alphabetical order
sorted_thermals = sorted(area.thermals, key=lambda x: x.id)
for thermal in sorted_thermals:
# 5- Filter out clusters with generation disabled
if thermal.gen_ts == LocalTSGenerationBehavior.FORCE_NO_GENERATION:
continue
# 6- Build the cluster
url = ["input", "thermal", "prepro", area_id, thermal.id.lower(), "modulation"]
matrix = study_data.tree.get_node(url)
matrix_df = matrix.parse(return_dataframe=True) # type: ignore
modulation_capacity = matrix_df[MODULATION_CAPACITY_COLUMN].to_numpy()
url = ["input", "thermal", "prepro", area_id, thermal.id.lower(), "data"]
matrix = study_data.tree.get_node(url)
matrix_df = matrix.parse(return_dataframe=True) # type: ignore
fo_duration, po_duration, fo_rate, po_rate, npo_min, npo_max = [
np.array(matrix_df[i], dtype=float if i in [FO_RATE_COLUMN, PO_RATE_COLUMN] else int)
for i in matrix_df.columns
]
cluster = ThermalCluster(
unit_count=thermal.unit_count,
nominal_power=thermal.nominal_capacity,
modulation=modulation_capacity,
fo_law=ProbabilityLaw(thermal.law_forced.value.upper()),
fo_volatility=thermal.volatility_forced,
po_law=ProbabilityLaw(thermal.law_planned.value.upper()),
po_volatility=thermal.volatility_planned,
fo_duration=fo_duration,
fo_rate=fo_rate,
po_duration=po_duration,
po_rate=po_rate,
npo_min=npo_min,
npo_max=npo_max,
)
# 7- Generate the time-series
results = generator.generate_time_series(cluster, nb_years)
generated_matrix = results.available_power
# 8- Write the matrix inside the input folder.
df = pd.DataFrame(data=generated_matrix, dtype=int)
target_path = self._build_matrix_path(tmp_path / area_id / thermal.id.lower())
dump_dataframe(df, target_path, None)

def to_dto(self) -> CommandDTO:
return CommandDTO(action=self.command_name.value, args={})

def match_signature(self) -> str:
return str(self.command_name.value)

def match(self, other: "ICommand", equal: bool = False) -> bool:
# Only used inside the cli app that no one uses I believe.
if not isinstance(other, GenerateThermalClusterTimeSeries):
return False
return True

def _create_diff(self, other: "ICommand") -> t.List["ICommand"]:
# Only used inside the cli app that no one uses I believe.
raise NotImplementedError()

def get_inner_matrices(self) -> t.List[str]:
# This is used to get used matrices and not remove them inside the garbage collector loop.
return []

@staticmethod
def _replace_safely_original_files(study_path: Path, tmp_path: Path) -> None:
original_path = study_path / "input" / "thermal" / "series"
shutil.rmtree(original_path)
tmp_path.rename(original_path)

@staticmethod
def _build_matrix_path(matrix_path: Path) -> Path:
real_path = matrix_path / "series.txt"
if not real_path.exists():
(matrix_path / "series.txt.link").rename(real_path)
return real_path
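For readers who have not used antares-timeseries-generation, the sketch below isolates the library calls this command relies on, feeding it synthetic arrays instead of study matrices. The array lengths (daily values for outage data, hourly values for modulation) and the "UNIFORM" law value are assumptions inferred from the code above, not guarantees about the library API:

```python
import numpy as np
from antares.tsgen.duration_generator import ProbabilityLaw
from antares.tsgen.random_generator import MersenneTwisterRNG
from antares.tsgen.ts_generator import ThermalCluster, ThermalDataGenerator

days = 365
rng = MersenneTwisterRNG(seed=3005489)        # default web seed mentioned in the command
generator = ThermalDataGenerator(rng=rng, days=days)

cluster = ThermalCluster(
    unit_count=10,
    nominal_power=100.0,
    modulation=np.ones(24 * days),            # hourly modulation capacity (assumed length)
    fo_law=ProbabilityLaw("UNIFORM"),         # value inferred from thermal.law_forced.value.upper()
    fo_volatility=0.0,
    po_law=ProbabilityLaw("UNIFORM"),
    po_volatility=0.0,
    fo_duration=np.full(days, 2, dtype=int),  # daily forced-outage durations
    fo_rate=np.full(days, 0.05, dtype=float), # daily forced-outage rates
    po_duration=np.ones(days, dtype=int),     # daily planned-outage durations
    po_rate=np.zeros(days, dtype=float),      # daily planned-outage rates
    npo_min=np.zeros(days, dtype=int),
    npo_max=np.full(days, 10, dtype=int),
)

results = generator.generate_time_series(cluster, 2)  # 2 Monte-Carlo years
available_power = results.available_power             # what the command writes to series.txt
print(available_power.shape)
```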
8 changes: 7 additions & 1 deletion antarest/study/web/study_data_blueprint.py
@@ -1759,7 +1759,13 @@ def set_advanced_parameters(
def generate_timeseries(
uuid: str,
current_user: JWTUser = Depends(auth.get_current_user),
) -> t.Any:
) -> str:
"""
Generates time-series for thermal clusters and puts them inside the study's input data.
Args:
- `uuid`: The UUID of the study.
"""
logger.info(
f"Generating timeseries for study {uuid}",
extra={"user": current_user.id},
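On the API side, the endpoint simply returns the task id as a string, which a client can then poll through the tasks API. A purely illustrative client call follows; the route, port and authentication scheme are assumptions, not taken from this diff:

```python
import requests

BASE_URL = "http://localhost:8080"  # placeholder
STUDY_ID = "<study-uuid>"           # placeholder
TOKEN = "<jwt>"                     # placeholder

# Route and auth header are illustrative assumptions; only the behaviour
# (the endpoint returns a background task id) comes from the code above.
response = requests.put(
    f"{BASE_URL}/v1/studies/{STUDY_ID}/timeseries/generate",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()
task_id: str = response.json()  # id of the THERMAL_CLUSTER_SERIES_GENERATION task
print(task_id)
```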
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
Antares-Launcher~=1.3.2
antares-study-version~=1.0.3
antares-timeseries-generation~=0.1.5

alembic~=1.7.5
asgi-ratelimit[redis]==0.7.0