
Commit

recording of glofas Q
ERKuipers committed Nov 12, 2024
1 parent afdd131 commit 5799f03
Showing 8 changed files with 177 additions and 75 deletions.
31 changes: 19 additions & 12 deletions GloFAS_analysis/aggregation.py
@@ -4,6 +4,7 @@
import pandas as pd
from pathlib import Path
from rasterstats import zonal_stats
from vectorCheck import checkVectorFormat
from shapely import wkt
import rioxarray as rio

@@ -38,19 +39,25 @@ def zonalStats(rasterDA, vectorGDF, measure='max'):
communeMax_gdf = gpd.GeoDataFrame(communeMax_df, geometry=vectorGDF.geometry)
return communeMax_gdf
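
A minimal usage sketch for zonalStats (file names here are assumptions, not from this commit): load a single-band discharge raster, reproject the communes onto its CRS, and take the per-commune maximum.

# Hypothetical usage sketch for zonalStats; paths are assumptions
import geopandas as gpd
import rioxarray as rio

discharge_da = rio.open_rasterio('discharge.tif').squeeze()  # assumed single-band raster
communes_gdf = gpd.read_file('communes.shp').to_crs(discharge_da.rio.crs)
commune_max_gdf = zonalStats(discharge_da, communes_gdf, measure='max')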

def query(rasterDA, pointGDF, nrEnsemble, timestamp):

def query(rasterDA, pointGDF):
'''
pointGDF= eodataframe with geometry describing the locations of the different points, nothing else
rasterDA = xarray.DataArray with
pointGDF: GeoDataFrame with geometry describing the locations of the different points, nothing else
rasterDA: xarray.DataArray with raster data
'''

point_latitude, point_longitude = pointGDF.get_coordinates().values
pointGDF ['rastervalue'] = [x for x in rasterDA.sel(
latitude=point_latitude,
longitude=point_longitude,
method='nearest')]
coordinates = pointGDF.get_coordinates().values # 2D array with x (longitude) and y (latitude) columns
point_longitude = coordinates[:, 0] # extract all longitudes (x, first column)
point_latitude = coordinates[:, 1] # extract all latitudes (y, second column); the original assignment swapped the two

# Query the raster data for each point and store the result in a new 'rastervalue' column
pointGDF['rastervalue'] = [
rasterDA.sel(latitude=lat, longitude=lon, method='nearest').values.item() # Get scalar value
for lat, lon in zip(point_latitude, point_longitude)
]

return pointGDF
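
A usage sketch for query, assuming a GloFAS GRIB file opened the way openGloFAS does elsewhere in this commit; the station file name is hypothetical.

# Hypothetical usage sketch for query; file names are assumptions
import xarray as xr
import geopandas as gpd

q_da = xr.open_dataset('data.grib', engine='cfgrib')['dis24']  # variable name as in openGloFAS
q_da = q_da.isel(number=0)  # query's scalar lookup assumes one member/step, so slice first (assumption)
stations_gdf = gpd.read_file('stations.geojson')  # point geometries only
stations_gdf = query(q_da, stations_gdf)  # adds a 'rastervalue' column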


def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, measure=None):
'''
Decision tree for data reduction by aggregation.
@@ -68,7 +75,7 @@ def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, measure=None):
timestamp : str or datetime, optional
Timestamp to select in case of time-dependent raster data.
measure : str, optional
Statistical measure to calculate for zonal statistics (e.g., 'mean', 'sum').
Statistical measure to calculate for zonal statistics (e.g., 'mean', 'sum'). Only relevant for polygon or buffer aggregation.
Returns:
GeoDataFrame
@@ -85,12 +92,12 @@ def aggregation (rasterDA, vector, method, nrEnsemble=None, timestamp=None, measure=None):
# this argument is ignored when considering polygons, because at that level precision doesn't really matter
vectorGDF = checkVectorFormat(vector, shapeType=method, crs=rasterDA.rio.crs, placement='model')
if method == 'point':
pointgdf = query(rasterDA_single, vectorGDF)
pointgdf = query(rasterDA, vectorGDF)
return pointgdf
# elif method == 'bufferpoint':
# return pointgdf
elif method == 'polygon':
admgdf = zonal_stats(rasterDA_single, vectorGDF, measure)
admgdf = zonal_stats(rasterDA, vectorGDF, measure)
return admgdf
else:
raise ValueError("Invalid method, choose 'point' or 'polygon'")
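
A usage sketch for the decision tree above, reusing the paths defined in configuration.py and assuming Q_da was opened with openGloFAS:

# Hypothetical usage sketch; Q_da is assumed to come from openGloFAS
import configuration as cfg

point_gdf = aggregation(Q_da, cfg.GloFASstations, 'point', nrEnsemble=1, timestamp='2024-06-26')
poly_gdf = aggregation(Q_da, cfg.communePath, 'polygon', measure='max')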
143 changes: 86 additions & 57 deletions GloFAS_analysis/createRecordingCSV.py
@@ -1,60 +1,89 @@
from aggregation import aggregation
import os
import numpy as np
import pandas as pd
import configuration as cfg
from aggregation import aggregation
from open_extract import unzipGloFAS, openGloFAS
from vectorCheck import checkVectorFormat
from vectorCheck import checkVectorFormat # this is correctly imported
from forecast_dataFetch import compute_dates_range
import numpy as np
import pandas as pd
import batch_configuration as cfg

# Define constants
START_DATE = '2024-06-26'
END_DATE = '2024-11-01'
leadtime = 168
IDhead = 'StationName'

# Check vector format and retrieve point IDs
pointvectorMODEL_gdf = checkVectorFormat(cfg.GloFASstations, shapeType='point', crs=cfg.crs, placement='model')
pointIDs = pointvectorMODEL_gdf[IDhead]

# Define probability percentiles
probabilities = np.arange(0, 101, 10) # From 0% to 100% in steps of 10

# Define column headers for output dataframes
indexheads = []
for pointID in pointIDs:
for perc in probabilities:
indexheads.append(f'{pointID}_{perc}') # Combining station ID and probability for unique column names

# Generate date range
dates = compute_dates_range(START_DATE, END_DATE)

# Initialize DataFrames
# Use np.nan to fill initial values, and set up DataFrames with proper indexing and column names
aggregated_df = pd.DataFrame(np.nan, index=dates, columns=pointIDs)
ensemblemembers_aggregated_df = pd.DataFrame(np.nan, index=dates, columns=indexheads)

# Process data for each date
for date in dates:
# Extract year, month, and day for file path formatting
year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')

# Unzip and open raster data for the current date
rasterPath = unzipGloFAS(cfg.DataDir, leadtime, month, day, year)
Q_da = openGloFAS(rasterPath, cfg.lakesPath)

def aggregate_forecasted(
START_DATE,
END_DATE,
pointvector,
leadtime=168, # 7 days, default
DataDir=os.getcwd(),
IDhead='StationName',
probability=False,
probabilityInterval=10, # step in percentile points
output_filename='aggregated.csv',
lakesPath=None,
crs='EPSG:4326'
):

"""
Aggregate forecasted data from GloFAS for the given date range and station IDs.
# Determine ensemble members based on the number of probabilities
totalEnsembleMembers = len(Q_da.number)
step = int(totalEnsembleMembers / 10) # Ensure step is integer
relevantMembers = np.arange(1, totalEnsembleMembers, step)

# Aggregate data for each ensemble member at each point
for nrEnsemble in relevantMembers:
# Perform the aggregation and assign to the respective date and column in `ensemblemembers_aggregated_df`
agg_results = aggregation(Q_da, pointvectorMODEL_gdf, 'point', nrEnsemble=int(nrEnsemble), timestamp=date)
for idx, (pointID, perc) in enumerate(zip(pointIDs, probabilities)):
ensemblemembers_aggregated_df.loc[date, f'{pointID}_{perc}'] = agg_results.get((pointID, int(perc)), np.nan)

# Write the final aggregated DataFrame to CSV
ensemblemembers_aggregated_df.to_csv(f'{cfg.DataDir}/aggregated.csv')
Arguments:
- START_DATE (str): The start date for aggregation.
- END_DATE (str): The end date for aggregation.
- pointvector: Station locations in any format accepted by checkVectorFormat.
- leadtime (int): Leadtime of the forecast in hours (default 168).
- DataDir (str): Directory to save the output CSV.
- IDhead (str): Column name for the station ID.
- probability (bool): Whether to aggregate per ensemble member for probability estimates.
- probabilityInterval (int): The interval for probabilities in percentile points (default 10).
- output_filename (str): Name of the output file (default 'aggregated.csv').
- lakesPath: Optional path to a waterbodies shapefile used to mask lakes.
- crs (str): Coordinate reference system (default 'EPSG:4326').
Returns:
- None: Writes the aggregated data to a CSV file.
"""

# Check vector format and retrieve point IDs
pointvectorMODEL_gdf = checkVectorFormat(pointvector, shapeType='point', crs=crs, placement='real')
pointIDs = pointvectorMODEL_gdf[IDhead]
# Generate date range
dates = compute_dates_range(START_DATE, END_DATE)

# Initialize DataFrame for aggregation
if probability:
probabilities = np.arange(0, 101, probabilityInterval) # Probability percentiles
indexheads = [f'{pointID}_{perc}' for pointID in pointIDs for perc in probabilities]
aggregated_df = pd.DataFrame(np.nan, index=dates, columns=indexheads)
else:
aggregated_df = pd.DataFrame(np.nan, index=dates, columns=pointIDs)

# Process data for each date
for date in dates:
# Extract year, month, and day for file path formatting
year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')

# Unzip and open raster data for the current date
rasterPath = unzipGloFAS(DataDir, leadtime, month, day, year)
Q_da = openGloFAS(rasterPath, lakesPath, crs)

# Aggregate data for probability case
if probability:
totalEnsembleMembers = len(Q_da.number)
step = int(totalEnsembleMembers / probabilityInterval) # Ensure integer step
relevantMembers = np.arange(1, totalEnsembleMembers, step)

# Aggregate data for each ensemble member at each point
for nrEnsemble in relevantMembers:
agg_results = aggregation(Q_da, pointvectorMODEL_gdf, 'point', nrEnsemble=int(nrEnsemble), timestamp=date)
for perc in probabilities:
for pointID in pointIDs:
aggregated_df.loc[date, f'{pointID}_{perc}'] = agg_results.get((pointID, int(perc)), np.nan)

# Aggregate data for non-probability case
else:
aggregation_Q_gdf = aggregation(Q_da, pointvectorMODEL_gdf, 'point', timestamp=date)
aggregated_df.loc[date, :] = aggregation_Q_gdf['rastervalue'].values

# Write the final aggregated DataFrame to CSV
aggregated_df.to_csv(f'{DataDir}/{output_filename}')
print(f'Aggregation complete! Data saved to {DataDir}/{output_filename}')

if __name__ == '__main__':
aggregate_forecasted('2024-06-26', '2024-11-01', pointvector=cfg.GloFASstations, DataDir=cfg.DataDir)
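
The probability branch above samples every step-th ensemble member as a percentile proxy. An alternative sketch, not the committed approach: evaluate the percentiles directly over the ensemble dimension at one station's cell (the coordinates below are hypothetical).

# Alternative sketch (assumption, not the committed approach): exact ensemble percentiles
import numpy as np

lat, lon = 12.65, -8.0  # hypothetical station coordinates
probabilities = np.arange(0, 101, 10)
q_members = Q_da.sel(latitude=lat, longitude=lon, method='nearest')  # assumes only 'number' remains
percentile_values = np.percentile(q_members.values, probabilities)  # one discharge value per percentile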
2 changes: 1 addition & 1 deletion GloFAS_data_extractor/forecast_dataFetch.py
@@ -1,7 +1,7 @@
import cdsapi
import datetime
import warnings
import batch_configuration as cfg
import configuration as cfg
import os
from pathlib import Path
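
compute_dates_range itself is not shown in this commit; a minimal sketch of such a helper, assuming daily forecast issue dates, could look like this:

# Minimal sketch of compute_dates_range (assumption: the real helper may differ)
import pandas as pd

def compute_dates_range(start_date, end_date):
    '''Return daily timestamps from start_date to end_date inclusive.'''
    return pd.date_range(start=start_date, end=end_date, freq='D').tolist()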

10 changes: 10 additions & 0 deletions GloFAS_prep/__init__.py
@@ -0,0 +1,10 @@
import sys
import os
from pathlib import Path

cur = Path(os.getcwd())
parent_dir = cur.parent
working_dir = 'c:\\Users\\els-2\\MaliGloFAS\\river_flood_data_analysis\\GloFAS_prep'
os.chdir(working_dir)
sys.path.append(working_dir)
print(sys.path)
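
The hard-coded working_dir above only works on one machine; a more portable sketch (an assumption, not part of this commit) derives the directory from the module file itself:

# Portable alternative sketch (assumption, not part of this commit)
import sys
from pathlib import Path

package_dir = Path(__file__).resolve().parent
if str(package_dir) not in sys.path:
    sys.path.append(str(package_dir))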
32 changes: 32 additions & 0 deletions GloFAS_prep/configuration.py
@@ -0,0 +1,32 @@
from pathlib import Path
import numpy as np
import math
import os

os.chdir('C:\\Users\\els-2\\')
cur = Path.cwd()
DataDir = cur / 'MaliGloFAS\\data'
# Mali area coordinates in the (unusual) notation used by GloFAS:
# north, west, south, east
MaliArea = [25, -12.25, 10, 4.25] # Mali area [lat_max, lon_min, lat_min, lon_max] (upper-left corner, lower-right corner)
regionPath = DataDir / "Visualization/ADM1_Affected_SHP.shp" # région corresponds to ADM1, the larger districts
communePath = DataDir / "Visualization/ADM3_Affected_SHP.shp"
cerclePath = DataDir / "Visualization/mli_admbnda_adm2_1m_gov_20211220.shp"
adminPaths = [regionPath, cerclePath, communePath]
lakesPath = DataDir / 'Visualization/waterbodies/waterbodies_merged.shp'
stationsDir = DataDir / 'stations'

googlestations = stationsDir / 'coords_google_gauges_Mali.csv'
GloFASstations = stationsDir / 'GloFAS_MaliStations_v4.csv'
impact_csvPath = DataDir / "impact/MergedImpactData.csv"
crs = 'EPSG:4326'
RPsyr = [1.5, 2.0, 5.0, 10.0] # return period thresholds in years
leadtimes = 168 # hours
startYear = 2004
endYear = 2023 # up to 00:00 on 1 January of that year, so up to but not including
triggerProb = 0.6
actionLifetime = 10 # days
adminLevel = 2 # administrative level on which to aggregate: 1, 2 or 3
years = np.arange(startYear, endYear, 1)
admPath = adminPaths[adminLevel - 1] # pick the matching administrative unit path
nrCores = 6 # number of CPU cores to use
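
A sketch of how the [north, west, south, east] ordering of MaliArea is consumed, assuming Q_da is a GloFAS DataArray opened with openGloFAS:

# Sketch (assumption): clip a GloFAS DataArray to the Mali box defined above
north, west, south, east = MaliArea
mali_da = Q_da.sel(latitude=slice(north, south),  # north-to-south, assuming a descending latitude axis
                   longitude=slice(west, east))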
16 changes: 11 additions & 5 deletions GloFAS_prep/open_extract.py
@@ -1,3 +1,8 @@
import zipfile
import rioxarray as rio
import fiona
import xarray as xr

def unzipGloFAS (DataDir, leadtime, month, day, year=None):
'''month: month number as a zero-padded string, e.g. January = '01' (type: str)
day: day number as a zero-padded string, e.g. the first of the month = '01' (type: str)
@@ -10,15 +15,15 @@ def unzipGloFAS (DataDir, leadtime, month, day, year=None):

elif isinstance (year, (str,int)):
year = str(year)
zipRasterPath = f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}.zip'
zipRasterPath = f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}.zip'
with zipfile.ZipFile(zipRasterPath, 'r') as zip_ref:
zip_ref.extractall(f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/')
rasterPath = f'{DataDir}/GloFASforecast/{int(self.leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/'
zip_ref.extractall(f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/')
rasterPath = f'{DataDir}/GloFASforecast/{int(leadtime)}hours/cems-glofas-forecast_{year}_{month}_{day}/data.grib'
else:
raise ValueError("Argument year should be of type str or int")
return rasterPath
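
A usage sketch chaining unzipGloFAS and openGloFAS, with the directory taken from configuration.py and the date and leadtime from createRecordingCSV.py:

# Usage sketch; DataDir mirrors configuration.py, date and leadtime mirror createRecordingCSV.py
rasterPath = unzipGloFAS('C:\\Users\\els-2\\MaliGloFAS\\data', leadtime=168, month='06', day='26', year=2024)
Q_da = openGloFAS(rasterPath, lakesPath=None, crs='EPSG:4326')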

def openGloFAS(rasterPath, lakesPath=None):
def openGloFAS(rasterPath, lakesPath=None, crs='EPSG:4326'):
'''Returns a DataArray with masked lakes for a single netcdf / grib file'''

if rasterPath.endswith('grib'):
@@ -28,11 +33,12 @@ def openGloFAS(rasterPath, lakesPath=None):
else:
raise TypeError('Make sure predictive GloFAS raster is either of format netCDF4 or GRIB2')


# Correct the longitude convention (0-360 to -180-180) if needed (see GloFAS issues)
Q_ds["longitude"] = Q_ds["longitude"].where(Q_ds["longitude"] < 180, Q_ds["longitude"] - 360)

Q_da = Q_ds["dis24"]
Q_da.rio.write_crs(self.crs, inplace=True)
Q_da.rio.write_crs(crs, inplace=True)

if str(lakesPath).endswith('shp'):
with fiona.open(lakesPath, 'r') as shapefile:
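
A worked example of the longitude correction above, assuming the usual 0-360 GloFAS grid: values at or beyond 180 are wrapped into the -180 to 180 range, so 356.25 becomes -3.75.

# Worked example of the longitude wrap above
import numpy as np
import xarray as xr

lon = xr.DataArray(np.array([2.5, 179.9, 180.0, 356.25]), dims='longitude')
print(lon.where(lon < 180, lon - 360).values)  # [2.5, 179.9, -180.0, -3.75]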
8 changes: 8 additions & 0 deletions GloFAS_prep/vectorCheck.py
@@ -1,3 +1,11 @@
import xarray as xr
import geopandas as gpd
import pandas as pd
from pathlib import Path
from rasterstats import zonal_stats
from shapely import wkt
import rioxarray as rio

def checkVectorFormat(vector, shapeType=None, crs='EPSG:4326', placement='real'):
'''
Transforms various forms of vector inputs into a proper GeoDataFrame.
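
Usage as it appears elsewhere in this commit: normalising the GloFAS stations CSV into a point GeoDataFrame in the raster's CRS (arguments mirror aggregation.py and createRecordingCSV.py):

# Usage sketch; the call mirrors aggregation.py and createRecordingCSV.py
stations_gdf = checkVectorFormat(
    'stations/GloFAS_MaliStations_v4.csv',  # GloFASstations path from configuration.py
    shapeType='point',
    crs='EPSG:4326',
    placement='model')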
10 changes: 10 additions & 0 deletions __init__.py
@@ -0,0 +1,10 @@

import os
from pathlib import Path
import sys
cur = Path(os.getcwd())
parent_dir = cur.parent
working_dir = parent_dir #/ 'MaliGloFAS/river_flood_data_analysis'
os.chdir(working_dir)
sys.path.append(working_dir)
print(os.getcwd())
