Fix/512 read year from config for control risk calculation in cut from year (#514)

* chore: renaming representative_damage_percentile

* chore: year is read from config, default arguments are deleted

* test: test is updated where year is not input

* 513 resolve_zero_division_in_triangle_to_null (#516)

* chore: if year is 0, it is turned into 1.

* feat: function created to aggregate link damages and risk

* chore: extra comment lines removed (previous script part replaced with a function)

* feat: to_integrate_shaper_factory is created. hz_to_integrate_shaper is completed. osd_to_integrate_shaper under construction

* feat: osd_to_integrate_shaper is completed.

* feat: man_to_integrate_shaper is completed.

* chore: pattern added to find risk_<vulnerability_curve>

* chore: skip adding np.nan of the segment_value to the link_value

* chore: risk_result_columns was referenced before being defined

* chore: tests updated to refer to the correct risk column

* chore: _rps = list(_to_integrate.columns) assigned once
sahand-asgarpour authored Jul 12, 2024
1 parent 88a4350 commit 66d1961
Showing 10 changed files with 411 additions and 102 deletions.
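Taken together, the change set above means risk is no longer written to a single risk column: control_risk_calculation now reads the damage curve, calculation mode and cutoff year from the analysis config and writes one risk_<vulnerability_curve> column per damage curve. A small illustration of the resulting columns (in RA2CE this is a GeoDataFrame; names and values here are made up):

import pandas as pd

# Illustrative only: damages per return period and curve go in,
# one expected-annual-damage column per vulnerability curve comes out.
gdf = pd.DataFrame(
    {
        "dam_RP10_C1": [100.0, 0.0],
        "dam_RP100_C1": [300.0, 50.0],
        "risk_C1": [12.3, 4.5],  # placeholder expected annual damages
    }
)
print([c for c in gdf.columns if c.startswith("risk_")])  # ['risk_C1']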
@@ -33,6 +33,9 @@
from ra2ce.analysis.damages.damage_calculation.damage_network_base import (
DamageNetworkBase,
)
from ra2ce.analysis.damages.shape_to_integrate_object.to_integrate_shaper_factory import (
ToIntegrateShaperFactory,
)


class DamageNetworkReturnPeriods(DamageNetworkBase):
@@ -93,99 +96,105 @@ def main(self, damage_function: DamageCurveEnum, manual_damage_functions):

if damage_function == DamageCurveEnum.MAN:
self.calculate_damage_manual_functions(
events=self.events, manual_damage_functions=manual_damage_functions
events=self.return_periods,
manual_damage_functions=manual_damage_functions,
)

def control_risk_calculation(
self,
mode: RiskCalculationModeEnum = RiskCalculationModeEnum.DEFAULT,
year: int = 0,
damage_function: DamageCurveEnum,
mode: RiskCalculationModeEnum,
year: int,
):
"""
Controler of the risk calculation, which calls the correct risk (integration) functions
Controller of the risk calculation, which calls the correct risk (integration) functions
Arguments:
*damage_function* (DamageCurveEnum) : defines the damage estimation method
*mode* (RiskCalculationModeEnum) : the sort of risk calculation that you want to do, can be:
‘default’, 'cut_from_YYYY_year’, ‘triangle_to_null_YYYY_year’
*year* (int) : the cutoff year/return period of the risk calculation
:param damage_function:
"""
self.verify_damage_data_for_risk_calculation()

# prepare the parameters of the risk calculation
dam_cols = [c for c in self.gdf.columns if c.startswith("dam")]
_to_integrate = self.gdf[dam_cols]
_to_integrate.columns = [
float(c.split("_")[1].replace("RP", "")) for c in _to_integrate.columns
]
_to_integrate = _to_integrate.sort_index(
axis="columns", ascending=False
) # from large to small RP

if mode == RiskCalculationModeEnum.DEFAULT:
to_integrate_shaper = ToIntegrateShaperFactory.get_shaper(
gdf=self.gdf, damage_function=damage_function
)
return_periods = to_integrate_shaper.get_return_periods()
_to_integrate_dict = to_integrate_shaper.shape_to_integrate_object(
return_periods=return_periods
)

_to_integrate = self.rework_damage_data_default(_to_integrate)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())
self.gdf["risk"] = _risk
for vulnerability_curve_name, _to_integrate in _to_integrate_dict.items():
if mode == RiskCalculationModeEnum.DEFAULT:
_to_integrate = self.rework_damage_data_default(_to_integrate)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())

elif mode == RiskCalculationModeEnum.CUT_FROM_YEAR:
"""
In this mode, the integration mimics the presence of a flood protection
"""
_rps = list(_to_integrate.columns)

if year <= min(_rps):
raise ValueError(
"""
RA2CE cannot calculate risk in 'cut_from' mode if
Return period of the cutoff ({}) <= smallest available return period ({})
Use 'default' mode or 'triangle_to_null_mode' instead.
""".format(
year, min(_rps)
if mode == RiskCalculationModeEnum.CUT_FROM_YEAR:
"""
In this mode, the integration mimics the presence of a flood protection
"""

if year <= min(_rps):
raise ValueError(
"""
RA2CE cannot calculate risk in 'cut_from' mode if
Return period of the cutoff ({}) <= smallest available return period ({})
Use 'default' mode or 'triangle_to_null_mode' instead.
""".format(
year, min(_rps)
)
)
)

elif (
min(_rps) < year < max(_rps)
): # if protection level is between min and max RP
_to_integrate = self.rework_damage_data_cut_from(_to_integrate, year)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())

elif year >= max(
_rps
): # cutoff is larger or equal than largest return period
# risk is return frequency of cutoff
# times the damage of the most extreme event
_to_integrate = _to_integrate.fillna(0)
_risk = _to_integrate[_rps[0]] / year
# _max_RP = max(_to_integrate.columns)

self.gdf["risk"] = _risk

elif mode == RiskCalculationModeEnum.TRIANGLE_TO_NULL_YEAR:
"""
In this mode, an extra data point with zero damage is added at some distance from the smallest known RP,
and the area of the Triangle this creates is also calculated
"""

_rps = list(_to_integrate.columns)
elif (
min(_rps) < year < max(_rps)
): # if protection level is between min and max RP
_to_integrate = self.rework_damage_data_cut_from(
_to_integrate, year
)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())

elif year >= max(
_rps
): # cutoff is larger or equal than largest return period
# risk is return frequency of cutoff
# times the damage of the most extreme event
_to_integrate = _to_integrate.fillna(0)
_risk = _to_integrate[max(_to_integrate.columns)] / year

elif mode == RiskCalculationModeEnum.TRIANGLE_TO_NULL_YEAR:
"""
In this mode, an extra data point with zero damage is added at some distance from the smallest known RP,
and the area of the Triangle this creates is also calculated
"""

if year >= min(_rps) and year != 0:
raise ValueError(
"""
RA2CE cannot calculate risk in 'triangle_to_null' mode if
Return period of the triangle ({}) >= smallest available return period ({})
Use 'default' mode or 'cut_from' instead.
""".format(
year, min(_rps)
)
)

if year >= min(_rps):
raise ValueError(
"""
RA2CE cannot calculate risk in 'triangle_to_null' mode if
Return period of the triangle ({}) >= smallest available return period ({})
Use 'default' mode or 'cut_from' instead.
""".format(
year, min(_rps)
if year == 0:
logging.warning(
"Available lane data cannot simply be converted to float/int, RA2CE will try a clean-up."
)
)
year = 1

_to_integrate = self.rework_damage_data_triangle_to_null(
_to_integrate, year
)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())
_to_integrate = self.rework_damage_data_triangle_to_null(
_to_integrate, year
)
_risk = self.integrate_df_trapezoidal(_to_integrate.copy())

self.gdf["risk"] = _risk
self.gdf[f"risk_{vulnerability_curve_name}"] = _risk

def verify_damage_data_for_risk_calculation(self):
"""
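For orientation, the quantity that ends up in each risk_<vulnerability_curve> column is the trapezoidal integral of damage over annual exceedance probability (1/RP), which is what integrate_df_trapezoidal computes after the mode-specific rework (default extrapolation, cut_from cutoff, triangle_to_null extra point). A minimal standalone sketch of that integral with made-up return periods and damages:

import numpy as np
import pandas as pd

# Damages per link for three return periods, columns ordered from large to small RP,
# mirroring the _to_integrate frame handed to integrate_df_trapezoidal (values made up).
to_integrate = pd.DataFrame(
    {1000.0: [500.0, 80.0], 100.0: [300.0, 50.0], 10.0: [100.0, 0.0]}
)

rps = sorted(to_integrate.columns, reverse=True)  # [1000.0, 100.0, 10.0]
frequencies = 1.0 / np.array(rps)                 # annual exceedance probabilities, ascending
damages = to_integrate[rps].to_numpy()

# Trapezoidal rule per row: sum over 0.5 * (d_i + d_{i+1}) * (f_{i+1} - f_i)
risk = ((damages[:, :-1] + damages[:, 1:]) / 2 * np.diff(frequencies)).sum(axis=1)
print(risk)  # expected annual damage per link -> [21.6, 2.835]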
4 changes: 3 additions & 1 deletion ra2ce/analysis/damages/damages.py
@@ -107,7 +107,9 @@ def _rename_road_gdf_to_conventions(road_gdf_columns: list[str]) -> list[str]:
): # Check if risk_calculation is demanded
if self.analysis.risk_calculation_mode != RiskCalculationModeEnum.NONE:
return_period_gdf.control_risk_calculation(
mode=self.analysis.risk_calculation_mode
self.analysis.damage_curve,
mode=self.analysis.risk_calculation_mode,
year=self.analysis.risk_calculation_year,
)

else:
Empty file.
@@ -0,0 +1,30 @@
import geopandas as gpd

from ra2ce.analysis.analysis_config_data.enums.damage_curve_enum import DamageCurveEnum
from ra2ce.analysis.damages.shape_to_integrate_object.to_Integrate_shaper_protocol import (
ToIntegrateShaperProtocol,
)


class HzToIntegrateShaper(ToIntegrateShaperProtocol):
gdf: gpd.GeoDataFrame

def __init__(self, gdf):
self.gdf = gdf

def get_return_periods(self) -> list:
return sorted(c for c in self.gdf.columns if c.startswith("dam"))

def shape_to_integrate_object(
self, return_periods: list
) -> dict[str : gpd.GeoDataFrame]:
_to_integrate = self.gdf[return_periods]

_to_integrate.columns = [
float(c.split("_")[1].replace("RP", "")) for c in _to_integrate.columns
]
return {
DamageCurveEnum.HZ.name: _to_integrate.sort_index(
axis="columns", ascending=False
)
}
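In isolation, the parsing above is small: take the dam_RP<N> columns, turn the RP suffix into a float, and sort from largest to smallest return period, keyed under the HZ curve name. A self-contained sketch of the same transformation without importing RA2CE (column names illustrative):

import pandas as pd

gdf = pd.DataFrame({"dam_RP10": [100.0], "dam_RP100": [300.0], "osm_id": [1]})

damage_cols = sorted(c for c in gdf.columns if c.startswith("dam"))  # as in get_return_periods()
to_integrate = gdf[damage_cols]
to_integrate.columns = [float(c.split("_")[1].replace("RP", "")) for c in to_integrate.columns]
to_integrate = to_integrate.sort_index(axis="columns", ascending=False)

print({"HZ": list(to_integrate.columns)})  # {'HZ': [100.0, 10.0]}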
@@ -0,0 +1,71 @@
import geopandas as gpd
import pandas as pd
import re

from ra2ce.analysis.damages.shape_to_integrate_object.to_Integrate_shaper_protocol import (
ToIntegrateShaperProtocol,
)


class ManToIntegrateShaper(ToIntegrateShaperProtocol):
gdf: gpd.GeoDataFrame

def __init__(self, gdf):
self.gdf = gdf

@staticmethod
def _extract_columns_by_pattern(
pattern_text: str, gdf: gpd.GeoDataFrame
) -> set[str]:
"""
Extract column names based on the provided regex pattern.
Args:
pattern_text (Pattern[str]): The compiled regex pattern to match the column names.
df (pd.DataFrame): The DataFrame from which to extract the RP values.
Returns:
Set[str]: A set of RP values extracted from the column names.
"""
pattern = re.compile(pattern_text)
columns = {pattern.search(c).group(1) for c in gdf.columns if pattern.search(c)}
return columns

def get_return_periods(self) -> list:
# Extract the RP values from the columns
rp_values = ManToIntegrateShaper._extract_columns_by_pattern(
pattern_text=r"RP(\d+)", gdf=self.gdf
)
if not rp_values:
raise ValueError("No damage column with RP found")

return sorted([float(rp) for rp in rp_values])

def shape_to_integrate_object(
self, return_periods: list
) -> dict[str : gpd.GeoDataFrame]:
_to_integrate_dict = {}
# finds vulnerability curves shown with Ci, where 1<= i <=6
vulnerability_curves = sorted(
ManToIntegrateShaper._extract_columns_by_pattern(
pattern_text=r"dam_RP\d+_(.*)", gdf=self.gdf
)
)
for vulnerability_curve in vulnerability_curves:

filtered_columns = sorted(
ManToIntegrateShaper._extract_columns_by_pattern(
pattern_text=rf"(dam_RP\d+.*{vulnerability_curve})",
gdf=self.gdf,
)
)
_to_integrate = self.gdf[filtered_columns]

_to_integrate.columns = [
float(c.split("_")[1].replace("RP", "")) for c in _to_integrate.columns
]
_to_integrate_dict[f"{vulnerability_curve}"] = _to_integrate.sort_index(
axis="columns", ascending=False
)

return _to_integrate_dict
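The manual-curve shaper above groups damage columns per vulnerability curve via the dam_RP<N>_<curve> pattern. A standalone sketch of that grouping (patterns taken from the diff, simplified to fullmatch; column names illustrative):

import re
import pandas as pd

gdf = pd.DataFrame(
    {"dam_RP10_C1": [1.0], "dam_RP100_C1": [2.0], "dam_RP10_C2": [3.0], "dam_RP100_C2": [4.0]}
)

# Return periods found in the column names (pattern RP(\d+))
rps = sorted({float(m.group(1)) for m in (re.search(r"RP(\d+)", c) for c in gdf.columns) if m})
# Vulnerability curves found in the column names (pattern dam_RP\d+_(.*))
curves = sorted({m.group(1) for m in (re.search(r"dam_RP\d+_(.*)", c) for c in gdf.columns) if m})

to_integrate_dict = {}
for curve in curves:
    cols = sorted(c for c in gdf.columns if re.fullmatch(rf"dam_RP\d+_{curve}", c))
    frame = gdf[cols]
    frame.columns = [float(c.split("_")[1].replace("RP", "")) for c in cols]
    to_integrate_dict[curve] = frame.sort_index(axis="columns", ascending=False)

print(rps)  # [10.0, 100.0]
print({k: list(v.columns) for k, v in to_integrate_dict.items()})
# {'C1': [100.0, 10.0], 'C2': [100.0, 10.0]}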
@@ -0,0 +1,71 @@
import geopandas as gpd
import pandas as pd
import re

from ra2ce.analysis.damages.shape_to_integrate_object.to_Integrate_shaper_protocol import (
ToIntegrateShaperProtocol,
)


class OsdToIntegrateShaper(ToIntegrateShaperProtocol):
gdf: gpd.GeoDataFrame

def __init__(self, gdf):
self.gdf = gdf

@staticmethod
def _extract_columns_by_pattern(
pattern_text: str, gdf: gpd.GeoDataFrame
) -> set[str]:
"""
Extract column names based on the provided regex pattern.
Args:
pattern_text (Pattern[str]): The compiled regex pattern to match the column names.
df (pd.DataFrame): The DataFrame from which to extract the RP values.
Returns:
Set[str]: A set of RP values extracted from the column names.
"""
pattern = re.compile(pattern_text)
columns = {pattern.search(c).group(1) for c in gdf.columns if pattern.search(c)}
return columns

def get_return_periods(self) -> list:
# Extract the RP values from the columns
rp_values = OsdToIntegrateShaper._extract_columns_by_pattern(
pattern_text=r"RP(\d+)", gdf=self.gdf
)
if not rp_values:
raise ValueError("No damage column with RP found")

return sorted([float(rp) for rp in rp_values])

def shape_to_integrate_object(
self, return_periods: list
) -> dict[str : gpd.GeoDataFrame]:
_to_integrate_dict = {}
# finds vulnerability curves shown with Ci, where 1<= i <=6
vulnerability_curves = sorted(
OsdToIntegrateShaper._extract_columns_by_pattern(
pattern_text=r"(C\d+)", gdf=self.gdf
)
)
for vulnerability_curve in vulnerability_curves:

filtered_columns = sorted(
OsdToIntegrateShaper._extract_columns_by_pattern(
pattern_text=rf"(.*{vulnerability_curve}.*representative.*)",
gdf=self.gdf,
)
)
_to_integrate = self.gdf[filtered_columns]

_to_integrate.columns = [
float(c.split("_")[2].replace("RP", "")) for c in _to_integrate.columns
]
_to_integrate_dict[f"{vulnerability_curve}"] = _to_integrate.sort_index(
axis="columns", ascending=False
)

return _to_integrate_dict
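The OSdaMage variant only differs in the column convention it expects: the curve id (C1..C6) and the word "representative" sit in the column name, and the return period is the third underscore-separated token (hence split("_")[2]). A short sketch of that parsing on an assumed column name such as dam_C3_RP100_representative (the exact naming is an assumption; only the split convention comes from the code above):

# Assumed OSdaMage-style column name; only the split("_")[2] convention is taken from the diff.
col = "dam_C3_RP100_representative"
rp = float(col.split("_")[2].replace("RP", ""))
print(rp)  # 100.0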
@@ -0,0 +1,27 @@
import geopandas as gpd
from typing import Any, Protocol, runtime_checkable

from ra2ce.analysis.analysis_config_data.enums.damage_curve_enum import DamageCurveEnum


@runtime_checkable
class ToIntegrateShaperProtocol(Protocol):
gdf: gpd.GeoDataFrame

def get_return_periods(self) -> list:
"""
Gets the columns' names that have damages calculated
Returns:
list: columns' name that have damages calculated.
"""

def shape_to_integrate_object(
self, return_periods: list
) -> dict[str : gpd.GeoDataFrame]:
"""
Gets the return periods and create columns for risk calculation for each damage curve and return period
Returns:
list[GeoDataFrame]:
to_integrate objects each corresponding to each vulnerability curve to be used for risk calculation
"""