Skip to content

Commit

Permalink
multiple models implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
LucR31 committed Nov 15, 2023
1 parent bf03388 commit 3e7c67b
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 84 deletions.
15 changes: 6 additions & 9 deletions aiida_flexpart/calculations/flexpart_cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def define(cls, spec):
help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.'
)
spec.input('species', valid_type=orm.RemoteData, required=True)
spec.input('meteo_path', valid_type=orm.RemoteData,
spec.input('meteo_path', valid_type=orm.List,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO')
Expand Down Expand Up @@ -101,16 +101,13 @@ def prepare_for_submission(self, folder): # pylint: disable=too-many-locals
needed by the calculation.
:return: `aiida.common.datastructures.CalcInfo` instance
"""
meteo_string_list = ['./','./']
for path in self.inputs.meteo_path:
meteo_string_list.append(f'{path}{os.sep}')
meteo_string_list.append(f'{path}/AVAILABLE')

meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path())
codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = [
'./', # Folder containing the inputs.
'./', # Folder containing the outputs.
f'{meteo_path}{os.sep}',
str(meteo_path / 'AVAILABLE'),
# File that lists all the individual input files that are available and assigns them a date
]
codeinfo.cmdline_params = meteo_string_list
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi
Expand Down
16 changes: 7 additions & 9 deletions aiida_flexpart/calculations/flexpart_ifs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def define(cls, spec):
spec.input('species', valid_type=orm.RemoteData, required=True)
spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO')

spec.input('meteo_path', valid_type=orm.RemoteData,
spec.input('meteo_path', valid_type=orm.List,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
spec.outputs.dynamic = True
Expand Down Expand Up @@ -98,15 +98,13 @@ def _deal_with_time(cls, command_dict):

def prepare_for_submission(self, folder):

meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path())
meteo_string_list = ['./','./']
for path in self.inputs.meteo_path:
meteo_string_list.append(f'{path}{os.sep}')
meteo_string_list.append(f'{path}/AVAILABLE')

codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = [
'./', # Folder containing the inputs.
'./', # Folder containing the outputs.
f'{meteo_path}{os.sep}',
str(meteo_path / 'AVAILABLE'),
# File that lists all the individual input files that are available and assigns them a date
]
codeinfo.cmdline_params = meteo_string_list
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi
Expand Down
104 changes: 61 additions & 43 deletions aiida_flexpart/workflows/multi_dates_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ def define(cls, spec):
# Basic Inputs
spec.input('simulation_dates', valid_type=orm.List,
help='A list of the starting dates of the simulations')
spec.input('model', valid_type=orm.Str, required=True)
spec.input('model_offline', valid_type=orm.Str, required=True)
spec.input('model', valid_type=orm.List, required=True)
spec.input('model_offline', valid_type=orm.List, required=True)
spec.input('offline_integration_time', valid_type=orm.Int)
spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours')
spec.input("parent_calc_folder",
Expand All @@ -70,9 +70,9 @@ def define(cls, spec):
help='Meteo models input params.')
spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False,
help='Meteo models input params.')
spec.input('meteo_path', valid_type=orm.RemoteData,
spec.input('meteo_path', valid_type=orm.List,
required=False, help='Path to the folder containing the meteorological input data.')
spec.input('meteo_path_offline', valid_type=orm.RemoteData,
spec.input('meteo_path_offline', valid_type=orm.List,
required=False, help='Path to the folder containing the meteorological input data.')
spec.input('gribdir', valid_type=orm.Str, required=True)

Expand Down Expand Up @@ -124,24 +124,29 @@ def condition(self):
return True if self.ctx.index < len(self.ctx.simulation_dates) else False

def run_cosmo(self):
return True if self.inputs.model in cosmo_models else False
if all(mod in cosmo_models for mod in self.inputs.model) and self.inputs.model:
return True
else:
return False

def run_ifs(self):
return True if self.inputs.model in ECMWF_models else False
if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model:
return True
else:
return False

def run_offline(self):
if self.inputs.model_offline in ECMWF_models and self.inputs.model != '':
if all(mod in ECMWF_models for mod in self.inputs.model_offline) and self.inputs.model and self.inputs.model_offline:
self.ctx.index-=1
return True
elif self.inputs.model_offline in ECMWF_models and self.inputs.model == '':
elif all(mod in ECMWF_models for mod in self.inputs.model_offline) and not self.inputs.model:
return True
return False

def setup(self):
"""Prepare a simulation."""

self.report(f'starting setup')
self.report(f'model: {self.inputs.model.value}, model_offline: {self.inputs.model_offline.value}')

self.ctx.index = 0
self.ctx.simulation_dates = self.inputs.simulation_dates
Expand Down Expand Up @@ -169,23 +174,32 @@ def prepare_meteo_folder_ifs(self):
self.ctx.command.get_dict()["release_duration"],
self.ctx.command.get_dict()["simulation_direction"])

self.report(f'prepare meteo from {s_date} to {e_date}')

results, node = launch_shell_job(
self.inputs.check_meteo_ifs_code,
arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
nodes={
'sdate': orm.Str(s_date),
'edate': orm.Str(e_date),
'gribdir': self.inputs.gribdir,
'model' : self.inputs.model_offline
})

if node.is_finished_ok:
self.report('meteo files transferred successfully')
self.report(f'preparing meteo from {s_date} to {e_date}')

if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model:
model_list = self.inputs.model
else:
model_list = self.inputs.model_offline

node_list=[]
for mod in model_list:
self.report(f'transfering {mod} meteo')
results, node = launch_shell_job(
self.inputs.check_meteo_ifs_code,
arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
nodes={
'sdate': orm.Str(s_date),
'edate': orm.Str(e_date),
'gribdir': self.inputs.gribdir,
'model' : orm.Str(mod)
})
node_list.append(node)

if all(node.is_finished_ok for node in node_list):
self.report('ALL meteo OK')
return True
else:
self.report('failed to transfer meteo files')
self.report('FAILED to transfer meteo')
self.ctx.index += 1
return False

Expand All @@ -195,30 +209,34 @@ def prepare_meteo_folder_cosmo(self):
self.ctx.command.get_dict()["release_duration"],
self.ctx.command.get_dict()["simulation_direction"])

self.report(f'prepare meteo from {s_date} to {e_date}')

results, node = launch_shell_job(
self.inputs.check_meteo_cosmo_code,
arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
nodes={
'sdate': orm.Str(s_date),
'edate': orm.Str(e_date),
'gribdir': self.inputs.gribdir,
'model': self.inputs.model
})

if node.is_finished_ok:
self.report('meteo files transferred successfully')
self.report(f'preparing meteo from {s_date} to {e_date}')

node_list=[]
for mod in self.inputs.model:
self.report(f'transfering {mod} meteo')
results, node = launch_shell_job(
self.inputs.check_meteo_cosmo_code,
arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a',
nodes={
'sdate': orm.Str(s_date),
'edate': orm.Str(e_date),
'gribdir': self.inputs.gribdir,
'model': orm.Str(mod)
})
node_list.append(node)

if all(node.is_finished_ok for node in node_list):
self.report('ALL meteo OK')
return True
else:
self.report('failed to transfer meteo files')
self.report('FAILED to transfer meteo')
self.ctx.index += 1
return False

def run_cosmo_simulation(self):
"""Run calculations for equation of state."""

self.report('starting flexpart cosmo')
self.report(f'starting flexpart cosmo {self.ctx.simulation_dates[self.ctx.index]}')

builder = FlexpartCalculation.get_builder()
builder.code = self.inputs.fcosmo_code
Expand Down Expand Up @@ -257,19 +275,19 @@ def run_cosmo_simulation(self):
def run_ifs_simulation(self):
"""Run calculations for equation of state."""
# Set up calculation.
self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}')
self.report(f'running flexpart ifs for {self.ctx.simulation_dates[self.ctx.index]}')
builder = FlexpartIfsCalculation.get_builder()
builder.code = self.inputs.fifs_code

#changes in the command file
new_dict = self.ctx.command.get_dict()
new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index]

if self.inputs.model_offline in ECMWF_models:
if self.inputs.model_offline[0] in ECMWF_models:
new_dict['age_class'] = self.ctx.offline_integration_time * 3600
new_dict['dumped_particle_data'] = True

if self.inputs.model != '':
if self.inputs.model:
self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder
self.report(f'starting from: {self.ctx.parent_calc_folder}')
else:
Expand Down
43 changes: 22 additions & 21 deletions examples/example_workflow_combi.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,20 @@ def simulation_dates_parser(date_list: list) -> list:
def test_run(flexpart_code):
"""Run workflow."""

simulation_dates = simulation_dates_parser(['2021-01-09'])
model = 'cosmo7'
model_offline = 'IFS_GL_05'
simulation_dates = simulation_dates_parser(['2020-09-01'])
model = ['cosmo7']
model_offline = []
username='lfernand'
outgrid_main = 'Europe'
outgrid_nest = 'Switzerland'
integration_time = 24
integration_time_offline = 48
integration_time_offline = 24

users_address=f'/users/{username}/resources/flexpart/'
scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/'

#list of locations and/or groups of locations
list_locations = ['TEST_32']
list_locations = ['group-name-1']


# Links to the remote files/folders.
Expand Down Expand Up @@ -104,37 +104,33 @@ def test_run(flexpart_code):
builder = workflow.get_builder()
builder.fcosmo_code = flexpart_code
builder.fifs_code = orm.load_code('flexpart_ifs@daint')
builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct')
builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct')
builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106')
builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106')

#basic settings
builder.simulation_dates = simulation_dates
builder.integration_time = orm.Int(integration_time)
builder.offline_integration_time = orm.Int(integration_time_offline)

#meteo realted settings
builder.model = orm.Str(model)
builder.model_offline = orm.Str(model_offline)
builder.model = orm.List(model)
builder.model_offline = orm.List(model_offline)

if model is not None:
meteo_path = orm.RemoteData(
remote_path=scratch_address+model+'/',
computer = flexpart_code.computer)
if model:
meteo_path = orm.List([scratch_address+mod for mod in model])
builder.meteo_path = meteo_path
builder.meteo_inputs = orm.Dict(
dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[
model,
])[model])
model[-1],
])[model[-1]])

if model_offline is not None:
meteo_path_offline = orm.RemoteData(
remote_path = scratch_address+model_offline,
computer=flexpart_code.computer)
if model_offline:
meteo_path_offline = orm.List([scratch_address+mod for mod in model_offline])
builder.meteo_path_offline = meteo_path_offline
builder.meteo_inputs_offline = orm.Dict(
dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[
model_offline,
])[model_offline])
model_offline[-1],
])[model_offline[-1]])

builder.gribdir=orm.Str(scratch_address)

Expand Down Expand Up @@ -182,6 +178,11 @@ def test_run(flexpart_code):
'target_base': f'/store/empa/em05/{username}/aiida_stash',
'stash_mode': StashMode.COPY.value,
}

#change wall time for cosom and ifs in seconds
builder.flexpart.metadata.options.max_wallclock_seconds = 1800
#builder.flexpartifs.metadata.options.max_wallclock_seconds = 2700

engine.run(builder)


Expand Down
7 changes: 5 additions & 2 deletions examples/inputs/location_groups.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
group-name1:
- location1
group-name-1:
- TEST_32
group-name-2:
- TEST_32
- TEST_200

0 comments on commit 3e7c67b

Please sign in to comment.