diff --git a/aiida_flexpart/calculations/flexpart_cosmo.py b/aiida_flexpart/calculations/flexpart_cosmo.py index 494d192..9cb507a 100644 --- a/aiida_flexpart/calculations/flexpart_cosmo.py +++ b/aiida_flexpart/calculations/flexpart_cosmo.py @@ -51,7 +51,7 @@ def define(cls, spec): help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.' ) spec.input('species', valid_type=orm.RemoteData, required=True) - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=True, help='Path to the folder containing the meteorological input data.') spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO') @@ -101,16 +101,13 @@ def prepare_for_submission(self, folder): # pylint: disable=too-many-locals needed by the calculation. :return: `aiida.common.datastructures.CalcInfo` instance """ + meteo_string_list = ['./','./'] + for path in self.inputs.meteo_path: + meteo_string_list.append(f'{path}{os.sep}') + meteo_string_list.append(f'{path}/AVAILABLE') - meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path()) codeinfo = datastructures.CodeInfo() - codeinfo.cmdline_params = [ - './', # Folder containing the inputs. - './', # Folder containing the outputs. - f'{meteo_path}{os.sep}', - str(meteo_path / 'AVAILABLE'), - # File that lists all the individual input files that are available and assigns them a date - ] + codeinfo.cmdline_params = meteo_string_list codeinfo.code_uuid = self.inputs.code.uuid codeinfo.stdout_name = self.metadata.options.output_filename codeinfo.withmpi = self.inputs.metadata.options.withmpi diff --git a/aiida_flexpart/calculations/flexpart_ifs.py b/aiida_flexpart/calculations/flexpart_ifs.py index 2cb0896..93b89df 100644 --- a/aiida_flexpart/calculations/flexpart_ifs.py +++ b/aiida_flexpart/calculations/flexpart_ifs.py @@ -55,7 +55,7 @@ def define(cls, spec): spec.input('species', valid_type=orm.RemoteData, required=True) spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO') - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=True, help='Path to the folder containing the meteorological input data.') spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) spec.outputs.dynamic = True @@ -98,15 +98,13 @@ def _deal_with_time(cls, command_dict): def prepare_for_submission(self, folder): - meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path()) + meteo_string_list = ['./','./'] + for path in self.inputs.meteo_path: + meteo_string_list.append(f'{path}{os.sep}') + meteo_string_list.append(f'{path}/AVAILABLE') + codeinfo = datastructures.CodeInfo() - codeinfo.cmdline_params = [ - './', # Folder containing the inputs. - './', # Folder containing the outputs. - f'{meteo_path}{os.sep}', - str(meteo_path / 'AVAILABLE'), - # File that lists all the individual input files that are available and assigns them a date - ] + codeinfo.cmdline_params = meteo_string_list codeinfo.code_uuid = self.inputs.code.uuid codeinfo.stdout_name = self.metadata.options.output_filename codeinfo.withmpi = self.inputs.metadata.options.withmpi diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index bd8e855..3d90b30 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -48,8 +48,8 @@ def define(cls, spec): # Basic Inputs spec.input('simulation_dates', valid_type=orm.List, help='A list of the starting dates of the simulations') - spec.input('model', valid_type=orm.Str, required=True) - spec.input('model_offline', valid_type=orm.Str, required=True) + spec.input('model', valid_type=orm.List, required=True) + spec.input('model_offline', valid_type=orm.List, required=True) spec.input('offline_integration_time', valid_type=orm.Int) spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours') spec.input("parent_calc_folder", @@ -70,9 +70,9 @@ def define(cls, spec): help='Meteo models input params.') spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False, help='Meteo models input params.') - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=False, help='Path to the folder containing the meteorological input data.') - spec.input('meteo_path_offline', valid_type=orm.RemoteData, + spec.input('meteo_path_offline', valid_type=orm.List, required=False, help='Path to the folder containing the meteorological input data.') spec.input('gribdir', valid_type=orm.Str, required=True) @@ -124,16 +124,22 @@ def condition(self): return True if self.ctx.index < len(self.ctx.simulation_dates) else False def run_cosmo(self): - return True if self.inputs.model in cosmo_models else False + if all(mod in cosmo_models for mod in self.inputs.model) and self.inputs.model: + return True + else: + return False def run_ifs(self): - return True if self.inputs.model in ECMWF_models else False + if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model: + return True + else: + return False def run_offline(self): - if self.inputs.model_offline in ECMWF_models and self.inputs.model != '': + if all(mod in ECMWF_models for mod in self.inputs.model_offline) and self.inputs.model and self.inputs.model_offline: self.ctx.index-=1 return True - elif self.inputs.model_offline in ECMWF_models and self.inputs.model == '': + elif all(mod in ECMWF_models for mod in self.inputs.model_offline) and not self.inputs.model: return True return False @@ -141,7 +147,6 @@ def setup(self): """Prepare a simulation.""" self.report(f'starting setup') - self.report(f'model: {self.inputs.model.value}, model_offline: {self.inputs.model_offline.value}') self.ctx.index = 0 self.ctx.simulation_dates = self.inputs.simulation_dates @@ -169,23 +174,32 @@ def prepare_meteo_folder_ifs(self): self.ctx.command.get_dict()["release_duration"], self.ctx.command.get_dict()["simulation_direction"]) - self.report(f'prepare meteo from {s_date} to {e_date}') - - results, node = launch_shell_job( - self.inputs.check_meteo_ifs_code, - arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', - nodes={ - 'sdate': orm.Str(s_date), - 'edate': orm.Str(e_date), - 'gribdir': self.inputs.gribdir, - 'model' : self.inputs.model_offline - }) - - if node.is_finished_ok: - self.report('meteo files transferred successfully') + self.report(f'preparing meteo from {s_date} to {e_date}') + + if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model: + model_list = self.inputs.model + else: + model_list = self.inputs.model_offline + + node_list=[] + for mod in model_list: + self.report(f'transfering {mod} meteo') + results, node = launch_shell_job( + self.inputs.check_meteo_ifs_code, + arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', + nodes={ + 'sdate': orm.Str(s_date), + 'edate': orm.Str(e_date), + 'gribdir': self.inputs.gribdir, + 'model' : orm.Str(mod) + }) + node_list.append(node) + + if all(node.is_finished_ok for node in node_list): + self.report('ALL meteo OK') return True else: - self.report('failed to transfer meteo files') + self.report('FAILED to transfer meteo') self.ctx.index += 1 return False @@ -195,30 +209,34 @@ def prepare_meteo_folder_cosmo(self): self.ctx.command.get_dict()["release_duration"], self.ctx.command.get_dict()["simulation_direction"]) - self.report(f'prepare meteo from {s_date} to {e_date}') - - results, node = launch_shell_job( - self.inputs.check_meteo_cosmo_code, - arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', - nodes={ - 'sdate': orm.Str(s_date), - 'edate': orm.Str(e_date), - 'gribdir': self.inputs.gribdir, - 'model': self.inputs.model - }) - - if node.is_finished_ok: - self.report('meteo files transferred successfully') + self.report(f'preparing meteo from {s_date} to {e_date}') + + node_list=[] + for mod in self.inputs.model: + self.report(f'transfering {mod} meteo') + results, node = launch_shell_job( + self.inputs.check_meteo_cosmo_code, + arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', + nodes={ + 'sdate': orm.Str(s_date), + 'edate': orm.Str(e_date), + 'gribdir': self.inputs.gribdir, + 'model': orm.Str(mod) + }) + node_list.append(node) + + if all(node.is_finished_ok for node in node_list): + self.report('ALL meteo OK') return True else: - self.report('failed to transfer meteo files') + self.report('FAILED to transfer meteo') self.ctx.index += 1 return False def run_cosmo_simulation(self): """Run calculations for equation of state.""" - self.report('starting flexpart cosmo') + self.report(f'starting flexpart cosmo {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartCalculation.get_builder() builder.code = self.inputs.fcosmo_code @@ -257,7 +275,7 @@ def run_cosmo_simulation(self): def run_ifs_simulation(self): """Run calculations for equation of state.""" # Set up calculation. - self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}') + self.report(f'running flexpart ifs for {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartIfsCalculation.get_builder() builder.code = self.inputs.fifs_code @@ -265,11 +283,11 @@ def run_ifs_simulation(self): new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - if self.inputs.model_offline in ECMWF_models: + if self.inputs.model_offline[0] in ECMWF_models: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 new_dict['dumped_particle_data'] = True - if self.inputs.model != '': + if self.inputs.model: self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder self.report(f'starting from: {self.ctx.parent_calc_folder}') else: diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 66b29db..30d0ac5 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -62,20 +62,20 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-09']) - model = 'cosmo7' - model_offline = 'IFS_GL_05' + simulation_dates = simulation_dates_parser(['2020-09-01']) + model = ['cosmo7'] + model_offline = [] username='lfernand' outgrid_main = 'Europe' outgrid_nest = 'Switzerland' integration_time = 24 - integration_time_offline = 48 + integration_time_offline = 24 users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' #list of locations and/or groups of locations - list_locations = ['TEST_32'] + list_locations = ['group-name-1'] # Links to the remote files/folders. @@ -104,8 +104,8 @@ def test_run(flexpart_code): builder = workflow.get_builder() builder.fcosmo_code = flexpart_code builder.fifs_code = orm.load_code('flexpart_ifs@daint') - builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct') - builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct') + builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') + builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') #basic settings builder.simulation_dates = simulation_dates @@ -113,28 +113,24 @@ def test_run(flexpart_code): builder.offline_integration_time = orm.Int(integration_time_offline) #meteo realted settings - builder.model = orm.Str(model) - builder.model_offline = orm.Str(model_offline) + builder.model = orm.List(model) + builder.model_offline = orm.List(model_offline) - if model is not None: - meteo_path = orm.RemoteData( - remote_path=scratch_address+model+'/', - computer = flexpart_code.computer) + if model: + meteo_path = orm.List([scratch_address+mod for mod in model]) builder.meteo_path = meteo_path builder.meteo_inputs = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - model, - ])[model]) + model[-1], + ])[model[-1]]) - if model_offline is not None: - meteo_path_offline = orm.RemoteData( - remote_path = scratch_address+model_offline, - computer=flexpart_code.computer) + if model_offline: + meteo_path_offline = orm.List([scratch_address+mod for mod in model_offline]) builder.meteo_path_offline = meteo_path_offline builder.meteo_inputs_offline = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - model_offline, - ])[model_offline]) + model_offline[-1], + ])[model_offline[-1]]) builder.gribdir=orm.Str(scratch_address) @@ -182,6 +178,11 @@ def test_run(flexpart_code): 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } + + #change wall time for cosom and ifs in seconds + builder.flexpart.metadata.options.max_wallclock_seconds = 1800 + #builder.flexpartifs.metadata.options.max_wallclock_seconds = 2700 + engine.run(builder) diff --git a/examples/inputs/location_groups.yaml b/examples/inputs/location_groups.yaml index f0b5b76..7223075 100644 --- a/examples/inputs/location_groups.yaml +++ b/examples/inputs/location_groups.yaml @@ -1,2 +1,5 @@ -group-name1: - - location1 \ No newline at end of file +group-name-1: + - TEST_32 +group-name-2: + - TEST_32 + - TEST_200 \ No newline at end of file