From 5799f0396b6e50d08b5f50c3bc994970a4480bf1 Mon Sep 17 00:00:00 2001
From: ERKuipers
Date: Tue, 12 Nov 2024 15:29:55 +0100
Subject: [PATCH] recording of GloFAS Q

---
 GloFAS_analysis/aggregation.py              |  31 +++--
 GloFAS_analysis/createRecordingCSV.py       | 143 ++++++++++++--------
 GloFAS_data_extractor/forecast_dataFetch.py |   2 +-
 GloFAS_prep/__init__.py                     |  10 ++
 GloFAS_prep/configuration.py                |  32 +++++
 GloFAS_prep/open_extract.py                 |  16 ++-
 GloFAS_prep/vectorCheck.py                  |   8 ++
 __init__.py                                 |  10 ++
 8 files changed, 177 insertions(+), 75 deletions(-)
 create mode 100644 GloFAS_prep/configuration.py
 create mode 100644 __init__.py

diff --git a/GloFAS_analysis/aggregation.py b/GloFAS_analysis/aggregation.py
index a737d79..d055039 100644
--- a/GloFAS_analysis/aggregation.py
+++ b/GloFAS_analysis/aggregation.py
@@ -4,6 +4,7 @@
 import pandas as pd
 from pathlib import Path
 from rasterstats import zonal_stats
+from vectorCheck import checkVectorFormat
 from shapely import wkt
 import rioxarray as rio
 
@@ -38,19 +39,25 @@
     communeMax_gdf = gpd.GeoDataFrame(communeMax_df, geometry=vectorGDF.geometry)
     return communeMax_gdf
 
-def query(rasterDA, pointGDF, nrEnsemble, timestamp):
+
+def query(rasterDA, pointGDF):
     '''
-    pointGDF= eodataframe with geometry describing the locations of the different points, nothing else
-    rasterDA = xarray.DataArray with
+    pointGDF: GeoDataFrame with geometry describing the locations of the different points, nothing else
+    rasterDA: xarray.DataArray with raster data
     '''
-
-    point_latitude, point_longitude = pointGDF.get_coordinates().values
-    pointGDF ['rastervalue'] = [x for x in rasterDA.sel(
-        latitude=point_latitude,
-        longitude=point_longitude,
-        method='nearest')]
+    coordinates = pointGDF.get_coordinates().values  # 2D array of coordinate pairs
+    point_longitude = coordinates[:, 0]  # geopandas puts x (longitude) in the first column
+    point_latitude = coordinates[:, 1]   # and y (latitude) in the second column
+
+    # Query the raster data for each point and store the result in a new 'rastervalue' column
+    pointGDF['rastervalue'] = [
+        rasterDA.sel(latitude=lat, longitude=lon, method='nearest').values.item()  # get scalar value
+        for lat, lon in zip(point_latitude, point_longitude)
+    ]
+    return pointGDF
+
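+# A minimal usage sketch (hypothetical file names). Assumes rasterDA has already been
+# reduced to a single (latitude, longitude) slice: .values.item() raises if the selected
+# cell still carries time or ensemble dimensions.
+#   stations_gdf = gpd.read_file('stations.shp').to_crs(rasterDA.rio.crs)
+#   stations_gdf = query(rasterDA, stations_gdf)
+#   print(stations_gdf[['geometry', 'rastervalue']])
+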
 def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, measure=None):
     '''
     Decision tree for data reduction by aggregation.
@@ -68,7 +75,7 @@ def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, meas
     timestamp : str or datetime, optional
         Timestamp to select in case of time-dependent raster data.
     measure : str, optional
-        Statistical measure to calculate for zonal statistics (e.g., 'mean', 'sum').
+        Statistical measure to calculate for zonal statistics (e.g., 'mean', 'sum').
+        Only relevant for polygon or buffer aggregation.
 
     Returns:
     GeoDataFrame
@@ -85,12 +92,12 @@ def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, meas
     # this argument is ignored when considering polygons, because on that level precision doesnt really matter
     vectorGDF = checkVectorFormat(vector, shapeType=method, crs=rasterDA.rio.crs, placement='model')
     if method == 'point':
-        pointgdf = query(rasterDA_single, vectorGDF)
+        pointgdf = query(rasterDA, vectorGDF)
         return pointgdf
     # elif method == 'bufferpoint':
     #     return pointgdf
     elif method == 'polygon':
-        admgdf = zonal_stats(rasterDA_single, vectorGDF, measure)
+        admgdf = zonalStats(rasterDA, vectorGDF, measure)
         return admgdf
     else:
         raise ValueError ("Invalid method, choose 'point', or 'adm_unit'")
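+
+# A minimal usage sketch (hypothetical file names), assuming a discharge DataArray
+# opened with openGloFAS from GloFAS_prep/open_extract.py; checkVectorFormat accepts
+# both station CSVs and shapefiles:
+#   Q_da = openGloFAS('cems-glofas-forecast.grib')
+#   station_values = aggregation(Q_da, 'GloFAS_stations.csv', 'point')
+#   commune_max = aggregation(Q_da, 'ADM3_communes.shp', 'polygon', measure='max')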
diff --git a/GloFAS_analysis/createRecordingCSV.py b/GloFAS_analysis/createRecordingCSV.py
index b65a0e0..a8b377a 100644
--- a/GloFAS_analysis/createRecordingCSV.py
+++ b/GloFAS_analysis/createRecordingCSV.py
@@ -1,60 +1,89 @@
-from aggregation import aggregation
+import os
+import numpy as np
+import pandas as pd
+import configuration as cfg
+from aggregation import aggregation
 from open_extract import unzipGloFAS, openGloFAS
 from vectorCheck import checkVectorFormat
 from forecast_dataFetch import compute_dates_range
-import numpy as np
-import pandas as pd
-import batch_configuration as cfg
-
-# Define constants
-START_DATE = '2024-06-26'
-END_DATE = '2024-11-01'
-leadtime = 168
-IDhead = 'StationName'
-
-# Check vector format and retrieve point IDs
-pointvectorMODEL_gdf = checkVectorFormat(cfg.GloFASstations, shapeType='point', crs=cfg.crs, placement='model')
-pointIDs = pointvectorMODEL_gdf[IDhead]
-
-# Define probability percentiles
-probabilities = np.arange(0, 101, 10) # From 0% to 100% in steps of 10
-
-# Define column headers for output dataframes
-indexheads = []
-for pointID in pointIDs:
-    for perc in probabilities:
-        indexheads.append(f'{pointID}_{perc}') # Combining station ID and probability for unique column names
-
-# Generate date range
-dates = compute_dates_range(START_DATE, END_DATE)
-
-# Initialize DataFrames
-# Use np.nan to fill initial values, and set up DataFrames with proper indexing and column names
-aggregated_df = pd.DataFrame(np.nan, index=dates, columns=pointIDs)
-ensemblemembers_aggregated_df = pd.DataFrame(np.nan, index=dates, columns=indexheads)
-
-# Process data for each date
-for date in dates:
-    # Extract year, month, and day for file path formatting
-    year = date.strftime('%Y')
-    month = date.strftime('%m')
-    day = date.strftime('%d')
-
-    # Unzip and open raster data for the current date
-    rasterPath = unzipGloFAS(cfg.DataDir, leadtime, month, day, year)
-    Q_da = openGloFAS(rasterPath, cfg.lakesPath)
-
-    # Determine ensemble members based on the number of probabilities
-    totalEnsembleMembers = len(Q_da.number)
-    step = int(totalEnsembleMembers / 10) # Ensure step is integer
-    relevantMembers = np.arange(1, totalEnsembleMembers, step)
-
-    # Aggregate data for each ensemble member at each point
-    for nrEnsemble in relevantMembers:
-        # Perform the aggregation and assign to the respective date and column in `ensemblemembers_aggregated_df`
-        agg_results = aggregation(Q_da, pointvectorMODEL_gdf, 'point', nrEnsemble=int(nrEnsemble), timestamp=date)
-        for idx, (pointID, perc) in enumerate(zip(pointIDs, probabilities)):
-            ensemblemembers_aggregated_df.loc[date, f'{pointID}_{perc}'] = agg_results.get((pointID, int(perc)), np.nan)
-
-# Write the final aggregated DataFrame to CSV
-ensemblemembers_aggregated_df.to_csv(f'{cfg.DataDir}/aggregated.csv')
+
+def aggregate_forecasted(
+        START_DATE,
+        END_DATE,
+        pointvector,
+        leadtime=168,  # hours; 7 days by default
+        DataDir=os.getcwd(),
+        IDhead='StationName',
+        probability=False,
+        probabilityInterval=10,  # percentile step
+        output_filename='aggregated.csv',
+        lakesPath=None,
+        crs='EPSG:4326'
+        ):
+    """
+    Aggregate forecasted data from GloFAS for the given date range and station IDs.
+
+    Arguments:
+    - START_DATE (str): The start date for aggregation.
+    - END_DATE (str): The end date for aggregation.
+    - pointvector: Station locations in any format accepted by checkVectorFormat.
+    - leadtime (int): Leadtime of the forecast in hours.
+    - DataDir (str): Directory holding the forecast zips; the output CSV is saved here.
+    - IDhead (str): Column name for the station ID.
+    - probability (bool): Whether to record percentiles over the ensemble members.
+    - probabilityInterval (int): The interval between percentiles (default 10).
+    - output_filename (str): Name of the output file (default 'aggregated.csv').
+    - lakesPath (str, optional): Shapefile with water bodies to mask out.
+    - crs (str): Coordinate reference system of the station vector.
+
+    Returns:
+    - None: Writes the aggregated data to a CSV file.
+    """
+    # Check vector format and retrieve point IDs
+    pointvectorMODEL_gdf = checkVectorFormat(pointvector, shapeType='point', crs=crs, placement='real')
+    pointIDs = pointvectorMODEL_gdf[IDhead]
+
+    # Generate date range
+    dates = compute_dates_range(START_DATE, END_DATE)
+
+    # Initialize DataFrame for aggregation
+    if probability:
+        probabilities = np.arange(0, 101, probabilityInterval)  # Probability percentiles
+        indexheads = [f'{pointID}_{perc}' for pointID in pointIDs for perc in probabilities]
+        aggregated_df = pd.DataFrame(np.nan, index=dates, columns=indexheads)
+    else:
+        aggregated_df = pd.DataFrame(np.nan, index=dates, columns=pointIDs)
+
+    # Process data for each date
+    for date in dates:
+        # Extract year, month, and day for file path formatting
+        year = date.strftime('%Y')
+        month = date.strftime('%m')
+        day = date.strftime('%d')
+
+        # Unzip and open raster data for the current date
+        rasterPath = unzipGloFAS(DataDir, leadtime, month, day, year)
+        Q_da = openGloFAS(rasterPath, lakesPath, crs)
+
+        # Probability case: query a subset of ensemble members, then record the
+        # requested percentiles of the sampled discharges per station
+        if probability:
+            totalEnsembleMembers = len(Q_da.number)
+            step = max(1, totalEnsembleMembers // probabilityInterval)  # integer step through the members
+            relevantMembers = np.arange(1, totalEnsembleMembers, step)
+
+            # Collect the queried value for each sampled ensemble member at each point
+            memberValues = []
+            for nrEnsemble in relevantMembers:
+                agg_results = aggregation(Q_da, pointvectorMODEL_gdf, 'point', nrEnsemble=int(nrEnsemble), timestamp=date)
+                memberValues.append(agg_results['rastervalue'].values)
+            memberValues = np.vstack(memberValues)  # rows: members, columns: stations
+
+            # Percentiles across the sampled members, per station
+            for i, pointID in enumerate(pointIDs):
+                for perc in probabilities:
+                    aggregated_df.loc[date, f'{pointID}_{perc}'] = np.percentile(memberValues[:, i], perc)
+
+        # Non-probability case: a single queried value per station
+        else:
+            aggregation_Q_gdf = aggregation(Q_da, pointvectorMODEL_gdf, 'point', timestamp=date)
+            aggregated_df.loc[date, :] = aggregation_Q_gdf['rastervalue'].values
+
+    # Write the final aggregated DataFrame to CSV
+    aggregated_df.to_csv(f'{DataDir}/{output_filename}')
+    print(f'Aggregation complete! Data saved to {DataDir}/{output_filename}')
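+
+# A minimal sketch of the probability mode (hypothetical output file name; assumes
+# the forecast zips for the whole date range already sit under cfg.DataDir):
+#   aggregate_forecasted('2024-06-26', '2024-11-01', pointvector=cfg.GloFASstations,
+#                        DataDir=cfg.DataDir, probability=True, probabilityInterval=25,
+#                        output_filename='aggregated_percentiles.csv')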
+
+if __name__ == '__main__':
+    aggregate_forecasted('2024-06-26', '2024-11-01', pointvector=cfg.GloFASstations, DataDir=cfg.DataDir)
\ No newline at end of file
diff --git a/GloFAS_data_extractor/forecast_dataFetch.py b/GloFAS_data_extractor/forecast_dataFetch.py
index 67a7a93..30c591a 100644
--- a/GloFAS_data_extractor/forecast_dataFetch.py
+++ b/GloFAS_data_extractor/forecast_dataFetch.py
@@ -1,7 +1,7 @@
 import cdsapi
 import datetime
 import warnings
-import batch_configuration as cfg
+import configuration as cfg
 import os
 from pathlib import Path
diff --git a/GloFAS_prep/__init__.py b/GloFAS_prep/__init__.py
index e69de29..71397ba 100644
--- a/GloFAS_prep/__init__.py
+++ b/GloFAS_prep/__init__.py
@@ -0,0 +1,10 @@
+import sys
+import os
+from pathlib import Path
+
+cur = Path(os.getcwd())
+parent_dir = cur.parent
+working_dir = 'c:\\Users\\els-2\\MaliGloFAS\\river_flood_data_analysis\\GloFAS_prep'
+os.chdir(working_dir)
+sys.path.append(working_dir)
+print(sys.path)
diff --git a/GloFAS_prep/configuration.py b/GloFAS_prep/configuration.py
new file mode 100644
index 0000000..558593b
--- /dev/null
+++ b/GloFAS_prep/configuration.py
@@ -0,0 +1,32 @@
+from pathlib import Path
+import numpy as np
+import math
+import os
+
+os.chdir('C:\\Users\\els-2\\')
+cur = Path.cwd()
+DataDir = cur / 'MaliGloFAS\\data'
+# Mali bounding box in the (unusual) ordering GloFAS expects:
+# [north, west, south, east] = [lat_max, lon_min, lat_min, lon_max], i.e. upper-left then lower-right corner
+MaliArea = [25, -12.25, 10, 4.25]
+regionPath = DataDir / 'Visualization/ADM1_Affected_SHP.shp'  # région corresponds to ADM1, larger districts
+communePath = DataDir / 'Visualization/ADM3_Affected_SHP.shp'
+cerclePath = DataDir / 'Visualization/mli_admbnda_adm2_1m_gov_20211220.shp'
+adminPaths = [regionPath, cerclePath, communePath]
+lakesPath = DataDir / 'Visualization/waterbodies/waterbodies_merged.shp'
+stationsDir = DataDir / 'stations'
+
+googlestations = stationsDir / 'coords_google_gauges_Mali.csv'
+GloFASstations = stationsDir / 'GloFAS_MaliStations_v4.csv'
+impact_csvPath = DataDir / 'impact/MergedImpactData.csv'
+crs = 'EPSG:4326'
+RPsyr = [1.5, 2.0, 5.0, 10.0]  # return-period thresholds in years
+leadtimes = 168  # hours
+startYear = 2004
+endYear = 2023  # 00:00 on 1 January of that year, so up to but not including
+triggerProb = 0.6
+actionLifetime = 10  # days
+adminLevel = 2  # administrative level on which to aggregate: 1, 2 or 3
+years = np.arange(startYear, endYear, 1)
+admPath = adminPaths[adminLevel - 1]  # path of the chosen administrative unit
+nrCores = 6  # number of CPU cores to use
\ No newline at end of file
diff --git a/GloFAS_prep/open_extract.py b/GloFAS_prep/open_extract.py
index b29546f..554c570 100644
--- a/GloFAS_prep/open_extract.py
+++ b/GloFAS_prep/open_extract.py
@@ -1,3 +1,8 @@
+import zipfile
+import rioxarray as rio
+import fiona
+import xarray as xr
+
 def unzipGloFAS (DataDir, leadtime, month, day, year=None):
     '''month : month as number as string, so January = '01' (Type:Str)
        day : day as number as string, so first day of the month = '01' (Type:Str)
@@ -10,15 +15,15 @@ def unzipGloFAS (DataDir, leadtime, month, day, year=None):
     elif isinstance (year, (str,int)):
         year = str(year)
-    zipRasterPath = f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}.zip'
+        zipRasterPath = f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}.zip'
         with zipfile.ZipFile(zipRasterPath, 'r') as zip_ref:
-        zip_ref.extractall(f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/')
-    rasterPath = f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/'
+            zip_ref.extractall(f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/')
+        rasterPath = f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/data.grib'
     else:
         ValueError (f"Argument year should be of type str or int")
     return rasterPath
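+
+# A minimal usage sketch (hypothetical DataDir), assuming the CDS zip for this date
+# was already downloaded by forecast_dataFetch.py:
+#   rasterPath = unzipGloFAS('/data', leadtime=168, month='06', day='26', year=2024)
+#   Q_da = openGloFAS(rasterPath, lakesPath='waterbodies_merged.shp', crs='EPSG:4326')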
 
-def openGloFAS(rasterPath, lakesPath=None):
+def openGloFAS(rasterPath, lakesPath=None, crs='EPSG:4326'):
     '''Returns a DataArray with masked lakes for a single netcdf / grib file'''
 
     if rasterPath.endswith('grib'):
@@ -28,11 +33,12 @@
     else:
         raise TypeError('Make sure predictive GloFAS raster is either of format netCDF4 or GRIB2')
 
+    # Correct longitude transformation if needed (see GloFAS issues)
     Q_ds["longitude"] = Q_ds["longitude"].where(Q_ds["longitude"] < 180, Q_ds["longitude"] - 360)
 
     Q_da = Q_ds["dis24"]
-    Q_da.rio.write_crs(self.crs, inplace=True)
+    Q_da.rio.write_crs(crs, inplace=True)
 
     if str(lakesPath).endswith('shp'):
         with fiona.open(lakesPath, 'r') as shapefile:
diff --git a/GloFAS_prep/vectorCheck.py b/GloFAS_prep/vectorCheck.py
index 8bc932d..e1c7a44 100644
--- a/GloFAS_prep/vectorCheck.py
+++ b/GloFAS_prep/vectorCheck.py
@@ -1,3 +1,11 @@
+import xarray as xr
+import geopandas as gpd
+import pandas as pd
+from pathlib import Path
+from rasterstats import zonal_stats
+from shapely import wkt
+import rioxarray as rio
+
 def checkVectorFormat(vector, shapeType=None, crs='EPSG:4326', placement='real'):
     '''
     Transforms various forms of vector inputs into a proper GeoDataFrame.
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..2227109
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,10 @@
+
+import os
+from pathlib import Path
+import sys
+cur = Path(os.getcwd())
+parent_dir = cur.parent
+working_dir = parent_dir  # / 'MaliGloFAS/river_flood_data_analysis'
+os.chdir(working_dir)
+sys.path.append(str(working_dir))
+print(os.getcwd())
\ No newline at end of file