From b78e5521b23b05221a4fcac6b8ff198c25410366 Mon Sep 17 00:00:00 2001 From: ERKuipers Date: Mon, 16 Dec 2024 18:26:50 +0100 Subject: [PATCH] update for observational data --- comparison/observation/HydroImpact.py | 86 ++++++++++++++++++++------- comparison/observation/thresholds.py | 12 ++-- 2 files changed, 73 insertions(+), 25 deletions(-) diff --git a/comparison/observation/HydroImpact.py b/comparison/observation/HydroImpact.py index 3758b34..ef8a14e 100644 --- a/comparison/observation/HydroImpact.py +++ b/comparison/observation/HydroImpact.py @@ -16,6 +16,7 @@ def parse_date_with_fallback(date_str, year): date = pd.to_datetime(f"{year} {date_str}", format="%Y %d/%m %H:%M") return date except ValueError: + # If the date is invalid (e.g., Feb 29 in non-leap years), return None #print(f"Invalid date skipped: {date_str} for year {year}") return None @@ -41,7 +42,7 @@ def create_datetime(row): # Display the reshaped DataFrame return hydro_df_long -def stampHydroTrigger(hydro_df, StationName, type_of_extremity, probability, value_col='Value'): +def stampHydroTrigger(hydro_df, StationName, type_of_extremity, probability, value_col='Value', date_col='Date'): """ Adds a 'trigger' column to the hydro_df DataFrame indicating whether the 'Value' exceeds the QRP threshold. @@ -55,16 +56,30 @@ def stampHydroTrigger(hydro_df, StationName, type_of_extremity, probability, val Returns: - pd.DataFrame: Copy of the input DataFrame with an additional 'trigger' column. """ + # Assuming hydro_df is already loaded + if date_col in hydro_df.columns: + # Convert the date_col to datetime + hydro_df[date_col] = pd.to_datetime(hydro_df[date_col], format="%Y-%m-%d %H:%M", errors='coerce') + + # Check for any invalid dates + if hydro_df[date_col].isnull().any(): + print("Warning: Some dates in 'date_col' could not be parsed and are set to NaT.") + + # Set the index to the datetime column + hydro_df.set_index(date_col, inplace=True) + #print(f"Index successfully set to datetime using '{date_col}':\n{hydro_df.index}") + else: + print(f"Error: Column '{date_col}' does not exist in the DataFrame, hopefully teh date is the index") + # Ensure "Value" column exists in the DataFrame if value_col not in hydro_df.columns: raise ValueError("The input DataFrame must contain a 'Value' column, please specify the name correctly") #QRP = QRP_station(HydroStations_RP_file, StationName, RP) - Q_station = hydro_df[value_col] - + Q_station = hydro_df[value_col] # this is where it goes wrong!!! if type_of_extremity == 'RP': - Q_prob= Q_Gumbel_fit_RP (hydro_df, probability) + Q_prob= Q_Gumbel_fit_RP (hydro_df, probability, value_col=value_col) # results in infinite elif type_of_extremity == 'percentile': # assuming above 20 is percentile, RP is percentile instead - Q_prob = Q_GEV_fit (hydro_df, probability) + Q_prob = Q_Gumbel_fit_percentile (hydro_df, percentile, value_col) else: print ('no such type of extremity') #print (f"for {StationName} : return period Q= {QRP}") @@ -81,7 +96,15 @@ def stampHydroTrigger(hydro_df, StationName, type_of_extremity, probability, val def createEvent(trigger_df, date_col='Date'): # Sort the dataframe by 'ValidTime', then by administrative unit: so that for each administrative unit, they are # first sort on valid time - trigger_df = trigger_df.sort_values(by=[f'{date_col}']).reset_index(drop=True) + if date_col in trigger_df.columns: + trigger_df = trigger_df.sort_values(by=[f'{date_col}']).reset_index(drop=True) + else: + try: + trigger_df = trigger_df.sort_index() + except: + raise ValueError (f'{date_col} not in trigger_df index and no set index either') + + # Prepare commune info with geometry for event creation event_data = [] @@ -92,18 +115,29 @@ def createEvent(trigger_df, date_col='Date'): r = 0 while r < len(trigger_df): row = trigger_df.iloc[r] + if processed_rows[r]: # If the row is already processed, continue to the next row r += 1 continue - # Checking if the current row and next row are both part of an event (Trigger == 1), and the trigger happens in the same adiministrative unit + # Checking if the current row and next row are both part of an event (Trigger == 1) + # and the trigger happens in the same administrative unit elif row['Trigger'] == 1 and r + 1 < len(trigger_df) and trigger_df.iloc[r + 1]['Trigger'] == 1: Event = 1 - StartDate = row[f'{date_col}'] + StartDate = row[f'{date_col}'] if date_col in trigger_df.columns else trigger_df.index[r] + # Continue until the end of the event where 'Trigger' is no longer 1 while r < len(trigger_df) and trigger_df.iloc[r]['Trigger'] == 1: - possible_endtime = trigger_df.iloc[r][f'{date_col}'] + # Check if the date column is part of the index or a column + if date_col in trigger_df.columns: + possible_endtime = trigger_df.iloc[r][f'{date_col}'] + else: # assuming it is the index + try: + possible_endtime = trigger_df.index[r] + except: + raise ValueError(f'{date_col} not in columns and not a valid index either') + processed_rows[r] = True # Mark row as processed row = trigger_df.iloc[r] r += 1 @@ -138,7 +172,7 @@ def createEvent(trigger_df, date_col='Date'): events_df = pd.DataFrame(columns=['Observation', 'Start Date', 'End Date']) return events_df -def loop_over_stations(station_csv, DataDir, type_of_extremity, probability, value_col='Value'): +def loop_over_stations_obs(station_csv, DataDir, type_of_extremity, probability, value_col='Value'): probability = float(probability) station_df = pd.read_csv (station_csv, header=0) #print (station_df.columns) @@ -149,8 +183,8 @@ def loop_over_stations(station_csv, DataDir, type_of_extremity, probability, val BasinName= stationrow['Basin'] stationPath = rf"{hydrodir}/{BasinName}_{StationName}.csv" try: - hydro_df = transform_hydro (stationPath) print (f'calculating {StationName}, in {BasinName}') + hydro_df = transform_hydro (stationPath) # this actually looks good except: # print (f'no discharge measures found for station {StationName} in {BasinName}') continue @@ -162,7 +196,10 @@ def loop_over_stations(station_csv, DataDir, type_of_extremity, probability, val #generate the gdf to merge with where the points are attributed to the respective administrative units all_events_df = pd.concat (all_events, ignore_index=True) - all_events_df.to_csv (f"{DataDir}/Observation/observationalStation_flood_events_RP_{RP}yr.csv") + if type_of_extremity =='RP': + all_events_df.to_csv (f"{DataDir}/Observation/observationalStation_flood_events_RP_{probability}yr.csv") + elif type_of_extremity =='percentile': + all_events_df.to_csv (f"{DataDir}/Observation/observationalStation_flood_events_percentile{probability}.csv") return all_events_df def loop_over_stations_pred(station_csv, stationsDir, probability, type_of_extremity, value_col, leadtime): @@ -170,14 +207,18 @@ def loop_over_stations_pred(station_csv, stationsDir, probability, type_of_extre station_df = pd.read_csv (station_csv, header=0) all_events = [] for _, stationrow in station_df.iterrows(): - StationName = stationrow['StationName'] + StationName = remove_accents(stationrow['StationName']) BasinName= stationrow['Basin'] - "C:\Users\els-2\MaliGloFAS\data\stations\GloFAS_Q\timeseries\discharge_timeseries_Guelelinkoro_72.csv" - glofas_csv = rf"{stationsDir}/GloFAS_Q/timeseries/discharge_timeseries_{StationName}_{leadtime}.csv" - glofas_df = pd.read_csv(glofas_csv, header=0) - + try: + glofas_df = pd.read_csv(glofas_csv, + header=0, + index_col=0, # setting ValidTime as index column + parse_dates=True) + except: + print (f'no discharge series for station {StationName}') + continue trigger_df = stampHydroTrigger (glofas_df, StationName, type_of_extremity, probability, value_col) event_df = createEvent (trigger_df) @@ -186,7 +227,10 @@ def loop_over_stations_pred(station_csv, stationsDir, probability, type_of_extre #generate the gdf to merge with where the points are attributed to the respective administrative units all_events_df = pd.concat (all_events, ignore_index=True) - all_events_df.to_csv (f"{DataDir}/Observation/observationalStation_flood_events_RP_{RP}yr.csv") + if type_of_extremity =='RP': + all_events_df.to_csv (f"{stationsDir}/GloFAS_Q/GloFASstation_flood_events_RP{probability}yr_leadtime{leadtime}.csv") + elif type_of_extremity =='percentile': + all_events_df.to_csv (f"{stationsDir}/GloFAS_Q/GloFASstation_flood_events_percentile{probability}_leadtime{leadtime}.csv") return all_events_df def events_per_adm (DataDir, admPath, adminLevel, station_csv, StationDataDir, all_events_df, model, RP): @@ -220,9 +264,11 @@ def events_per_adm (DataDir, admPath, adminLevel, station_csv, StationDataDir, a all_events = [] for leadtime in cfg.leadtimes: for RP in cfg.RPsyr: - loop_over_stations_pred(cfg.DNHstations, cfg.stationsDir, RP, 'RP', value_col, leadtime) + loop_over_stations_obs (cfg.DNHstations, cfg.DataDir, 'RP', RP, value_col='Value') + #loop_over_stations_pred(cfg.DNHstations, cfg.stationsDir, RP, 'RP', value_col, leadtime) for percentile in cfg.percentiles: - loop_over_stations_pred(cfg.DNHstations, cfg.stationsDir, percentile, 'percentile', value_col, leadtime) + loop_over_stations_obs (cfg.DNHstations, cfg.DataDir, 'percentile', percentile, value_col='Value') + #loop_over_stations_pred(cfg.DNHstations, cfg.stationsDir, percentile, 'percentile', value_col, leadtime) # # running this script for the GloFAS # #print (readcsv(f"{DataDir}/Données partagées - DNH Mali - 2019/Donne╠ües partage╠ües - DNH Mali - 2019/De╠übit du Niger a╠Ç Ansongo.csv")) diff --git a/comparison/observation/thresholds.py b/comparison/observation/thresholds.py index abab528..6d6762c 100644 --- a/comparison/observation/thresholds.py +++ b/comparison/observation/thresholds.py @@ -11,8 +11,8 @@ def calculate_max_discharge(hydro_df: pd.DataFrame, value_col: str = 'Value', timescale: str = 'Year', - incomplete_lastyear: (str,int) = 2023 # not considering the maximum of incomplete years, as these are not representative of the season - ) -> pd.Series: + incomplete_lastyear: (str,int) = 2023, # not considering the maximum of incomplete years, as these are not representative of the season + date_col: str ='Date') -> pd.Series: """ Calculates the maximum discharge values for a given timescale. @@ -25,6 +25,7 @@ def calculate_max_discharge(hydro_df: pd.DataFrame, pd.Series: Maximum discharge values with the corresponding timescale as the index. """ if not isinstance(hydro_df.index, pd.DatetimeIndex): + hydro_df [date_col] = hydro_df.index hydro_df.index = pd.to_datetime(hydro_df.index) hydro_df = hydro_df.copy() @@ -38,13 +39,14 @@ def calculate_max_discharge(hydro_df: pd.DataFrame, ) if timescale == 'Year': hydro_df['Year'] = hydro_df.index.year - return hydro_df.groupby('Year')[value_col].max() + + maximum = hydro_df.groupby('Year')[value_col].max() elif timescale == 'Day': hydro_df['Date'] = hydro_df.index.date - return hydro_df.groupby('Date')[value_col].max() + maximum = hydro_df.groupby('Date')[value_col].max() else: raise ValueError("Invalid timescale. Use 'Year' or 'Day'.") - + return maximum def csv_to_cleaned_series(csv: str) -> pd.Series: """ Reads a CSV file and prepares a cleaned pandas Series for analysis.