Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bc): correct case sensitivity for binding constraint term IDs #1903

Merged
2 changes: 1 addition & 1 deletion antarest/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.BAD_REQUEST, message)


class NoBindingConstraintError(HTTPException):
class BindingConstraintNotFoundError(HTTPException):
def __init__(self, message: str) -> None:
super().__init__(HTTPStatus.NOT_FOUND, message)

Expand Down
127 changes: 82 additions & 45 deletions antarest/study/business/binding_constraint_management.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel
from pydantic import BaseModel, validator

from antarest.core.exceptions import (
BindingConstraintNotFoundError,
ConstraintAlreadyExistError,
ConstraintIdNotFoundError,
DuplicateConstraintName,
InvalidConstraintName,
MissingDataError,
NoBindingConstraintError,
NoConstraintError,
)
from antarest.matrixstore.model import MatrixData
Expand All @@ -29,21 +29,72 @@
from antarest.study.storage.variantstudy.model.command.update_binding_constraint import UpdateBindingConstraint


class LinkInfoDTO(BaseModel):
class AreaLinkDTO(BaseModel):
"""
DTO for a constraint term on a link between two areas.

Attributes:
area1: the first area ID
area2: the second area ID
"""

area1: str
area2: str

def generate_id(self) -> str:
"""Return the constraint term ID for this link, of the form "area1%area2"."""
# Ensure IDs are in alphabetical order and lower case
ids = sorted((self.area1.lower(), self.area2.lower()))
return "%".join(ids)


class AreaClusterDTO(BaseModel):
"""
DTO for a constraint term on a cluster in an area.

Attributes:
area: the area ID
cluster: the cluster ID
"""

class ClusterInfoDTO(BaseModel):
area: str
cluster: str

def generate_id(self) -> str:
"""Return the constraint term ID for this Area/cluster constraint, of the form "area.cluster"."""
# Ensure IDs are in lower case
ids = [self.area.lower(), self.cluster.lower()]
return ".".join(ids)


class ConstraintTermDTO(BaseModel):
"""
DTO for a constraint term.

Attributes:
id: the constraint term ID, of the form "area1%area2" or "area.cluster".
weight: the constraint term weight, if any.
offset: the constraint term offset, if any.
data: the constraint term data (link or cluster), if any.
"""

id: Optional[str]
weight: Optional[float]
offset: Optional[float]
data: Optional[Union[LinkInfoDTO, ClusterInfoDTO]]
data: Optional[Union[AreaLinkDTO, AreaClusterDTO]]

@validator("id")
def id_to_lower(cls, v: Optional[str]) -> Optional[str]:
"""Ensure the ID is lower case."""
if v is None:
return None
return v.lower()

def generate_id(self) -> str:
"""Return the constraint term ID for this term based on its data."""
if self.data is None:
return self.id or ""
return self.data.generate_id()


class UpdateBindingConstProps(BaseModel):
Expand Down Expand Up @@ -97,12 +148,12 @@ def parse_constraint(key: str, value: str, char: str, new_config: BindingConstra
id=key,
weight=weight,
offset=offset if offset is not None else None,
data=LinkInfoDTO(
data=AreaLinkDTO(
area1=value1,
area2=value2,
)
if char == "%"
else ClusterInfoDTO(
else AreaClusterDTO(
area=value1,
cluster=value2,
),
Expand Down Expand Up @@ -208,7 +259,7 @@ def update_binding_constraint(
file_study = self.storage_service.get_storage(study).get_raw(study)
constraint = self.get_binding_constraint(study, binding_constraint_id)
if not isinstance(constraint, BindingConstraintDTO):
raise NoBindingConstraintError(study.id)
raise BindingConstraintNotFoundError(study.id)

if data.key == "time_step" and data.value != constraint.time_step:
# The user changed the time step, we need to update the matrix accordingly
Expand Down Expand Up @@ -243,16 +294,6 @@ def find_constraint_term_id(constraints_term: List[ConstraintTermDTO], constrain
except ValueError:
return -1

@staticmethod
def get_constraint_id(data: Union[LinkInfoDTO, ClusterInfoDTO]) -> str:
if isinstance(data, ClusterInfoDTO):
constraint_id = f"{data.area}.{data.cluster}"
else:
area1 = data.area1 if data.area1 < data.area2 else data.area2
area2 = data.area2 if area1 == data.area1 else data.area1
constraint_id = f"{area1}%{area2}"
return constraint_id

def add_new_constraint_term(
self,
study: Study,
Expand All @@ -262,12 +303,12 @@ def add_new_constraint_term(
file_study = self.storage_service.get_storage(study).get_raw(study)
constraint = self.get_binding_constraint(study, binding_constraint_id)
if not isinstance(constraint, BindingConstraintDTO):
raise NoBindingConstraintError(study.id)
raise BindingConstraintNotFoundError(study.id)

if constraint_term.data is None:
raise MissingDataError("Add new constraint term : data is missing")

constraint_id = BindingConstraintManager.get_constraint_id(constraint_term.data)
constraint_id = constraint_term.data.generate_id()
constraints_term = constraint.constraints or []
if BindingConstraintManager.find_constraint_term_id(constraints_term, constraint_id) >= 0:
raise ConstraintAlreadyExistError(study.id)
Expand Down Expand Up @@ -304,45 +345,40 @@ def update_constraint_term(
self,
study: Study,
binding_constraint_id: str,
data: Union[ConstraintTermDTO, str],
term: Union[ConstraintTermDTO, str],
) -> None:
file_study = self.storage_service.get_storage(study).get_raw(study)
constraint = self.get_binding_constraint(study, binding_constraint_id)

if not isinstance(constraint, BindingConstraintDTO):
raise NoBindingConstraintError(study.id)
raise BindingConstraintNotFoundError(study.id)

constraints = constraint.constraints
if constraints is None:
constraint_terms = constraint.constraints # existing constraint terms
if constraint_terms is None:
raise NoConstraintError(study.id)

data_id = data.id if isinstance(data, ConstraintTermDTO) else data
if data_id is None:
term_id = term.id if isinstance(term, ConstraintTermDTO) else term
if term_id is None:
raise ConstraintIdNotFoundError(study.id)

data_term_index = BindingConstraintManager.find_constraint_term_id(constraints, data_id)
if data_term_index < 0:
term_id_index = BindingConstraintManager.find_constraint_term_id(constraint_terms, term_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En fait, ici le code est moche car on fait une recherche d'index dans une liste. Tout serait plus simple si on avait un dict

constraint_map = {term.id: term for term in constraint_terms}
...

if term_id in constraint_map:
    ...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

je suis d'accord, c'est mieux, mon but ici n'était pas de faire un gros refactoring. je veux bien m'en occuper quand on fera une refonte du front

if term_id_index < 0:
raise ConstraintIdNotFoundError(study.id)

if isinstance(data, ConstraintTermDTO):
constraint_id = BindingConstraintManager.get_constraint_id(data.data) if data.data is not None else data_id
current_constraint = constraints[data_term_index]
constraints.append(
ConstraintTermDTO(
id=constraint_id,
weight=data.weight if data.weight is not None else current_constraint.weight,
offset=data.offset,
data=data.data if data.data is not None else current_constraint.data,
)
if isinstance(term, ConstraintTermDTO):
updated_term_id = term.data.generate_id() if term.data else term_id
current_constraint = constraint_terms[term_id_index]

constraint_terms[term_id_index] = ConstraintTermDTO(
id=updated_term_id,
weight=term.weight or current_constraint.weight,
offset=term.offset,
data=term.data or current_constraint.data,
)
del constraints[data_term_index]
else:
del constraints[data_term_index]
del constraint_terms[term_id_index]

coeffs = {}
for term in constraints:
coeffs[term.id] = [term.weight]
if term.offset is not None:
coeffs[term.id].append(term.offset)
coeffs = {term.id: [term.weight, term.offset] if term.offset else [term.weight] for term in constraint_terms}

command = UpdateBindingConstraint(
id=constraint.id,
Expand All @@ -358,6 +394,7 @@ def update_constraint_term(
)
execute_or_add_commands(study, file_study, [command], self.storage_service)

# FIXME create a dedicated delete service
def remove_constraint_term(
self,
study: Study,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def apply_binding_constraint(
# Cluster IDs are stored in lower case in the binding constraints file.
area, cluster_id = link_or_cluster.split(".")
thermal_ids = {thermal.id.lower() for thermal in study_data.config.areas[area].thermals}
laurent-laporte-pro marked this conversation as resolved.
Show resolved Hide resolved
if area not in study_data.config.areas or cluster_id not in thermal_ids:
if area not in study_data.config.areas or cluster_id.lower() not in thermal_ids:
return CommandOutput(
status=False,
message=f"Cluster '{link_or_cluster}' does not exist in binding constraint '{bd_id}'",
Expand Down
22 changes: 11 additions & 11 deletions antarest/study/web/study_data_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ def get_binding_constraint_list(
@bp.get(
"/studies/{uuid}/bindingconstraints/{binding_constraint_id}",
tags=[APITag.study_data],
summary="Get binding constraint list",
summary="Get binding constraint",
hdinia marked this conversation as resolved.
Show resolved Hide resolved
response_model=None, # Dict[str, bool],
)
def get_binding_constraint(
Expand Down Expand Up @@ -929,45 +929,45 @@ def create_binding_constraint(
@bp.post(
"/studies/{uuid}/bindingconstraints/{binding_constraint_id}/term",
tags=[APITag.study_data],
summary="update constraint term",
summary="Create a binding constraint term",
)
def add_constraint_term(
uuid: str,
binding_constraint_id: str,
data: ConstraintTermDTO,
term: ConstraintTermDTO,
laurent-laporte-pro marked this conversation as resolved.
Show resolved Hide resolved
current_user: JWTUser = Depends(auth.get_current_user),
) -> Any:
logger.info(
f"add constraint term from {binding_constraint_id} for study {uuid}",
f"Add constraint term {term.id} to {binding_constraint_id} for study {uuid}",
extra={"user": current_user.id},
)
params = RequestParameters(user=current_user)
study = study_service.check_study_access(uuid, StudyPermissionType.WRITE, params)
return study_service.binding_constraint_manager.add_new_constraint_term(study, binding_constraint_id, data)
return study_service.binding_constraint_manager.add_new_constraint_term(study, binding_constraint_id, term)

@bp.put(
"/studies/{uuid}/bindingconstraints/{binding_constraint_id}/term",
tags=[APITag.study_data],
summary="update constraint term",
summary="Update a binding constraint term",
)
def update_constraint_term(
uuid: str,
binding_constraint_id: str,
data: ConstraintTermDTO,
term: ConstraintTermDTO,
hdinia marked this conversation as resolved.
Show resolved Hide resolved
current_user: JWTUser = Depends(auth.get_current_user),
) -> Any:
logger.info(
f"update constraint term from {binding_constraint_id} for study {uuid}",
f"Update constraint term {term.id} from {binding_constraint_id} for study {uuid}",
extra={"user": current_user.id},
)
params = RequestParameters(user=current_user)
study = study_service.check_study_access(uuid, StudyPermissionType.WRITE, params)
return study_service.binding_constraint_manager.update_constraint_term(study, binding_constraint_id, data)
return study_service.binding_constraint_manager.update_constraint_term(study, binding_constraint_id, term)

@bp.delete(
"/studies/{uuid}/bindingconstraints/{binding_constraint_id}/term/{term_id}",
tags=[APITag.study_data],
summary="update constraint term",
summary="Remove a binding constraint term",
)
def remove_constraint_term(
uuid: str,
Expand All @@ -976,7 +976,7 @@ def remove_constraint_term(
current_user: JWTUser = Depends(auth.get_current_user),
) -> None:
logger.info(
f"delete constraint term {term_id} from {binding_constraint_id} for study {uuid}",
f"Remove constraint term {term_id} from {binding_constraint_id} for study {uuid}",
extra={"user": current_user.id},
)
params = RequestParameters(user=current_user)
Expand Down
Loading
Loading