Skip to content

Commit

Permalink
Merge pull request #105 from Ouranosinc/fix-12
Browse files Browse the repository at this point in the history
Parallel multi-basin simulations
  • Loading branch information
huard authored Apr 8, 2019
2 parents c6384c0 + 9ee8f3f commit 2e42a7d
Show file tree
Hide file tree
Showing 23 changed files with 417 additions and 349 deletions.
60 changes: 41 additions & 19 deletions raven/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class Raven:
'evspsbl': ['pet', 'evap', 'evapotranspiration'],
'water_volume_transport_in_river_channel': ['qobs', 'discharge', 'streamflow']
}
_parallel_parameters = ['params', 'name', 'area', 'elevation', 'latitude', 'longitude', 'region_id', 'hrus']
_parallel_parameters = ['params', 'nc_index', 'name', 'area', 'elevation', 'latitude', 'longitude', 'region_id',
'hrus']

def __init__(self, workdir=None):
"""Initialize the RAVEN model.
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(self, workdir=None):
self.exec_path = self.workdir / 'exec'
self.final_path = self.workdir / self.final_dir
self._psim = 0
self._pdim = None # Parallel dimension (either params or nbasins)

@property
def output_path(self):
Expand Down Expand Up @@ -282,9 +284,11 @@ def setup_model_run(self, ts):
Run index.
"""
# Match the input files
files, var_names = self._assign_files(ts, self.rvt.keys())
files, var_names, dimensions = self._assign_files(ts)
self.rvt.update(files, force=True)
self.rvd.update(var_names, force=True)
self.rvt.update(var_names, force=True)
if dimensions:
self.rvt.update({'nc_dimensions': dimensions}, force=True)

# Compute derived parameters
self.derived_parameters()
Expand Down Expand Up @@ -336,7 +340,7 @@ def run(self, ts, overwrite=False, **kwds):
if isinstance(ts, (six.string_types, Path)):
ts = [ts, ]

# Special case for parallel parameters
# Case for potentially parallel parameters
pdict = {}
for p in self._parallel_parameters:
a = kwds.pop(p, None)
Expand All @@ -346,8 +350,11 @@ def run(self, ts, overwrite=False, **kwds):
else:
pdict[p] = np.atleast_1d(a)

# Number of parallel loops is dictated by the number of parameters
nloops = len(pdict['params'])
# Number of parallel loops is dictated by the number of parameters or nc_index.
nloops = max(len(pdict['params']), len(pdict['nc_index']))
if nloops > 1:
self._pdim = 'nbasins' if len(pdict['nc_index']) > 1 else 'params'

for key, val in pdict.items():
if len(val) not in [1, nloops]:
raise ValueError("Parameter {} has incompatible dimension: {}. "
Expand All @@ -358,7 +365,7 @@ def run(self, ts, overwrite=False, **kwds):
if len(val) == 1:
pdict[key] = val.repeat(nloops, axis=0)

# Update parameter objects
# Update non-parallel parameter objects
for key, val in kwds.items():

if key in self._rvext:
Expand All @@ -376,6 +383,7 @@ def run(self, ts, overwrite=False, **kwds):
if self.rvi:
self.handle_date_defaults(ts)

# Loop over parallel parameters
procs = []
for self.psim in range(nloops):
for key, val in pdict.items():
Expand Down Expand Up @@ -443,9 +451,8 @@ def _merge_output(self, files, name):
if name.endswith('.nc') and not isinstance(self, raven.models.RavenMultiModel):
ds = [xr.open_dataset(fn) for fn in files]
try:
# We aggregate along the params dimensions.
# Hard-coded for now.
out = xr.concat(ds, 'params', data_vars='different')
# We aggregate along the pdim dimension.
out = xr.concat(ds, self._pdim, data_vars='different')
out.to_netcdf(outfn)
return outfn
except (ValueError, KeyError):
Expand All @@ -468,16 +475,13 @@ def parse_errors(self):
out += f.read_text()
return out

def _assign_files(self, fns, variables):
def _assign_files(self, fns):
"""Find for each variable the file storing its data and the name of the netCDF variable.
Parameters
----------
fns : sequence
Paths to netCDF files.
variables : sequence
Names of the variables to look for. Specify their CF standard name, a dictionary of
alternative names will be used for the lookup.
Returns
-------
Expand All @@ -488,22 +492,40 @@ def _assign_files(self, fns, variables):
"""
files = {}
var_names = {}
dimensions = {}
shape = {}

for fn in fns:
if '.nc' in fn.suffix:
with xr.open_dataset(fn) as ds:
for var in variables:
for alt_name in self._variable_names[var]:
for var, alt_names in self._variable_names.items():
if var not in self.rvt.keys():
continue
for alt_name in alt_names:
if alt_name in ds.data_vars:
files[var] = fn
var_names[var + '_var'] = alt_name
dimensions[var] = ds[alt_name].dims
shape[var] = ds[alt_name].shape
break

for var in variables:
if var not in files.keys():
for var in self._variable_names.keys():
if var in self.rvt.keys() and var not in files.keys():
raise ValueError("{} not found in files.".format(var))

return files, var_names
sdims = set(dimensions.values())
if len(sdims) == 0:
dims = None
if len(sdims) == 1:
dims = sdims.pop()
if len(sdims) > 1:
raise AttributeError("All forcing variables should have the same dimensions.")

sh = set(shape.values())
if len(sh) > 1:
raise AttributeError("All forcing variables should have the same shape.")

return files, var_names, dims

def _get_output(self, pattern, path):
"""Match actual output files to known expected files.
Expand Down
26 changes: 16 additions & 10 deletions raven/models/emulators.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from raven.models import Raven, Ostrich
from pathlib import Path
from collections import namedtuple
from .rv import RV, RVI, Ost
from .rv import RV, RVT, RVI, Ost


class GR4JCN(Raven):
Expand All @@ -12,7 +12,7 @@ class GR4JCN(Raven):
params = namedtuple('GR4JParams', ('GR4J_X1', 'GR4J_X2', 'GR4J_X3', 'GR4J_X4', 'CEMANEIGE_X1', 'CEMANEIGE_X2'))

rvp = RV(params=params(None, None, None, None, None, None))
rvt = RV(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvt = RVT(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvi = RVI()
rvh = RV(name=None, area=None, elevation=None, latitude=None, longitude=None)
rvd = RV(one_minus_CEMANEIGE_X2=None, GR4J_X1_hlf=None)
Expand Down Expand Up @@ -46,7 +46,7 @@ class MOHYSE(Raven):

rvp = RV(params=params(*((None, ) * 8)))
rvh = RV(name=None, area=None, elevation=None, latitude=None, longitude=None, hrus=hrus(None, None))
rvt = RV(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvt = RVT(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvi = RVI()
rvd = RV(par_rezi_x10=None)

Expand Down Expand Up @@ -83,7 +83,7 @@ class HMETS(GR4JCN):
'BASEFLOW_COEFF_2', 'TOPSOIL', 'PHREATIC'))

rvp = RV(params=params(*((None,) * len(params._fields))))
rvt = RV(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvt = RVT(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None, water_volume_transport_in_river_channel=None)
rvi = RVI()
rvd = RV(TOPSOIL_m=None, PHREATIC_m=None, SUM_MELT_FACTOR=None, SUM_SNOW_SWI=None, TOPSOIL_hlf=None,
PHREATIC_hlf=None)
Expand Down Expand Up @@ -124,7 +124,7 @@ class HBVEC(GR4JCN):

rvp = RV(params=params(*((None,) * len(params._fields))))
rvd = RV(one_plus_par_x15=None, par_x11_half=None, mae=mae, mat=mat)
rvt = RV(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None,
rvt = RVT(pr=None, prsn=None, tasmin=None, tasmax=None, evspsbl=None,
water_volume_transport_in_river_channel=None)
rvh = RV(name=None, area=None, elevation=None, latitude=None, longitude=None)

Expand All @@ -134,9 +134,9 @@ def derived_parameters(self):
self.rvd['one_plus_par_x15'] = self.rvp.params.par_x15 + 1.0
self.rvd['par_x11_half'] = self.rvp.params.par_x11 / 2.0

tasmax = xr.open_dataset(self.rvt.tasmax)[self.rvd.tasmax_var]
tasmin = xr.open_dataset(self.rvt.tasmin)[self.rvd.tasmin_var]
evap = xr.open_dataset(self.rvt.evspsbl)[self.rvd.evspsbl_var]
tasmax = xr.open_dataset(self.rvt.tasmax)[self.rvt.tasmax_var]
tasmin = xr.open_dataset(self.rvt.tasmin)[self.rvt.tasmin_var]
evap = xr.open_dataset(self.rvt.evspsbl)[self.rvt.evspsbl_var]

tas = (tasmax + tasmin) / 2.
self.rvd.mat = self.mat(*tas.groupby('time.month').mean().values)
Expand All @@ -158,9 +158,15 @@ class HBVEC_OST(Ostrich, HBVEC):
)

def derived_parameters(self):
"""Derived parameters are computed by Ostrich."""
pass
import xarray as xr

tasmax = xr.open_dataset(self.rvt.tasmax)[self.rvt.tasmax_var]
tasmin = xr.open_dataset(self.rvt.tasmin)[self.rvt.tasmin_var]
evap = xr.open_dataset(self.rvt.evspsbl)[self.rvt.evspsbl_var]

tas = (tasmax + tasmin) / 2.
self.rvd.mat = self.mat(*tas.groupby('time.month').mean().values)
self.rvd.mae = self.mae(*evap.groupby('time.month').mean().values)

def get_model(name):
"""Return the corresponding Raven emulated model instance.
Expand Down
26 changes: 16 additions & 10 deletions raven/models/ostrich-gr4j-cemaneige/model/raven-gr4j-cemaneige.rvt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#########################################################################
:FileType rvt ASCII Raven 2.8.2
:WrittenBy Juliane Mai & James Craig
#########################################################################
:FileType rvt ASCII Raven 2.8.2
:WrittenBy Juliane Mai & James Craig
:CreationDate Sep 2018
#
# Emulation of GR4J simulation of Salmon River near Prince George
# Emulation of GR4J simulation of Salmon River near Prince George
#------------------------------------------------------------------------

# meteorological forcings
Expand All @@ -15,35 +15,40 @@
:ReadFromNetCDF
:FileNameNC {pr}
:VarNameNC {pr_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndData
:Data SNOWFALL mm/d
:ReadFromNetCDF
:FileNameNC {prsn}
:VarNameNC {prsn_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndData
:Data TEMP_MIN deg_C
:ReadFromNetCDF
:FileNameNC {tasmin}
:VarNameNC {tasmin_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndData
:Data TEMP_MAX deg_C
:ReadFromNetCDF
:FileNameNC {tasmax}
:VarNameNC {tasmax_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndData
:Data PET mm/d
:ReadFromNetCDF
:FileNameNC {evspsbl}
:VarNameNC {evspsbl_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndData
:EndGauge
Expand All @@ -53,6 +58,7 @@
:ReadFromNetCDF
:FileNameNC {water_volume_transport_in_river_channel}
:VarNameNC {water_volume_transport_in_river_channel_var}
:DimNamesNC time
:DimNamesNC {nc_dimensions}
:StationIdx {nc_index}
:EndReadFromNetCDF
:EndObservationData
Loading

0 comments on commit 2e42a7d

Please sign in to comment.