Skip to content

Commit

Permalink
Merge branch 'dev' into feat/use-lower-case-for-groups-and-names
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle authored Dec 3, 2024
2 parents f6a9c6c + fe4bf0a commit d21b267
Show file tree
Hide file tree
Showing 20 changed files with 381 additions and 457 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r requirements-dev.txt
- uses: isort/isort-action@master
- uses: isort/isort-action@v1.1.1
with:
sort-paths: antarest, tests
requirementsFiles: "requirements-dev.txt"
Expand Down Expand Up @@ -156,7 +156,7 @@ jobs:
with:
name: python-code-coverage-report
- name: SonarCloud Scan
uses: sonarsource/sonarcloud-github-action@master
uses: sonarsource/sonarcloud-github-action@v3.1.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
4 changes: 2 additions & 2 deletions antarest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

# Standard project metadata

__version__ = "2.18.0"
__version__ = "2.18.1"
__author__ = "RTE, Antares Web Team"
__date__ = "2024-11-29"
__date__ = "2024-12-02"
# noinspection SpellCheckingInspection
__credits__ = "(c) Réseau de Transport de l’Électricité (RTE)"

Expand Down
20 changes: 15 additions & 5 deletions antarest/eventbus/business/redis_eventbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

logger = logging.getLogger(__name__)
REDIS_STORE_KEY = "events"
MAX_EVENTS_LIST_SIZE = 1000


class RedisEventBus(IEventBusBackend):
Expand All @@ -41,14 +42,23 @@ def pull_queue(self, queue: str) -> Optional[Event]:
return None

def get_events(self) -> List[Event]:
messages = []
try:
event = self.pubsub.get_message(ignore_subscribe_messages=True)
if event is not None:
return [Event.parse_raw(event["data"])]
while msg := self.pubsub.get_message(ignore_subscribe_messages=True):
messages.append(msg)
if len(messages) >= MAX_EVENTS_LIST_SIZE:
break
except Exception:
logger.error("Failed to retrieve or parse event !", exc_info=True)
logger.error("Failed to retrieve events !", exc_info=True)

return []
events = []
for msg in messages:
try:
events.append(Event.model_validate_json(msg["data"]))
except Exception:
logger.error(f"Failed to parse event ! {msg}", exc_info=True)

return events

def clear_events(self) -> None:
# Nothing to do
Expand Down
18 changes: 14 additions & 4 deletions antarest/eventbus/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
logger = logging.getLogger(__name__)


EVENT_LOOP_REST_TIME = 0.2


class EventBusService(IEventBus):
def __init__(self, backend: IEventBusBackend, autostart: bool = True) -> None:
self.backend = backend
Expand Down Expand Up @@ -76,18 +79,22 @@ def remove_listener(self, listener_id: str) -> None:

async def _run_loop(self) -> None:
while True:
time.sleep(0.2)
try:
await self._on_events()
processed_events_count = await self._on_events()
# Give the loop some rest if it has nothing to do
if processed_events_count == 0:
await asyncio.sleep(EVENT_LOOP_REST_TIME)
except Exception as e:
logger.error("Unexpected error when processing events", exc_info=e)

async def _on_events(self) -> None:
async def _on_events(self) -> int:
processed_events_count = 0
with self.lock:
for queue in self.consumers:
if len(self.consumers[queue]) > 0:
event = self.backend.pull_queue(queue)
while event is not None:
processed_events_count += 1
try:
await list(self.consumers[queue].values())[
random.randint(0, len(self.consumers[queue]) - 1)
Expand All @@ -99,7 +106,9 @@ async def _on_events(self) -> None:
)
event = self.backend.pull_queue(queue)

for e in self.backend.get_events():
events = self.backend.get_events()
processed_events_count += len(events)
for e in events:
if e.type in self.listeners:
responses = await asyncio.gather(
*[
Expand All @@ -115,6 +124,7 @@ async def _on_events(self) -> None:
exc_info=res,
)
self.backend.clear_events()
return processed_events_count

def _async_loop(self, new_loop: bool = True) -> None:
loop = asyncio.new_event_loop() if new_loop else asyncio.get_event_loop()
Expand Down
127 changes: 32 additions & 95 deletions antarest/study/business/link_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,21 @@
# This file is part of the Antares project.

import typing as t
from typing import Any, Dict
from typing import Any

from antares.study.version import StudyVersion

from antarest.core.exceptions import ConfigFileNotFound, LinkNotFound, LinkValidationError
from antarest.core.exceptions import LinkNotFound
from antarest.core.model import JSON
from antarest.study.business.all_optional_meta import all_optional_model, camel_case_model
from antarest.study.business.model.link_model import LinkBaseDTO, LinkDTO, LinkInternal
from antarest.study.business.utils import execute_or_add_commands
from antarest.study.model import RawStudy, Study
from antarest.study.storage.rawstudy.model.filesystem.config.links import LinkProperties
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.storage_service import StudyStorageService
from antarest.study.storage.variantstudy.model.command.create_link import CreateLink
from antarest.study.storage.variantstudy.model.command.remove_link import RemoveLink
from antarest.study.storage.variantstudy.model.command.update_config import UpdateConfig
from antarest.study.storage.variantstudy.model.command.update_link import UpdateLink

_ALL_LINKS_PATH = "input/links"


@all_optional_model
@camel_case_model
class LinkOutput(LinkProperties):
"""
DTO object use to get the link information.
"""


class LinkManager:
def __init__(self, storage_service: StudyStorageService) -> None:
Expand All @@ -61,6 +48,17 @@ def get_all_links(self, study: Study) -> t.List[LinkDTO]:

return result

def get_link(self, study: RawStudy, link: LinkInternal) -> LinkInternal:
file_study = self.storage_service.get_storage(study).get_raw(study)

link_properties = self._get_link_if_exists(file_study, link)

link_properties.update({"area1": link.area1, "area2": link.area2})

updated_link = LinkInternal.model_validate(link_properties)

return updated_link

def create_link(self, study: Study, link_creation_dto: LinkDTO) -> LinkDTO:
link = link_creation_dto.to_internal(StudyVersion.parse(study.version))

Expand All @@ -80,16 +78,16 @@ def create_link(self, study: Study, link_creation_dto: LinkDTO) -> LinkDTO:
return link_creation_dto

def update_link(self, study: RawStudy, area_from: str, area_to: str, link_update_dto: LinkBaseDTO) -> LinkDTO:
link_dto = LinkDTO(area1=area_from, area2=area_to, **link_update_dto.model_dump())
link_dto = LinkDTO(area1=area_from, area2=area_to, **link_update_dto.model_dump(exclude_unset=True))

link = link_dto.to_internal(StudyVersion.parse(study.version))
file_study = self.storage_service.get_storage(study).get_raw(study)

self.get_link_if_exists(file_study, link)
self._get_link_if_exists(file_study, link)

command = UpdateLink(
area1=area_from,
area2=area_to,
area1=link.area1,
area2=link.area2,
parameters=link.model_dump(
include=link_update_dto.model_fields_set, exclude={"area1", "area2"}, exclude_none=True
),
Expand All @@ -99,25 +97,21 @@ def update_link(self, study: RawStudy, area_from: str, area_to: str, link_update

execute_or_add_commands(study, file_study, [command], self.storage_service)

updated_link = self.get_internal_link(study, link)
updated_link = self.get_link(study, link)

return updated_link.to_dto()

def get_link_if_exists(self, file_study: FileStudy, link: LinkInternal) -> dict[str, Any]:
try:
return file_study.tree.get(["input", "links", link.area1, "properties", link.area2])
except KeyError:
raise LinkNotFound(f"The link {link.area1} -> {link.area2} is not present in the study")

def get_internal_link(self, study: RawStudy, link: LinkInternal) -> LinkInternal:
file_study = self.storage_service.get_storage(study).get_raw(study)

link_properties = self.get_link_if_exists(file_study, link)

link_properties.update({"area1": link.area1, "area2": link.area2})
updated_link = LinkInternal.model_validate(link_properties)
def update_links(
self,
study: RawStudy,
update_links_by_ids: t.Mapping[t.Tuple[str, str], LinkBaseDTO],
) -> t.Mapping[t.Tuple[str, str], LinkBaseDTO]:
new_links_by_ids = {}
for (area1, area2), update_link_dto in update_links_by_ids.items():
updated_link = self.update_link(study, area1, area2, update_link_dto)
new_links_by_ids[(area1, area2)] = updated_link

return updated_link
return new_links_by_ids

def delete_link(self, study: RawStudy, area1_id: str, area2_id: str) -> None:
file_study = self.storage_service.get_storage(study).get_raw(study)
Expand All @@ -129,69 +123,12 @@ def delete_link(self, study: RawStudy, area1_id: str, area2_id: str) -> None:
)
execute_or_add_commands(study, file_study, [command], self.storage_service)

def get_all_links_props(self, study: RawStudy) -> t.Mapping[t.Tuple[str, str], LinkOutput]:
"""
Retrieves all links properties from the study.
Args:
study: The raw study object.
Returns:
A mapping of link IDS `(area1_id, area2_id)` to link properties.
Raises:
ConfigFileNotFound: if a configuration file is not found.
"""
file_study = self.storage_service.get_storage(study).get_raw(study)

# Get the link information from the `input/links/{area1}/properties.ini` file.
path = _ALL_LINKS_PATH
def _get_link_if_exists(self, file_study: FileStudy, link: LinkInternal) -> dict[str, Any]:
try:
links_cfg = file_study.tree.get(path.split("/"), depth=5)
return file_study.tree.get(["input", "links", link.area1, "properties", link.area2])
except KeyError:
raise ConfigFileNotFound(path) from None

# areas_cfg contains a dictionary where the keys are the area IDs,
# and the values are objects that can be converted to `LinkFolder`.
links_by_ids = {}
for area1_id, entries in links_cfg.items():
property_map = entries.get("properties") or {}
for area2_id, properties_cfg in property_map.items():
area1_id, area2_id = sorted([area1_id, area2_id])
properties = LinkProperties(**properties_cfg)
links_by_ids[(area1_id, area2_id)] = LinkOutput(**properties.model_dump(mode="json", by_alias=False))

return links_by_ids

def update_links_props(
self,
study: RawStudy,
update_links_by_ids: t.Mapping[t.Tuple[str, str], LinkOutput],
) -> t.Mapping[t.Tuple[str, str], LinkOutput]:
old_links_by_ids = self.get_all_links_props(study)
new_links_by_ids = {}
file_study = self.storage_service.get_storage(study).get_raw(study)
commands = []
for (area1, area2), update_link_dto in update_links_by_ids.items():
# Update the link properties.
old_link_dto = old_links_by_ids[(area1, area2)]
new_link_dto = old_link_dto.copy(
update=update_link_dto.model_dump(mode="json", by_alias=False, exclude_none=True)
)
new_links_by_ids[(area1, area2)] = new_link_dto

# Convert the DTO to a configuration object and update the configuration file.
properties = LinkProperties(**new_link_dto.model_dump(by_alias=False))
path = f"{_ALL_LINKS_PATH}/{area1}/properties/{area2}"
cmd = UpdateConfig(
target=path,
data=properties.to_config(),
command_context=self.storage_service.variant_study_service.command_factory.command_context,
study_version=file_study.config.version,
)
commands.append(cmd)

execute_or_add_commands(study, file_study, commands, self.storage_service)
return new_links_by_ids
raise LinkNotFound(f"The link {link.area1} -> {link.area2} is not present in the study")

@staticmethod
def get_table_schema() -> JSON:
return LinkOutput.schema()
return LinkBaseDTO.model_json_schema()
Loading

0 comments on commit d21b267

Please sign in to comment.