diff --git a/GloFAS/GloFAS_analysis/concatenate.py b/GloFAS/GloFAS_analysis/concatenate.py deleted file mode 100644 index 4c41fef..0000000 --- a/GloFAS/GloFAS_analysis/concatenate.py +++ /dev/null @@ -1,19 +0,0 @@ -from GloFAS.GloFAS_data_extractor.reforecast_dataFetch import get_monthsdays -from GloFAS.GloFAS_analysis.probability_exceeding_calculator import probability -def concatenate_prob(): - '''Concatenate GeoDataFrames over multiple day,months along the time dimension - order may be odd, but timestamp provided is still right''' - all_years_data = [] - MONTHSDAYS = get_monthsdays() - for md in MONTHSDAYS: - month = md[0].lower() - day = md[1] - floodedCommune_gdf = probability() # Open the GloFAS data for the current month, day !!!!!!!!!!!!! need to change now ! - all_years_data.append(floodedCommune_gdf) - - - # Concatenate all GeoDataFrames for different years - floodedCommune_gdf_concat = gpd.GeoDataFrame(pd.concat(all_years_data, ignore_index=True), geometry='geometry') - floodedCommune_gdf_concat.to_file(f"{self.DataDir}/floodedRP{self.RPyr}yr_leadtime{self.leadtime}_ADM{self.adminLevel}.gpkg", driver="GPKG") - - return floodedCommune_gdf_concat \ No newline at end of file diff --git a/GloFAS/GloFAS_analysis/performance_calculator.py b/GloFAS/GloFAS_analysis/performance_calculator.py new file mode 100644 index 0000000..e69de29 diff --git a/GloFAS/GloFAS_analysis/probability_calculator.py b/GloFAS/GloFAS_analysis/probability_calculator.py new file mode 100644 index 0000000..d27a29d --- /dev/null +++ b/GloFAS/GloFAS_analysis/probability_calculator.py @@ -0,0 +1,96 @@ +import numpy as np +import pandas as pd +import geopandas as gpd +from datetime import datetime, timedelta +from joblib import Parallel, delayed +from GloFAS.GloFAS_prep.aggregation import aggregation +from GloFAS.GloFAS_data_extractor.reforecast_dataFetch import get_monthsdays +from GloFAS.GloFAS_data_extractor.forecast_dataFetch import compute_dates_range +from GloFAS.GloFAS_data_extractor.threshold_opener import openThreshold +from GloFAS.GloFAS_data_extractor.unzip_open import openGloFAS, unzipGloFAS +import GloFAS.GloFAS_prep.configuration as cfg + +class FloodProbabilityProcessor: + def __init__(self, leadtime, adminLevel, forecastType, measure='max', start_date=None, end_date=None): + # start and end_date are only necessary if your forecastType is not 'reforecast' but 'forecast' + self.leadtime = leadtime + self.adminLevel = adminLevel + self.measure = measure + self.DataDir = cfg.DataDir + self.crs = cfg.crs + self.RPyr = cfg.RPyr + self.area = cfg.MaliArea + self.lakesPath = cfg.lakesPath + self.forecastType = forecastType + if forecastType == 'reforecast': + self.MONTHSDAYS = get_monthsdays() + elif forecastType == 'forecast': + self.MONTHSDAYSYEARS = compute_dates_range(start_date, end_date) + # Initializations that rely on data + self.reference_rasterPath = unzipGloFAS(self.DataDir, self.leadtime, '01', '07') + self.reference_Q_da = openGloFAS(self.reference_rasterPath, self.lakesPath, self.crs) + self.threshold_da = openThreshold(self.DataDir, self.crs, self.RPyr, self.area, self.reference_Q_da) + self.threshold_gdf = aggregation(self.threshold_da, adminPath, 'polygon', measure=cfg.measure) + + def exceedance(self, Q_da, threshold_gdf, nrEnsemble): + '''Check exceedance of threshold values for a single ensemble member''' + GloFAS_gdf = aggregation(Q_da, threshold_gdf, 'polygon', nrEnsemble, measure=self.measure) + exceedance = GloFAS_gdf[f'{self.measure}'] > threshold_gdf[f'{self.measure}'] + return exceedance + + def probability(self, Q_da, threshold_gdf, nrCores): + '''Calculate exceedance probability across ensemble members for a single time step''' + nrMembers = len(Q_da.number) + exceedance_count = np.zeros(len(threshold_gdf), dtype=int) + + # Calculate exceedances for each ensemble in parallel + results = Parallel(n_jobs=nrCores)(delayed(self.exceedance)(Q_da, threshold_gdf, ensemble) for ensemble in Q_da.number) + for result in results: + exceedance_count += result + probability = exceedance_count / nrMembers + + # Prepare results as GeoDataFrame + flooded_df = threshold_gdf.copy() + flooded_df['ExceedanceCount'] = exceedance_count + flooded_df['Probability'] = probability + return flooded_df + + def concatenate_prob(self, Q_da, nrCores): + '''Concatenate exceedance probability calculations over multiple time steps''' + all_years_data = [] + if self.forecastType=='reforecast': + for month, day in self.MONTHSDAYS: + print(f"Starting for {month}, {day}") + Q_da_rasterpath = unzipGloFAS (self.DataDir, self.leadtime, month, day) + Q_da = openGloFAS(Q_da_rasterpath, self.leadtime) # Open data for the specific month/day + for timestamp in Q_da.time: # Iterate over years + startLoop = datetime.now() + valid_timestamp = pd.to_datetime(str(timestamp.values)) + timedelta(hours=self.leadtime) + # Calculate probability + flooded_df = self.probability(Q_da, self.threshold_gdf, nrCores) + flooded_df['ValidTime'] = valid_timestamp # Add timestamp + all_years_data.append(flooded_df) + print(f"Time step {valid_timestamp} done, took: {datetime.now() - startLoop}") + + if self.forecastType=='forecast': + for date in self.MONTHSDAYSYEARS: + startLoop = datetime.now() + year = date.strftime('%Y') + month = date.strftime('%m') + day = date.strftime('%d') + Q_da_rasterpath = unzipGloFAS(self.DataDir, self.leadtime, month, day, year) + Q_da = openGloFAS(Q_da_rasterpath, lakesPath=self.lakesPath, crs=self.crs) + valid_timestamp = pd.to_datetime(str(timestamp.values)) + timedelta(hours=self.leadtime) + # Calculate probability + flooded_df = self.probability(Q_da, self.threshold_gdf, nrCores) + flooded_df['ValidTime'] = valid_timestamp # Add timestamp + all_years_data.append(flooded_df) + print(f"Time step {valid_timestamp} done, took: {datetime.now() - startLoop}") + # Concatenate results for all time steps + floodProb_gdf_concat = pd.concat(all_years_data, ignore_index=True) + floodProb_gdf_concat = gpd.GeoDataFrame(floodProb_gdf_concat, geometry='geometry') + + # Save to file + output_path = f"{self.DataDir}/floodedRP{self.RPyr}yr_leadtime{self.leadtime}_ADM{self.adminLevel}.gpkg" + floodProb_gdf_concat.to_file(output_path, driver="GPKG") + return floodProb_gdf_concat diff --git a/GloFAS/GloFAS_analysis/probability_exceeding_calculator.py b/GloFAS/GloFAS_analysis/probability_exceeding_calculator.py deleted file mode 100644 index c616298..0000000 --- a/GloFAS/GloFAS_analysis/probability_exceeding_calculator.py +++ /dev/null @@ -1,48 +0,0 @@ -# -from GloFAS.GloFAS_prep.aggregation import aggregation - -def exceedance (threshold_gdf, nrEnsemble, measure='max'): - '''GloFAS_gdf based on zonal stats using one ensemble member and timestamp: so only two dimensions ''' - GloFAS_gdf = aggregation (Q_da, threshold_gdf, 'polygon', nrEnsemble, measure=measure) - exceedance = GloFAS_gdf[f'{measure}'] > self.threshold_gdf[f'{measure}'] - return exceedance - -print (f'starting for {month, day}') -self.Q_da = self.openGloFAS(month, day) -print (f'for month, day {month, day}, glofas raster opened') -for timestamp in self.Q_da.time: # which will be the same month,day, but alternating years - startLoop = datetime.now() - valid_timestamp = pd.to_datetime(str(timestamp.values)) + timedelta(hours=self.leadtime) - probability = probability - - -def probability(Q_da, threshold_gdf, nrCores, measure='max'): - '''calculates probability for one month/day/year, so put in for loop where you loop over each grib/netcdf file, and then again for the date that is in that netcdf file''' - - nrMembers = len(Q_da.number) # Number of ensemble members - commune_info_df = threshold_gdf[[f'ADM{self.adminLevel}', 'geometry']].copy() - floodedCommune_data = [] - exceedance_count = np.zeros(len(self.threshold_gdf), dtype=int) - - # Parallelize within the ensemble members - results = Parallel(n_jobs=nrCores)(delayed(exceedance)(threshold_gdf, nrEnsemble, measure=measure) for ensemble in Q_da.number) - - for result in results: - exceedance_count += result - probability = exceedance_count / nrMembers - - flooded_df = pd.DataFrame({ - f'ADM{self.adminLevel}': self.thresholdCommuneMax_gdf[f'ADM{self.adminLevel}'], - 'ExceedanceCount': exceedance_count, - 'Probability': probability, - 'ValidTime': valid_timestamp - }) - - flooded_df = flooded_df.join(commune_info_df.set_index(f'ADM{self.adminLevel}'), on=f'ADM{self.adminLevel}') - floodedCommune_data.append(flooded_df) - - print(f'timestep {valid_timestamp} done, took: {datetime.now() - startLoop}') - - floodedCommune_gdf = pd.concat(floodedCommune_data, ignore_index=True) - floodedCommune_gdf = gpd.GeoDataFrame(floodedCommune_gdf, geometry='geometry') - return floodedCommune_gdf diff --git a/GloFAS/GloFAS_prep/configuration.py b/GloFAS/GloFAS_prep/configuration.py index 558593b..49a15b4 100644 --- a/GloFAS/GloFAS_prep/configuration.py +++ b/GloFAS/GloFAS_prep/configuration.py @@ -29,4 +29,5 @@ adminLevel = 2 # choose level on which you would like to aggregate : 1,2,3 years = np.arange(startYear, endYear, 1) admPath = adminPaths [(adminLevel-1)] # generate the useful administrative unit path -nrCores = 6 #determine number of cpu cores to use \ No newline at end of file +nrCores = 6 #determine number of cpu cores to use +measure = 'max' # measure to aggregate on \ No newline at end of file diff --git a/GloFAS/GloFAS_prep/flood_definer.py b/GloFAS/GloFAS_prep/flood_definer.py new file mode 100644 index 0000000..fd0d07a --- /dev/null +++ b/GloFAS/GloFAS_prep/flood_definer.py @@ -0,0 +1,20 @@ +from GloFAS.GloFAS_prep.vectorCheck import checkVectorFormat +floodProbability_path = self.DataDir/ f"floodedRP{self.RPyr}yr_leadtime{self.leadtime}_ADM{self.adminLevel}.gpkg" + +def stampTrigger (floodProbability_gdf, actionLifetime, triggerProb): + '''defines trigger by the floodprobability that it needs to exceed, as well as writes the timeduration at which the trigger is valid + actionLifetime = actionLifetime in days''' + + # make everything capital + floodProbability_gdf[f'ADM{self.adminLevel}'] = floodProbability_gdf[f'ADM{self.adminLevel}'].apply(lambda x: unidecode.unidecode(x).upper()) + + + floodCommune_gdf['StartValidTime'] = pd.to_datetime(floodCommune_gdf['ValidTime'], errors='coerce') # - timedelta(days=self.buffervalid) + # incorporate actionlifetime + floodCommune_gdf['EndValidTime'] = pd.to_datetime(floodCommune_gdf['ValidTime'], errors='coerce') + timedelta(days=actionLifetime) + floodCommune_gdf['StartValidTime'] = pd.to_datetime(floodCommune_gdf['StartValidTime'], format='%d/%m/%Y', errors='coerce') + floodCommune_gdf['EndValidTime'] = pd.to_datetime(floodCommune_gdf['EndValidTime'], format='%d/%m/%Y', errors='coerce') + # Add Trigger column based on probability threshold + floodCommune_gdf['Trigger'] = np.where(floodCommune_gdf['Probability'] >= triggerProb, 1, 0) + + return floodCommune_gdf