diff --git a/flood_adapt/api/tipping_points.py b/flood_adapt/api/tipping_points.py new file mode 100644 index 00000000..40902261 --- /dev/null +++ b/flood_adapt/api/tipping_points.py @@ -0,0 +1,43 @@ +from typing import Any + +from flood_adapt.dbs_controller import Database +from flood_adapt.object_model.interface.tipping_points import ITipPoint +from flood_adapt.object_model.tipping_point import TippingPoint + + +def get_tipping_points() -> dict[str, Any]: + # sorting and filtering either with PyQt table or in the API + return Database().tipping_points.list_objects() + + +def get_tipping_point(name: str) -> ITipPoint: + return Database().tipping_points.get(name) + + +def create_tipping_point(attrs: dict[str, Any]) -> ITipPoint: + return TippingPoint.load_dict(attrs) + + +def save_tipping_point(tipping_point: ITipPoint) -> None: + Database().tipping_points.save(tipping_point) + + +def edit_tipping_point(tipping_point: ITipPoint) -> None: + Database().tipping_points.edit(tipping_point) + + +def delete_tipping_point(name: str) -> None: + Database().tipping_points.delete(name) + + +def create_tipping_point_scenarios(name: str) -> None: + Database().tipping_points.get(name).create_tp_scenarios() + + +def run_tipping_point(name: str) -> None: + Database().tipping_points.get(name).run_tp_scenarios() + + +def check_scenario_has_run(name: str) -> None: + scenario_ran = Database().tipping_points.get(name).check_scenario_has_run() + return scenario_ran diff --git a/flood_adapt/dbs_classes/dbs_tipping_point.py b/flood_adapt/dbs_classes/dbs_tipping_point.py new file mode 100644 index 00000000..7c7ca661 --- /dev/null +++ b/flood_adapt/dbs_classes/dbs_tipping_point.py @@ -0,0 +1,71 @@ +import shutil + +from flood_adapt.dbs_classes.dbs_template import DbsTemplate +from flood_adapt.object_model.interface.tipping_points import ITipPoint +from flood_adapt.object_model.tipping_point import TippingPoint + + +class DbsTippingPoint(DbsTemplate): + _type = "tipping_point" + _folder_name = "tipping_points" + _object_model_class = TippingPoint + + def save(self, tipping_point: ITipPoint, overwrite: bool = False): + """Save a tipping point object in the database. + + Parameters + ---------- + tipping_point : ITipPoint + object of tipping point type + overwrite : bool, optional + whether to overwrite existing tipping point with same name, by default False + + Raises + ------ + ValueError + Raise error if name is already in use. Names of tipping points should be unique. + """ + # Save the tipping point + super().save(tipping_point, overwrite=overwrite) + + def delete(self, name: str, toml_only: bool = False): + """Delete an already existing tipping point in the database. + + Parameters + ---------- + name : str + name of the tipping point + toml_only : bool, optional + whether to only delete the toml file or the entire folder. If the folder is empty after deleting the toml, + it will always be deleted. By default False + """ + # First delete the tipping point + super().delete(name, toml_only=toml_only) + + # Delete output if edited + output_path = ( + self._database.tipping_points.get_database_path(get_input_path=False) / name + ) + + if output_path.exists(): + shutil.rmtree(output_path) + + def edit(self, tipping_point: ITipPoint): + """Edit an already existing tipping point in the database. + + Parameters + ---------- + tipping_point : ITipPoint + object of tipping point type + """ + # Edit the tipping point + super().edit(tipping_point) + + # Delete output if edited + output_path = ( + self._database.tipping_points.get_database_path(get_input_path=False) + / tipping_point.attrs.name + ) + + if output_path.exists(): + shutil.rmtree(output_path) diff --git a/flood_adapt/dbs_controller.py b/flood_adapt/dbs_controller.py index 94d5de05..33a12019 100644 --- a/flood_adapt/dbs_controller.py +++ b/flood_adapt/dbs_controller.py @@ -23,6 +23,7 @@ from flood_adapt.dbs_classes.dbs_scenario import DbsScenario from flood_adapt.dbs_classes.dbs_static import DbsStatic from flood_adapt.dbs_classes.dbs_strategy import DbsStrategy +from flood_adapt.dbs_classes.dbs_tipping_point import DbsTippingPoint from flood_adapt.integrator.sfincs_adapter import SfincsAdapter from flood_adapt.log import FloodAdaptLogging from flood_adapt.object_model.hazard.event.event_factory import EventFactory @@ -63,6 +64,7 @@ class Database(IDatabase): _measures: DbsMeasure _projections: DbsProjection _benefits: DbsBenefit + _tipping_points: DbsTippingPoint def __new__(cls, *args, **kwargs): if not cls._instance: # Singleton pattern @@ -133,6 +135,7 @@ def __init__( self._measures = DbsMeasure(self) self._projections = DbsProjection(self) self._benefits = DbsBenefit(self) + self._tipping_points = DbsTippingPoint(self) # Delete any unfinished/crashed scenario output self.cleanup() @@ -195,6 +198,10 @@ def projections(self) -> DbsProjection: def benefits(self) -> DbsBenefit: return self._benefits + @property + def tipping_points(self) -> DbsTippingPoint: + return self._tipping_points + def interp_slr(self, slr_scenario: str, year: float) -> float: r"""Interpolate SLR value and reference it to the SLR reference year from the site toml. @@ -750,6 +757,7 @@ def update(self) -> None: self.strategies = self._strategies.list_objects() self.scenarios = self._scenarios.list_objects() self.benefits = self._benefits.list_objects() + self.tipping_points = self._tipping_points.list_objects() def get_outputs(self) -> dict[str, Any]: """Return a dictionary with info on the outputs that currently exist in the database. diff --git a/flood_adapt/object_model/benefit.py b/flood_adapt/object_model/benefit.py index 8405cbfb..55386805 100644 --- a/flood_adapt/object_model/benefit.py +++ b/flood_adapt/object_model/benefit.py @@ -606,4 +606,4 @@ def save(self, filepath: Union[str, os.PathLike]): path for saving the toml file """ with open(filepath, "wb") as f: - tomli_w.dump(self.attrs.dict(exclude_none=True), f) + tomli_w.model_dump(self.attrs, f, exclude_none=True) diff --git a/flood_adapt/object_model/interface/tipping_points.py b/flood_adapt/object_model/interface/tipping_points.py new file mode 100644 index 00000000..0478944a --- /dev/null +++ b/flood_adapt/object_model/interface/tipping_points.py @@ -0,0 +1,102 @@ +import os +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any, Optional, Union + +import pandas as pd +from pydantic import BaseModel + + +class TippingPointMetrics(str, Enum): + """class describing the accepted input for the variable metric_type in TippingPoint.""" + + # based on what I have found in floodadapt - but can be changed + FloodedAll = "FloodedAll" + FloodedLowVulnerability = "FloodedLowVulnerability" + FloodedHighVulnerability = "FloodedHighVulnerability" + TotalDamageEvent = "TotalDamageEvent" + TotalResDamageEvent = "TotalResDamageEvent" + ResidentialMinorCount = "ResidentialMinorCount" + ResidentialMajorCount = "ResidentialMajorCount" + ResidentialDestroyedCount = "ResidentialDestroyedCount" + CommercialCount = "CommercialCount" + CommercialMinorCount = "CommercialMinorCount" + CommercialMajorCount = "CommercialMajorCount" + CommercialDestroyedCount = "CommercialDestroyedCount" + HealthCount = "HealthCount" + HealthMinorCount = "HealthMinorCount" + HealthMajorCount = "HealthMajorCount" + HealthDestroyedCount = "HealthDestroyedCount" + SchoolsCount = "SchoolsCount" + SchoolsMinorCount = "SchoolsMinorCount" + SchoolsMajorCount = "SchoolsMajorCount" + SchoolsDestroyedCount = "SchoolsDestroyedCount" + EmergencyCount = "EmergencyCount" + EmergencyMinorCount = "EmergencyMinorCount" + EmergencyMajorCount = "EmergencyMajorCount" + EmergencyDestroyedCount = "EmergencyDestroyedCount" + DisplacedLowVulnerability = "DisplacedLowVulnerability" + DisplacedHighVulnerability = "DisplacedHighVulnerability" + SlightlyFloodedRoads = "SlightlyFloodedRoads" + MinorFloodedRoads = "MinorFloodedRoads" + MajorFloodedRoads = "MajorFloodedRoads" + FullyFloodedRoads = "FullyFloodedRoads" + + +class TippingPointStatus(str, Enum): + """class describing the accepted input for the variable metric_type in TippingPoint.""" + + reached = "reached" + not_reached = "not_reached" + completed = "completed" + + +class TippingPointOperator(str, Enum): + """class describing the accepted input for the variable operator in TippingPoint.""" + + greater = "greater" + less = "less" + + +class TippingPointModel(BaseModel): + """BaseModel describing the expected variables and data types of a Tipping Point analysis object.""" + + name: str + description: Optional[str] = "" + strategy: str + event_set: str + projection: str + sealevelrise: list[float] # could be a numpy array too + tipping_point_metric: list[tuple[TippingPointMetrics, float, TippingPointOperator]] + status: TippingPointStatus = TippingPointStatus.not_reached + scenarios: list[str] = [] + + +class ITipPoint(ABC): + attrs: TippingPointModel + database_input_path: Union[str, os.PathLike] + results_path: Union[str, os.PathLike] + scenarios: pd.DataFrame + has_run: bool = False + + @staticmethod + @abstractmethod + def load_file(filepath: Union[str, os.PathLike]): + """Get Tipping Point attributes from toml file.""" + ... + + @staticmethod # copping from benefits.py + @abstractmethod + def load_dict(data: dict[str, Any]): + """Get Tipping Point attributes from an object, e.g. when initialized from GUI.""" + ... + + @abstractmethod + def save(self, filepath: Union[str, os.PathLike]): + """Save Tipping Point attributes to a toml file.""" + ... + + @abstractmethod + def check_scenarios_exist(self) -> pd.DataFrame: + """Check which scenarios are needed for this tipping point calculation and if they have already been created.""" + ... diff --git a/flood_adapt/object_model/scenario.py b/flood_adapt/object_model/scenario.py index c29fa4f3..495acf31 100644 --- a/flood_adapt/object_model/scenario.py +++ b/flood_adapt/object_model/scenario.py @@ -62,7 +62,7 @@ def load_dict(data: dict[str, Any], database_input_path: os.PathLike): def save(self, filepath: Union[str, os.PathLike]): """Save Scenario to a toml file.""" with open(filepath, "wb") as f: - tomli_w.dump(self.attrs.dict(exclude_none=True), f) + tomli_w.dump(self.attrs.model_dump(exclude_none=True), f) def run(self): """Run direct impact models for the scenario.""" diff --git a/flood_adapt/object_model/tipping_point.py b/flood_adapt/object_model/tipping_point.py new file mode 100644 index 00000000..5c0a7a98 --- /dev/null +++ b/flood_adapt/object_model/tipping_point.py @@ -0,0 +1,395 @@ +import os +from pathlib import Path +from typing import Union + +import numpy as np +import pandas as pd +import plotly.graph_objects as go +import tomli +import tomli_w +from scipy.interpolate import interp1d + +from flood_adapt.object_model.interface.tipping_points import ( + ITipPoint, + TippingPointModel, + TippingPointStatus, +) +from flood_adapt.object_model.io.unitfulvalue import UnitfulLength, UnitTypesLength +from flood_adapt.object_model.scenario import Scenario + + +def ensure_database_loaded(): + """Ensure that the Database class is available without circular issues.""" + try: + Database() + except NameError: + from flood_adapt.dbs_controller import Database + return Database() + + +class TippingPoint(ITipPoint): + """Class holding all information related to tipping points analysis.""" + + def __init__(self): + self.database = ensure_database_loaded() + self.site_toml_path = Path(self.database.static_path) / "site" / "site.toml" + self.results_path = self.database.output_path / "tipping_points" + self.scenarios = {} + + def slr_projections(self, slr): + """Create projections for sea level rise value.""" + new_projection_name = ( + self.attrs.projection + "_tp_slr" + str(slr).replace(".", "") + ) + proj = self.database.projections.get(self.attrs.projection) + proj.attrs.physical_projection.sea_level_rise = UnitfulLength( + value=slr, units=UnitTypesLength.meters + ) + proj.attrs.name = new_projection_name + if proj.attrs.name not in self.database.projections.list_objects()["name"]: + self.database.projections.save(proj) + return self + + def check_scenarios_exist(self, scenario_obj, existing_scenarios): + # check if the current scenario in the tipping point object already exists in the database, if not save it + for db_scenario in existing_scenarios: + if scenario_obj == db_scenario: + return db_scenario + self.database.scenarios.save(scenario_obj) + return scenario_obj + + def create_tp_scenarios(self): + """Create scenarios for each sea level rise value inside the tipping_point folder.""" + for i, slr in enumerate(self.attrs.sealevelrise): + self.slr_projections(slr) + self.attrs.sealevelrise[i] = str(slr).replace(".", "") + + scenarios = { + f"slr_{slr}": { + "name": f'{self.attrs.projection}_tp_slr{str(slr).replace(".", "")}_{self.attrs.event_set}_{self.attrs.strategy}', + "event": self.attrs.event_set, + "projection": f'{self.attrs.projection}_tp_slr{str(slr).replace(".", "")}', + "strategy": self.attrs.strategy, + } + for slr in self.attrs.sealevelrise + } + + existing_scenarios = self.database.scenarios.list_objects()["objects"] + + for scenario in scenarios.keys(): + scenario_obj = Scenario.load_dict( + scenarios[scenario], self.database.input_path + ) + resulting_scenario = self.check_scenarios_exist( + scenario_obj, existing_scenarios + ) + + self.scenarios[resulting_scenario.attrs.name] = { + "name": resulting_scenario.attrs.name, + "description": resulting_scenario.attrs.description, + "event": resulting_scenario.attrs.event, + "projection": resulting_scenario.attrs.projection, + "strategy": resulting_scenario.attrs.strategy, + "object": resulting_scenario, + } + + self.attrs.scenarios = list(self.scenarios.keys()) + print("All scenarios checked and created successfully.") + + def run_tp_scenarios(self): + # TODO: add more strict if clause below + """Run all scenarios to determine tipping points.""" + if not self.scenarios: + self.create_tp_scenarios() + + for name, scenario in self.scenarios.items(): + scenario_obj = scenario["object"] + # IMPORTANT: commented out to run every scenario - uncomment if you want to skip scenarios once tipping point is reached (decision pending on which direction to pursue) + # if self.attrs.status == TippingPointStatus.reached: + # self.scenarios[name]["tipping point reached"] = True + # continue + + if not self.scenario_has_run(scenario_obj): + scenario_obj.run() + self.has_run = True + + if self.check_tipping_point(scenario_obj): + self.attrs.status = TippingPointStatus.reached + self.scenarios[name]["tipping point reached"] = True + else: + self.scenarios[name]["tipping point reached"] = False + + self.prepare_tp_results() + print("All scenarios run successfully.") + + self._make_html() + + def scenario_has_run(self, scenario_obj): + # TODO: once has_run is refactored (external) we change below to make it more direct + for db_scenario, finished in zip( + self.database.scenarios.list_objects()["objects"], + self.database.scenarios.list_objects()["finished"], + ): + if scenario_obj == db_scenario and finished: + return True + return False + + def check_scenario_has_run(self): + if not self.scenarios: + self.create_tp_scenarios() + for name, scenario in self.scenarios.items(): + scenario_obj = scenario["object"] + if self.scenario_has_run(scenario_obj): + return True + + def check_tipping_point(self, scenario: Scenario): + """Load results and check if the tipping point is reached.""" + info_df = pd.read_csv( + scenario.init_object_model().direct_impacts.results_path.joinpath( + f"Infometrics_{scenario.direct_impacts.name}.csv" + ), + index_col=0, + ) + + for metric in self.attrs.tipping_point_metric: + self.scenarios[scenario.attrs.name][f"{metric[0]}_value"] = info_df.loc[ + metric[0], "Value" + ] + + # TODO: maybe change to a different approach if more than one tipping + # point is being assessed (instead of any, maybe you want to check + # which TPs are reached and return a dict with the results) + return any( + self.evaluate_tipping_point( + info_df.loc[metric[0], "Value"], + metric[1], + metric[2], + ) + for metric in self.attrs.tipping_point_metric + ) + + def evaluate_tipping_point(self, current_value, threshold, operator): + """Compare current value with threshold for tipping point.""" + operations = {"greater": lambda x, y: x >= y, "less": lambda x, y: x <= y} + return operations[operator](current_value, threshold) + + def calculate_sea_level_at_threshold(self, tp_results): + tp_results["ATP"] = "False" + for metric in self.attrs.tipping_point_metric: + metric_name, threshold, operator = metric + valid_data = tp_results[tp_results["Metric"] == f"{metric_name}"].dropna() + + x = valid_data["sea level"] + y = valid_data["Value"] + + interpolation_function = interp1d(y, x, fill_value="extrapolate") + + # Check if the interpolated value is reasonable (not -inf, inf, NaN, etc.) + if np.isfinite(interpolation_function(threshold)): + new_rows = pd.DataFrame( + { + "sea level": interpolation_function(threshold), + "strategy": valid_data.iloc[0]["strategy"], + "Metric": metric_name.value, + "Value": threshold, + "ATP": "True", + }, + index=[0], + ) + tp_results = pd.concat([tp_results, new_rows], ignore_index=True) + tp_results = tp_results.sort_values(by=["Metric", "sea level"]) + return tp_results + + def prepare_tp_results(self): + tp_path = self.results_path.joinpath(self.attrs.name) + if not tp_path.is_dir(): + tp_path.mkdir(parents=True) + + tp_results = pd.DataFrame.from_dict(self.scenarios, orient="index").reset_index( + drop=True + ) + + tp_results["sea level"] = [ + float(i) / 10 for i in self.attrs.sealevelrise + ] # TODO: fix later if needed - quick solution dividing by 10 + + tp_results["strategy"] = self.attrs.strategy + + tp_results_long = pd.melt( + tp_results, + id_vars=["sea level", "strategy"], + value_vars=[col for col in tp_results.columns if col.endswith("_value")], + var_name="Metric", + value_name="Value", + ) + + tp_results_long["Metric"] = tp_results_long["Metric"].str.replace("_value", "") + tp_results_long = self.calculate_sea_level_at_threshold(tp_results_long) + tp_results_long.to_csv(tp_path / "tipping_point_results.csv") + + def _make_html(self): # Make html + tp_path = self.results_path.joinpath(self.attrs.name) + tp_results = pd.read_csv(tp_path / "tipping_point_results.csv") + + # divide calculated values from interpolated ones + tp_slr_min = int(self.attrs.sealevelrise[0]) / 10 + tp_slr_max = int(self.attrs.sealevelrise[-1]) / 10 + + # plot tp + for metric in self.attrs.tipping_point_metric: + metric_data = tp_results[tp_results["Metric"] == metric[0]] + + # Get topping point + idx_tp = metric_data[metric_data["ATP"]].index[0] + tp = metric_data["sea level"].iloc[idx_tp] + tp_value = metric_data["Value"].iloc[idx_tp] + + # Get non-interpolated data + idx_min = metric_data[metric_data["sea level"] == tp_slr_min].index[0] + idx_max = metric_data[metric_data["sea level"] == tp_slr_max].index[0] + fig = go.Figure() + + # First trace: plotting the data between idx_min and idx_max + fig.add_trace( + go.Scatter( + x=metric_data.loc[idx_min:idx_max, "sea level"], + y=metric_data.loc[idx_min:idx_max, "Value"], + mode="lines+markers", + name=f"{metric[0]}", + ) + ) + + # Second trace: plotting data from idx_max + 1 to the end + fig.add_trace( + go.Scatter( + x=metric_data.loc[idx_max + 1 :, "sea level"], + y=metric_data.loc[idx_max + 1 :, "Value"], + mode="lines+markers", + name="Extrapolated", + line={"color": "Green", "dash": "dash"}, + ) + ) + # Third trace: plotting data from idx_max + 1 to the end + fig.add_trace( + go.Scatter( + x=metric_data.loc[idx_min - 1 : idx_min, "sea level"], + y=metric_data.loc[idx_min - 1 : idx_min, "Value"], + mode="lines+markers", + name="Extrapolated", + line={"color": "Green", "dash": "dot"}, + ) + ) + + # Adding a horizontal line shape for the threshold + fig.add_trace( + go.Scatter( + x=[ + metric_data["sea level"].min(), + metric_data["sea level"].max(), + ], + y=[metric[1], metric[1]], + mode="lines", + name="Threshold", + line={"color": "Red", "width": 3, "dash": "dash"}, + ) + ) + + # Adding a horizontal line shape for the threshold + fig.add_trace( + go.Scatter( + x=[tp], + y=[tp_value], + mode="markers", + name=f"Tipping point: {tp.round(3)} m", + marker={ + "color": "Red", + "size": 10, + }, + ) + ) + + # Updating layout with titles + fig.update_layout( + title=f"Tipping Point Analysis for {self.attrs.name}", + xaxis_title="Sea Level Rise (m)", + yaxis_title=f"{metric[0]}", + ) + + # write html to results folder + html = os.path.join(tp_path, "tipping_point.html") + fig.write_html(html) + + @staticmethod + def load_file(filepath: Union[str, Path]) -> "TippingPoint": + """Create risk event from toml file.""" + with open(filepath, mode="rb") as fp: + toml = tomli.load(fp) + return TippingPoint.load_dict(toml) + + @staticmethod + def load_dict(dct: Union[str, Path]) -> "TippingPoint": + """Create risk event from toml file.""" + obj = TippingPoint() + obj.attrs = TippingPointModel.model_validate(dct) + return obj + + def save(self, filepath: Union[str, os.PathLike]): + """Save tipping point to a toml file.""" + with open(filepath, "wb") as f: + tomli_w.dump(self.attrs.model_dump(exclude_none=True), f) + + def __eq__(self, other): + if not isinstance(other, TippingPoint): + # don't attempt to compare against unrelated types + raise NotImplementedError + attrs_1, attrs_2 = self.attrs.model_copy(), other.attrs.model_copy() + attrs_1.__delattr__("name"), attrs_2.__delattr__("name") + attrs_1.__delattr__("description"), attrs_2.__delattr__("description") + return attrs_1 == attrs_2 + + +def load_database(database_path: str, database_name: str, system_folder: str): + # Call the read_database function with the provided path and name + from flood_adapt.api.static import read_database + from flood_adapt.config import Settings + + # Validate and set environment variables + Settings( + database_root=database_path, + database_name=database_name, + system_folder=system_folder, + ) + database = read_database(database_path, database_name) + + return database + + +# I am keeping this for quick access to debug until review is done. Then we delete it. +if __name__ == "__main__": + system_folder = "C:\\Users\\morenodu\\FloodAdapt\\flood_adapt\\system" + database_path = "C:\\Users\\morenodu\\OneDrive - Stichting Deltares\\Documents\\GitHub\\Database" + database_name = "charleston_test" + + # Load the database + database = load_database(database_path, database_name, system_folder) + + tp_dict = { + "name": "tipping_point_test", + "description": "", + "event_set": "extreme12ft", + "strategy": "no_measures", + "projection": "current", + "sealevelrise": [0.5, 1.0, 1.5, 2], + "tipping_point_metric": [ + ("TotalDamageEvent", 130074525.0, "greater"), + ("DisplacedHighVulnerability", 900, "greater"), + ], + } + # load + test_point = TippingPoint.load_dict(tp_dict) + # create scenarios for tipping points + test_point.create_tp_scenarios() + # run all scenarios + test_point.run_tp_scenarios() + # plot results + test_point._make_html() diff --git a/pyproject.toml b/pyproject.toml index ec020dcf..a07b3547 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -144,3 +144,7 @@ files.ignore-hidden = true [tool.typos.default.extend-words] strat = "strat" +waring = "waring" +guitares = "guitares" +datas = "datas" +Stichting = "Stichting" diff --git a/tests/test_object_model/test_tipping_points.py b/tests/test_object_model/test_tipping_points.py new file mode 100644 index 00000000..0528cc0e --- /dev/null +++ b/tests/test_object_model/test_tipping_points.py @@ -0,0 +1,127 @@ +from pathlib import Path + +import pytest + +from flood_adapt.dbs_controller import Database +from flood_adapt.object_model.tipping_point import TippingPoint + + +class TestTippingPoints: + @pytest.fixture() + def tp_dict(self): + return { + "name": "tipping_point_test", + "description": "", + "event_set": "extreme12ft", + "strategy": "no_measures", + "projection": "current", + "sealevelrise": [0.5, 0.75, 1.0, 1.5], + "tipping_point_metric": [ + ("TotalDamageEvent", 110974525.0, "greater"), + ("FullyFloodedRoads", 2305, "greater"), + ], + } + + @pytest.fixture() + def created_tp_scenarios(self, tp_dict): + test_point = TippingPoint.load_dict(tp_dict) + test_point.create_tp_scenarios() + return test_point + + @pytest.fixture() + def run_tp_scenarios(self, created_tp_scenarios): + created_tp_scenarios.run_tp_scenarios() + return created_tp_scenarios + + def test_createTippingPoints_scenariosAlreadyExist_notDuplicated( + self, test_db, tp_dict + ): + test_point = TippingPoint.load_dict(tp_dict) + test_point.create_tp_scenarios() + assert test_point is not None + assert isinstance(test_point, TippingPoint) + + def test_run_scenarios(self, test_db, created_tp_scenarios): + created_tp_scenarios.run_tp_scenarios() + assert created_tp_scenarios is not None + + def test_slr_projections_creation(self, test_db, tp_dict): + test_point = TippingPoint.load_dict(tp_dict) + for slr in test_point.attrs.sealevelrise: + test_point.slr_projections(slr) + projection_path = ( + Path(Database().input_path) + / "projections" + / f"{test_point.attrs.projection}_slr{str(slr).replace('.', '')}" + / f"{test_point.attrs.projection}_slr{str(slr).replace('.', '')}.toml" + ) + assert projection_path.exists() + + def test_scenario_tippingpoint_reached(self, test_db, run_tp_scenarios): + for name, scenario in run_tp_scenarios.scenarios.items(): + assert ( + "tipping point reached" in scenario + ), f"Key 'tipping point reached' not found in scenario: {name}" + assert isinstance( + scenario["tipping point reached"], bool + ), f"Value for 'tipping point reached' is not boolean in scenario: {name}" + + +# TODO create test for tipping point reached being true and another for false + +# TODO: check if the tipping point reached is indeed correct + + +class TestTippingPointInvalidInputs: + @pytest.mark.parametrize( + "invalid_tp_dict", + [ + # Missing required fields + {}, + {"name": "missing_other_fields"}, + # Incorrect data types + { + "name": 123, + "description": 456, + "event_set": "extreme12ft", + "strategy": "no_measures", + "projection": "current", + "sealevelrise": "not_a_list", + "tipping_point_metric": "not_a_list", + }, + # Invalid values + { + "name": "", + "description": "", + "event_set": "unknown_event", + "strategy": "no_measures", + "projection": "future", + "sealevelrise": [-1, -2], + "tipping_point_metric": [ + ("TotalDamageEvent", "not_a_number", "greater") + ], + }, + ], + ) + def test_load_dict_with_invalid_inputs(self, invalid_tp_dict): + with pytest.raises(ValueError): + TippingPoint.load_dict(invalid_tp_dict) + + def test_edge_cases_empty_sealevelrise(self, test_db): + tp_dict = { + "name": "tipping_point_test", + "description": "", + "event_set": "extreme12ft", + "strategy": "no_measures", + "projection": "current", + "sealevelrise": [], + "tipping_point_metric": [ + ("TotalDamageEvent", 110974525.0, "greater"), + ("DisplacedHighVulnerability", 900, "greater"), + ], + } + test_point = TippingPoint.load_dict(tp_dict) + test_point.create_tp_scenarios() + assert ( + len(test_point.scenarios) == 0 + ), "Scenarios should not be created for empty sealevelrise list"