diff --git a/antarest/core/exceptions.py b/antarest/core/exceptions.py index cf1b21d1a4..66859234fc 100644 --- a/antarest/core/exceptions.py +++ b/antarest/core/exceptions.py @@ -325,6 +325,35 @@ def __init__(self, is_variant: bool) -> None: super().__init__(HTTPStatus.EXPECTATION_FAILED, "Upgrade not supported for parent of variants") +class ReferencedObjectDeletionNotAllowed(HTTPException): + """ + Exception raised when an object (area, link or thermal cluster) is not allowed to be deleted + because it is referenced in binding constraints. + """ + + def __init__(self, object_id: str, binding_ids: t.Sequence[str], *, object_type: str) -> None: + """ + Initialize the exception. + + Args: + object_id: ID of the object that is not allowed to be deleted. + binding_ids: Binding constraints IDs that reference the object. + object_type: Type of the object that is not allowed to be deleted: area, link or thermal cluster. + """ + max_count = 10 + first_bcs_ids = ",\n".join(f"{i}- '{bc}'" for i, bc in enumerate(binding_ids[:max_count], 1)) + and_more = f",\nand {len(binding_ids) - max_count} more..." if len(binding_ids) > max_count else "." 
+ message = ( + f"{object_type} '{object_id}' is not allowed to be deleted, because it is referenced" + f" in the following binding constraints:\n{first_bcs_ids}{and_more}" + ) + super().__init__(HTTPStatus.FORBIDDEN, message) + + def __str__(self) -> str: + """Return a string representation of the exception.""" + return self.detail + + class UnsupportedStudyVersion(HTTPException): def __init__(self, version: str) -> None: super().__init__( diff --git a/antarest/study/business/binding_constraint_management.py b/antarest/study/business/binding_constraint_management.py index 28881ef874..7f42bb7f59 100644 --- a/antarest/study/business/binding_constraint_management.py +++ b/antarest/study/business/binding_constraint_management.py @@ -501,13 +501,12 @@ def terms_to_coeffs(terms: t.Sequence[ConstraintTerm]) -> t.Dict[str, t.List[flo :return: A dictionary of term IDs mapped to a list of their coefficients. """ coeffs = {} - if terms is not None: - for term in terms: - if term.id and term.weight is not None: - coeffs[term.id] = [term.weight] - if term.offset: - coeffs[term.id].append(term.offset) - return coeffs + for term in terms: + if term.id and term.weight is not None: + coeffs[term.id] = [term.weight] + if term.offset: + coeffs[term.id].append(term.offset) + return coeffs def get_binding_constraint(self, study: Study, bc_id: str) -> ConstraintOutput: """ diff --git a/antarest/study/model.py b/antarest/study/model.py index ad3d1f0fb4..4eff8109ab 100644 --- a/antarest/study/model.py +++ b/antarest/study/model.py @@ -426,13 +426,37 @@ class ExportFormat(str, enum.Enum): TAR_GZ = "application/tar+gz" JSON = "application/json" - @staticmethod - def from_dto(data: str) -> "ExportFormat": - if data == "application/zip": - return ExportFormat.ZIP - if data == "application/tar+gz": - return ExportFormat.TAR_GZ - return ExportFormat.JSON + @classmethod + def from_dto(cls, accept_header: str) -> "ExportFormat": + """ + Convert the "Accept" header to the corresponding content 
type. + + Args: + accept_header: Value of the "Accept" header. + + Returns: + The corresponding content type: ZIP, TAR_GZ or JSON. + By default, JSON is returned if the format is not recognized. + For instance, if the "Accept" header is "*/*", JSON is returned. + """ + mapping = { + "application/zip": ExportFormat.ZIP, + "application/tar+gz": ExportFormat.TAR_GZ, + "application/json": ExportFormat.JSON, + } + return mapping.get(accept_header, ExportFormat.JSON) + + @property + def suffix(self) -> str: + """ + Returns the file suffix associated with the format: ".zip", ".tar.gz" or ".json". + """ + mapping = { + ExportFormat.ZIP: ".zip", + ExportFormat.TAR_GZ: ".tar.gz", + ExportFormat.JSON: ".json", + } + return mapping[self] class StudyDownloadDTO(BaseModel): diff --git a/antarest/study/service.py b/antarest/study/service.py index 0330992da2..494e7d2f2f 100644 --- a/antarest/study/service.py +++ b/antarest/study/service.py @@ -24,6 +24,7 @@ CommandApplicationError, IncorrectPathError, NotAManagedStudyException, + ReferencedObjectDeletionNotAllowed, StudyDeletionNotAllowed, StudyNotFoundError, StudyTypeUnsupported, @@ -56,7 +57,7 @@ from antarest.study.business.areas.renewable_management import RenewableManager from antarest.study.business.areas.st_storage_management import STStorageManager from antarest.study.business.areas.thermal_management import ThermalManager -from antarest.study.business.binding_constraint_management import BindingConstraintManager +from antarest.study.business.binding_constraint_management import BindingConstraintManager, ConstraintFilters, LinkTerm from antarest.study.business.config_management import ConfigManager from antarest.study.business.correlation_management import CorrelationManager from antarest.study.business.district_manager import DistrictManager @@ -686,7 +687,7 @@ def create_study( id=sid, name=study_name, workspace=DEFAULT_WORKSPACE_NAME, - path=study_path, + path=str(study_path), created_at=datetime.utcnow(), 
updated_at=datetime.utcnow(), version=version or NEW_DEFAULT_STUDY_VERSION, @@ -1187,13 +1188,13 @@ def download_outputs( """ Download outputs Args: - study_id: study Id - output_id: output id - data: Json parameters - use_task: use task or not - filetype: type of returning file, - tmp_export_file: temporary file (if use_task is false), - params: request parameters + study_id: study ID. + output_id: output ID. + data: Json parameters. + use_task: use task or not. + filetype: type of returning file. + tmp_export_file: temporary file (if `use_task` is false). + params: request parameters. Returns: CSV content file @@ -1202,35 +1203,33 @@ study = self.get_study(study_id) assert_permission(params.user, study, StudyPermissionType.READ) self._assert_study_unarchived(study) - logger.info( - f"Study {study_id} output download asked by {params.get_user_id()}", - ) + logger.info(f"Study {study_id} output download asked by {params.get_user_id()}") if use_task: logger.info(f"Exporting {output_id} from study {study_id}") export_name = f"Study filtered output {study.name}/{output_id} export" export_file_download = self.file_transfer_manager.request_download( - f"{study.name}-{study_id}-{output_id}_filtered.{'tar.gz' if filetype == ExportFormat.TAR_GZ else 'zip'}", + f"{study.name}-{study_id}-{output_id}_filtered{filetype.suffix}", export_name, params.user, ) export_path = Path(export_file_download.path) export_id = export_file_download.id - def export_task(notifier: TaskUpdateNotifier) -> TaskResult: + def export_task(_notifier: TaskUpdateNotifier) -> TaskResult: try: - study = self.get_study(study_id) - stopwatch = StopWatch() - matrix = StudyDownloader.build( - self.storage_service.get_storage(study).get_raw(study), + _study = self.get_study(study_id) + _stopwatch = StopWatch() + _matrix = StudyDownloader.build( + self.storage_service.get_storage(_study).get_raw(_study), output_id, data, ) - stopwatch.log_elapsed( + _stopwatch.log_elapsed( lambda x: 
logger.info(f"Study {study_id} filtered output {output_id} built in {x}s") ) - StudyDownloader.export(matrix, filetype, export_path) - stopwatch.log_elapsed( + StudyDownloader.export(_matrix, filetype, export_path) + _stopwatch.log_elapsed( lambda x: logger.info(f"Study {study_id} filtered output {output_id} exported in {x}s") ) self.file_transfer_manager.set_ready(export_id) @@ -1240,7 +1239,7 @@ def export_task(notifier: TaskUpdateNotifier) -> TaskResult: ) except Exception as e: self.file_transfer_manager.fail(export_id, str(e)) - raise e + raise task_id = self.task_service.add_task( export_task, @@ -1265,17 +1264,18 @@ def export_task(notifier: TaskUpdateNotifier) -> TaskResult: stopwatch.log_elapsed( lambda x: logger.info(f"Study {study_id} filtered output {output_id} exported in {x}s") ) - return FileResponse( - tmp_export_file, - headers=( - {"Content-Disposition": "inline"} - if filetype == ExportFormat.JSON - else { - "Content-Disposition": f'attachment; filename="output-{output_id}.{"tar.gz" if filetype == ExportFormat.TAR_GZ else "zip"}' - } - ), - media_type=filetype, - ) + + if filetype == ExportFormat.JSON: + headers = {"Content-Disposition": "inline"} + elif filetype == ExportFormat.TAR_GZ: + headers = {"Content-Disposition": f'attachment; filename="output-{output_id}.tar.gz'} + elif filetype == ExportFormat.ZIP: + headers = {"Content-Disposition": f'attachment; filename="output-{output_id}.zip'} + else: # pragma: no cover + raise NotImplementedError(f"Export format {filetype} is not supported") + + return FileResponse(tmp_export_file, headers=headers, media_type=filetype) + else: json_response = json.dumps( matrix.dict(), @@ -1314,26 +1314,20 @@ def set_sim_reference( params: RequestParameters, ) -> None: """ - Set simulation as the reference output + Set simulation as the reference output. + Args: - study_id: study Id - output_id: output id - status: state of the reference status + study_id: study ID. 
+ output_id: The ID of the output to set as reference. + status: state of the reference status. params: request parameters - - Returns: None - """ study = self.get_study(study_id) assert_permission(params.user, study, StudyPermissionType.WRITE) self._assert_study_unarchived(study) logger.info( - "output %s set by user %s as reference (%b) for study %s", - output_id, - params.get_user_id(), - status, - study_id, + f"output {output_id} set by user {params.get_user_id()} as reference ({status}) for study {study_id}" ) self.storage_service.get_storage(study).set_reference_output(study, output_id, status) @@ -1855,9 +1849,27 @@ def update_thermal_cluster_metadata( return self.areas.update_thermal_cluster_metadata(study, area_id, clusters_metadata) def delete_area(self, uuid: str, area_id: str, params: RequestParameters) -> None: + """ + Delete area from study if it is not referenced by a binding constraint, + otherwise raise an HTTP 403 Forbidden error. + + Args: + uuid: The study ID. + area_id: The area ID to delete. + params: The request parameters used to check user permissions. + + Raises: + ReferencedObjectDeletionNotAllowed: If the area is referenced by a binding constraint. + """ study = self.get_study(uuid) assert_permission(params.user, study, StudyPermissionType.WRITE) self._assert_study_unarchived(study) + referencing_binding_constraints = self.binding_constraint_manager.get_binding_constraints( + study, ConstraintFilters(area_name=area_id) + ) + if referencing_binding_constraints: + binding_ids = [bc.id for bc in referencing_binding_constraints] + raise ReferencedObjectDeletionNotAllowed(area_id, binding_ids, object_type="Area") self.areas.delete_area(study, area_id) self.event_bus.push( Event( @@ -1874,9 +1886,29 @@ def delete_link( area_to: str, params: RequestParameters, ) -> None: + """ + Delete link from study if it is not referenced by a binding constraint, + otherwise raise an HTTP 403 Forbidden error. + + Args: + uuid: The study ID. 
+ area_from: The area from which the link starts. + area_to: The area to which the link ends. + params: The request parameters used to check user permissions. + + Raises: + ReferencedObjectDeletionNotAllowed: If the link is referenced by a binding constraint. + """ study = self.get_study(uuid) assert_permission(params.user, study, StudyPermissionType.WRITE) self._assert_study_unarchived(study) + link_id = LinkTerm(area1=area_from, area2=area_to).generate_id() + referencing_binding_constraints = self.binding_constraint_manager.get_binding_constraints( + study, ConstraintFilters(link_id=link_id) + ) + if referencing_binding_constraints: + binding_ids = [bc.id for bc in referencing_binding_constraints] + raise ReferencedObjectDeletionNotAllowed(link_id, binding_ids, object_type="Link") self.links.delete_link(study, area_from, area_to) self.event_bus.push( Event( @@ -2518,3 +2550,26 @@ def get_matrix_with_index_and_header( ) return df_matrix + + def asserts_no_thermal_in_binding_constraints( + self, study: Study, area_id: str, cluster_ids: t.Sequence[str] + ) -> None: + """ + Check that no cluster is referenced in a binding constraint, otherwise raise an HTTP 403 Forbidden error. 
+ + Args: + study: input study for which an update is to be committed + area_id: area ID to be checked + cluster_ids: IDs of the thermal clusters to be checked + + Raises: + ReferencedObjectDeletionNotAllowed: if a cluster is referenced in a binding constraint + """ + + for cluster_id in cluster_ids: + ref_bcs = self.binding_constraint_manager.get_binding_constraints( + study, ConstraintFilters(cluster_id=f"{area_id}.{cluster_id}") + ) + if ref_bcs: + binding_ids = [bc.id for bc in ref_bcs] + raise ReferencedObjectDeletionNotAllowed(cluster_id, binding_ids, object_type="Cluster") diff --git a/antarest/study/web/study_data_blueprint.py b/antarest/study/web/study_data_blueprint.py index e84029ff8f..43d71400d5 100644 --- a/antarest/study/web/study_data_blueprint.py +++ b/antarest/study/web/study_data_blueprint.py @@ -2185,6 +2185,7 @@ def delete_thermal_clusters( ) request_params = RequestParameters(user=current_user) study = study_service.check_study_access(uuid, StudyPermissionType.WRITE, request_params) + study_service.asserts_no_thermal_in_binding_constraints(study, area_id, cluster_ids) study_service.thermal_manager.delete_clusters(study, area_id, cluster_ids) @bp.get( diff --git a/tests/core/test_exceptions.py b/tests/core/test_exceptions.py new file mode 100644 index 0000000000..86892187a0 --- /dev/null +++ b/tests/core/test_exceptions.py @@ -0,0 +1,25 @@ +from antarest.core.exceptions import ReferencedObjectDeletionNotAllowed + + +class TestReferencedObjectDeletionNotAllowed: + def test_few_binding_constraints(self) -> None: + object_id = "france" + binding_ids = ["bc1", "bc2"] + object_type = "Area" + exception = ReferencedObjectDeletionNotAllowed(object_id, binding_ids, object_type=object_type) + message = str(exception) + assert f"{object_type} '{object_id}'" in message + assert "bc1" in message + assert "bc2" in message + assert "more..." 
not in message + + def test_many_binding_constraints(self) -> None: + object_id = "france" + binding_ids = [f"bc{i}" for i in range(1, 50)] + object_type = "Area" + exception = ReferencedObjectDeletionNotAllowed(object_id, binding_ids, object_type=object_type) + message = str(exception) + assert f"{object_type} '{object_id}'" in message + assert "bc1" in message + assert "bc2" in message + assert "more..." in message diff --git a/tests/integration/study_data_blueprint/test_thermal.py b/tests/integration/study_data_blueprint/test_thermal.py index e3f62eca1e..c294d0c235 100644 --- a/tests/integration/study_data_blueprint/test_thermal.py +++ b/tests/integration/study_data_blueprint/test_thermal.py @@ -646,7 +646,26 @@ def test_lifecycle( ) assert res.status_code in {200, 201}, res.json() - # To delete a thermal cluster, we need to provide its ID. + # verify that we can't delete the thermal cluster because it is referenced in a binding constraint + res = client.request( + "DELETE", + f"/v1/studies/{study_id}/areas/{area_id}/clusters/thermal", + headers={"Authorization": f"Bearer {user_access_token}"}, + json=[fr_gas_conventional_id], + ) + assert res.status_code == 403, res.json() + description = res.json()["description"] + assert all([elm in description for elm in [fr_gas_conventional, "binding constraint"]]) + assert res.json()["exception"] == "ReferencedObjectDeletionNotAllowed" + + # delete the binding constraint + res = client.delete( + f"/v1/studies/{study_id}/bindingconstraints/{bc_obj['name']}", + headers={"Authorization": f"Bearer {user_access_token}"}, + ) + assert res.status_code == 200, res.json() + + # Now we can delete the thermal cluster res = client.request( "DELETE", f"/v1/studies/{study_id}/areas/{area_id}/clusters/thermal", @@ -654,9 +673,8 @@ def test_lifecycle( json=[fr_gas_conventional_id], ) assert res.status_code == 204, res.json() - assert res.text in {"", "null"} # Old FastAPI versions return 'null'. 
- # When we delete a thermal cluster, we should also delete the binding constraints that reference it. + # check that the binding constraint has been deleted # noinspection SpellCheckingInspection res = client.get( f"/v1/studies/{study_id}/bindingconstraints", @@ -1029,3 +1047,171 @@ def test_variant_lifecycle(self, client: TestClient, user_access_token: str, var "replace_matrix", "remove_cluster", ] + + def test_thermal_cluster_deletion(self, client: TestClient, user_access_token: str, study_id: str) -> None: + """ + Test that creating a thermal cluster with invalid properties raises a validation error. + """ + + client.headers = {"Authorization": f"Bearer {user_access_token}"} + + # Create an area "area_1" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={ + "name": "area_1", + "type": "AREA", + "metadata": {"country": "FR"}, + }, + ) + assert res.status_code == 200, res.json() + + # Create an area "area_2" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={ + "name": "area_2", + "type": "AREA", + "metadata": {"country": "DE"}, + }, + ) + assert res.status_code == 200, res.json() + + # Create an area "area_3" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={ + "name": "area_3", + "type": "AREA", + "metadata": {"country": "ES"}, + }, + ) + assert res.status_code == 200, res.json() + + # Create a thermal cluster in the study for area_1 + res = client.post( + f"/v1/studies/{study_id}/areas/area_1/clusters/thermal", + json={ + "name": "cluster_1", + "group": "Nuclear", + "unitCount": 13, + "nominalCapacity": 42500, + "marginalCost": 0.1, + }, + ) + assert res.status_code == 200, res.json() + + # Create a thermal cluster in the study for area_2 + res = client.post( + f"/v1/studies/{study_id}/areas/area_2/clusters/thermal", + json={ + "name": "cluster_2", + "group": "Nuclear", + "unitCount": 13, + "nominalCapacity": 42500, + "marginalCost": 0.1, + }, + ) + assert res.status_code == 200, 
res.json() + + # Create a thermal cluster in the study for area_3 + res = client.post( + f"/v1/studies/{study_id}/areas/area_3/clusters/thermal", + json={ + "name": "cluster_3", + "group": "Nuclear", + "unitCount": 13, + "nominalCapacity": 42500, + "marginalCost": 0.1, + }, + ) + assert res.status_code == 200, res.json() + + # add a binding constraint that references the thermal cluster in area_1 + bc_obj = { + "name": "bc_1", + "enabled": True, + "time_step": "hourly", + "operator": "less", + "terms": [ + { + "id": "area_1.cluster_1", + "weight": 2, + "offset": 5, + "data": {"area": "area_1", "cluster": "cluster_1"}, + } + ], + } + res = client.post( + f"/v1/studies/{study_id}/bindingconstraints", + json=bc_obj, + ) + assert res.status_code == 200, res.json() + + # add a binding constraint that references the thermal cluster in area_2 + bc_obj = { + "name": "bc_2", + "enabled": True, + "time_step": "hourly", + "operator": "less", + "terms": [ + { + "id": "area_2.cluster_2", + "weight": 2, + "offset": 5, + "data": {"area": "area_2", "cluster": "cluster_2"}, + } + ], + } + res = client.post( + f"/v1/studies/{study_id}/bindingconstraints", + json=bc_obj, + ) + assert res.status_code == 200, res.json() + + # check that deleting the thermal cluster in area_1 fails + res = client.delete( + f"/v1/studies/{study_id}/areas/area_1/clusters/thermal", + json=["cluster_1"], + ) + assert res.status_code == 403, res.json() + + # now delete the binding constraint that references the thermal cluster in area_1 + res = client.delete( + f"/v1/studies/{study_id}/bindingconstraints/bc_1", + ) + assert res.status_code == 200, res.json() + + # check that deleting the thermal cluster in area_1 succeeds + res = client.delete( + f"/v1/studies/{study_id}/areas/area_1/clusters/thermal", + json=["cluster_1"], + ) + assert res.status_code == 204, res.json() + + # check that deleting the thermal cluster in area_2 fails + res = client.delete( + 
f"/v1/studies/{study_id}/areas/area_2/clusters/thermal", + json=["cluster_2"], + ) + assert res.status_code == 403, res.json() + + # now delete the binding constraint that references the thermal cluster in area_2 + res = client.delete( + f"/v1/studies/{study_id}/bindingconstraints/bc_2", + ) + assert res.status_code == 200, res.json() + + # check that deleting the thermal cluster in area_2 succeeds + res = client.delete( + f"/v1/studies/{study_id}/areas/area_2/clusters/thermal", + json=["cluster_2"], + ) + assert res.status_code == 204, res.json() + + # check that deleting the thermal cluster in area_3 succeeds + res = client.delete( + f"/v1/studies/{study_id}/areas/area_3/clusters/thermal", + json=["cluster_3"], + ) + assert res.status_code == 204, res.json() diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index 55e182c7d3..fa0d4d418c 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -1427,8 +1427,20 @@ def test_area_management(client: TestClient, admin_access_token: str) -> None: }, } + # check that at this stage the area cannot be deleted as it is referenced in binding constraint 1 result = client.delete(f"/v1/studies/{study_id}/areas/area%201") - assert result.status_code == 200 + assert result.status_code == 403, res.json() + # verify the error message + description = result.json()["description"] + assert all([elm in description for elm in ["area 1", "binding constraint 1"]]) + # check the exception + assert result.json()["exception"] == "ReferencedObjectDeletionNotAllowed" + + # delete binding constraint 1 + client.delete(f"/v1/studies/{study_id}/bindingconstraints/binding%20constraint%201") + # check now that we can delete the area 1 + result = client.delete(f"/v1/studies/{study_id}/areas/area%201") + assert result.status_code == 200, res.json() res_areas = client.get(f"/v1/studies/{study_id}/areas") assert res_areas.json() == [ { @@ -1701,3 +1713,171 @@ def 
test_copy(client: TestClient, admin_access_token: str, study_id: str) -> Non res = client.get(f"/v1/studies/{copied.json()}").json() assert res["groups"] == [] assert res["public_mode"] == "READ" + + +def test_areas_deletion_with_binding_constraints(client: TestClient, user_access_token: str, study_id: str) -> None: + """ + Test the deletion of areas that are referenced in binding constraints. + """ + + # set client headers to user access token + client.headers = {"Authorization": f"Bearer {user_access_token}"} + + area1_id = "france" + area2_id = "germany" + cluster_id = "nuclear power plant" + + constraint_terms = [ + { + # Link between two areas + "data": {"area1": area1_id, "area2": area2_id}, + "id": f"{area1_id}%{area2_id}", + "offset": 2, + "weight": 1.0, + }, + { + # Cluster in an area + "data": {"area": area1_id, "cluster": cluster_id.lower()}, + "id": f"{area1_id}.{cluster_id.lower()}", + "offset": 2, + "weight": 1.0, + }, + ] + + for constraint_term in constraint_terms: + # Create an area "area_1" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={"name": area1_id.title(), "type": "AREA", "metadata": {"country": "FR"}}, + ) + res.raise_for_status() + + if set(constraint_term["data"]) == {"area1", "area2"}: + # Create a second area and a link between the two areas + res = client.post( + f"/v1/studies/{study_id}/areas", + json={"name": area2_id.title(), "type": "AREA", "metadata": {"country": "DE"}}, + ) + res.raise_for_status() + res = client.post( + f"/v1/studies/{study_id}/links", + json={"area1": area1_id, "area2": area2_id}, + ) + res.raise_for_status() + + elif set(constraint_term["data"]) == {"area", "cluster"}: + # Create a cluster in the first area + res = client.post( + f"/v1/studies/{study_id}/areas/{area1_id}/clusters/thermal", + json={"name": cluster_id.title(), "group": "Nuclear"}, + ) + res.raise_for_status() + + else: + raise NotImplementedError(f"Unsupported constraint term: {constraint_term}") + + # create a 
binding constraint that references the link + bc_id = "bc_1" + bc_obj = { + "name": bc_id, + "enabled": True, + "time_step": "daily", + "operator": "less", + "terms": [constraint_term], + } + res = client.post(f"/v1/studies/{study_id}/bindingconstraints", json=bc_obj) + res.raise_for_status() + + if set(constraint_term["data"]) == {"area1", "area2"}: + areas_to_delete = [area1_id, area2_id] + elif set(constraint_term["data"]) == {"area", "cluster"}: + areas_to_delete = [area1_id] + else: + raise NotImplementedError(f"Unsupported constraint term: {constraint_term}") + + for area_id in areas_to_delete: + # try to delete the areas + res = client.delete(f"/v1/studies/{study_id}/areas/{area_id}") + assert res.status_code == 403, res.json() + description = res.json()["description"] + assert all([elm in description for elm in [area_id, bc_id]]) + assert res.json()["exception"] == "ReferencedObjectDeletionNotAllowed" + + # delete the binding constraint + res = client.delete(f"/v1/studies/{study_id}/bindingconstraints/{bc_id}") + assert res.status_code == 200, res.json() + + for area_id in areas_to_delete: + # delete the area + res = client.delete(f"/v1/studies/{study_id}/areas/{area_id}") + assert res.status_code == 200, res.json() + + +def test_links_deletion_with_binding_constraints(client: TestClient, user_access_token: str, study_id: str) -> None: + """ + Test the deletion of links that are referenced in binding constraints. 
+ """ + + # set client headers to user access token + client.headers = {"Authorization": f"Bearer {user_access_token}"} + + # Create an area "area_1" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={ + "name": "area_1", + "type": "AREA", + "metadata": {"country": "FR"}, + }, + ) + assert res.status_code == 200, res.json() + + # Create an area "area_2" in the study + res = client.post( + f"/v1/studies/{study_id}/areas", + json={ + "name": "area_2", + "type": "AREA", + "metadata": {"country": "DE"}, + }, + ) + assert res.status_code == 200, res.json() + + # create a link between the two areas + res = client.post( + f"/v1/studies/{study_id}/links", + json={"area1": "area_1", "area2": "area_2"}, + ) + assert res.status_code == 200, res.json() + + # create a binding constraint that references the link + bc_obj = { + "name": "bc_1", + "enabled": True, + "time_step": "hourly", + "operator": "less", + "terms": [ + { + "id": "area_1%area_2", + "weight": 2, + "data": {"area1": "area_1", "area2": "area_2"}, + } + ], + } + res = client.post(f"/v1/studies/{study_id}/bindingconstraints", json=bc_obj) + assert res.status_code == 200, res.json() + + # try to delete the link before deleting the binding constraint + res = client.delete(f"/v1/studies/{study_id}/links/area_1/area_2") + assert res.status_code == 403, res.json() + description = res.json()["description"] + assert all([elm in description for elm in ["area_1%area_2", "bc_1"]]) + assert res.json()["exception"] == "ReferencedObjectDeletionNotAllowed" + + # delete the binding constraint + res = client.delete(f"/v1/studies/{study_id}/bindingconstraints/bc_1") + assert res.status_code == 200, res.json() + + # delete the link + res = client.delete(f"/v1/studies/{study_id}/links/area_1/area_2") + assert res.status_code == 200, res.json()