From 97077ba21b2555010d9c7e10ec4afb1769c520bc Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 20 Oct 2023 09:16:19 +0000 Subject: [PATCH 01/22] new flexpart ifs --- aiida_flexpart/calculations/flexpart_ifs.py | 186 ++++++++++++++++++ aiida_flexpart/parsers/flexpart_ifs.py | 61 ++++++ aiida_flexpart/templates/COMMAND_ifs.j2 | 190 +++++++++++++++++++ aiida_flexpart/templates/OUTGRID_NEST_ifs.j2 | 30 +++ aiida_flexpart/templates/OUTGRID_ifs.j2 | 35 ++++ examples/example_ifs.py | 114 +++++++++++ setup.json | 6 +- 7 files changed, 620 insertions(+), 2 deletions(-) create mode 100644 aiida_flexpart/calculations/flexpart_ifs.py create mode 100644 aiida_flexpart/parsers/flexpart_ifs.py create mode 100644 aiida_flexpart/templates/COMMAND_ifs.j2 create mode 100644 aiida_flexpart/templates/OUTGRID_NEST_ifs.j2 create mode 100644 aiida_flexpart/templates/OUTGRID_ifs.j2 create mode 100644 examples/example_ifs.py diff --git a/aiida_flexpart/calculations/flexpart_ifs.py b/aiida_flexpart/calculations/flexpart_ifs.py new file mode 100644 index 0000000..e491071 --- /dev/null +++ b/aiida_flexpart/calculations/flexpart_ifs.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +""" +Calculations provided by aiida_flexpart. + +Register calculations via the "aiida.calculations" entry point in setup.json. +""" +import os +import importlib +import datetime +import pathlib +import jinja2 + +from aiida import orm +from aiida.common import datastructures +from aiida.engine import CalcJob +from aiida_flexpart.utils import convert_input_to_namelist_entry + +from ..utils import fill_in_template_file + + +class FlexpartIfsCalculation(CalcJob): + """AiiDA calculation plugin wrapping the FLEXPART IFS executable.""" + @classmethod + def define(cls, spec): + """Define inputs and outputs of the calculation.""" + # yapf: disable + super().define(spec) + + # set default values for AiiDA options + spec.inputs['metadata']['options']['resources'].default = { + 'num_machines': 1, + 'num_mpiprocs_per_machine': 1, + } + + spec.input('metadata.options.max_wallclock_seconds', valid_type=int, default=1800) + spec.input('metadata.options.parser_name', valid_type=str, default='flexpart.ifs') + + spec.input( + "parent_calc_folder", + valid_type=orm.RemoteData, + required=False, + help="Working directory of a previously ran calculation to restart from." + ) + + # Model settings + spec.input_namespace('model_settings') + spec.input('model_settings.release_settings', valid_type=orm.Dict, required=True) + spec.input('model_settings.locations', valid_type=orm.Dict, required=True) + spec.input('model_settings.command', valid_type=orm.Dict, required=True) + + spec.input('outgrid', valid_type=orm.Dict, help='Input file for the Lagrangian particle dispersion model FLEXPART.') + spec.input('outgrid_nest', valid_type=orm.Dict, required=False, + help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.' + ) + spec.input('species', valid_type=orm.RemoteData, required=True) + spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO') + + spec.input('meteo_path', valid_type=orm.RemoteData, + required=True, help='Path to the folder containing the meteorological input data.') + spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) + spec.outputs.dynamic = True + + #exit codes + spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.') + + @classmethod + def _deal_with_time(cls, command_dict): + """Dealing with simulation times.""" + #initial values + simulation_beginning_date = datetime.datetime.strptime(command_dict.pop('simulation_date'),'%Y-%m-%d %H:%M:%S') + age_class_time = datetime.timedelta(seconds=command_dict.pop('age_class')) + release_chunk = datetime.timedelta(seconds=command_dict.pop('release_chunk')) + release_duration = datetime.timedelta(seconds=command_dict.pop('release_duration')) + + #releases start and end times + release_beginning_date=simulation_beginning_date + release_ending_date=release_beginning_date+release_duration + + if command_dict['simulation_direction']>0: #forward + simulation_ending_date=release_ending_date+age_class_time + else: #backward + simulation_ending_date=release_ending_date + simulation_beginning_date-=age_class_time + + command_dict['simulation_beginning_date'] = [ + f'{simulation_beginning_date:%Y%m%d}', + f'{simulation_beginning_date:%H%M%S}' + ] + command_dict['simulation_ending_date'] = [ + f'{simulation_ending_date:%Y%m%d}', + f'{simulation_ending_date:%H%M%S}' + ] + return { + 'beginning_date': release_beginning_date, + 'ending_date': release_ending_date, + 'chunk': release_chunk + } , age_class_time + + def prepare_for_submission(self, folder): + + meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path()) + codeinfo = datastructures.CodeInfo() + codeinfo.cmdline_params = [ + './', # Folder containing the inputs. + './', # Folder containing the outputs. + f'{meteo_path}{os.sep}', + str(meteo_path / 'AVAILABLE'), + # File that lists all the individual input files that are available and assigns them a date + ] + codeinfo.code_uuid = self.inputs.code.uuid + codeinfo.stdout_name = self.metadata.options.output_filename + codeinfo.withmpi = self.inputs.metadata.options.withmpi + + # Prepare a `CalcInfo` to be returned to the engine + calcinfo = datastructures.CalcInfo() + calcinfo.codes_info = [codeinfo] + + + command_dict = self.inputs.model_settings.command.get_dict() + + # Deal with simulation times. + release, age_class_time = self._deal_with_time(command_dict) + + # Fill in the releases file. + with folder.open('RELEASES', 'w') as infile: + time_chunks = [] + current_time = release['beginning_date'] + release['chunk'] + while current_time <= release['ending_date']: + time_chunks.append({ + 'begin': [f'{current_time-release["chunk"]:%Y%m%d}', f'{current_time-release["chunk"]:%H%M%S}'], + 'end': [f'{current_time:%Y%m%d}', f'{current_time:%H%M%S}'], + }) + current_time += release['chunk'] + + template = jinja2.Template(importlib.resources.read_text('aiida_flexpart.templates', 'RELEASES.j2')) + infile.write(template.render( + time_chunks=time_chunks, + locations=self.inputs.model_settings.locations.get_dict(), + release_settings=self.inputs.model_settings.release_settings.get_dict() + ) + ) + + # Fill in the AGECLASSES file. + fill_in_template_file(folder, 'AGECLASSES', int(age_class_time.total_seconds())) + + # Fill in the COMMAND file. + fill_in_template_file(folder, 'COMMAND_ifs', command_dict) + + # Fill in the OUTGRID_NEST file if the corresponding dictionary is present. + if 'outgrid_nest' in self.inputs: + command_dict['nested_output'] = True + fill_in_template_file(folder, 'OUTGRID_NEST_ifs', self.inputs.outgrid_nest.get_dict()) + else: + command_dict['nested_output'] = False + + # Fill in the OUTGRID file. + fill_in_template_file(folder, 'OUTGRID_ifs', self.inputs.outgrid.get_dict()) + + + calcinfo.remote_symlink_list = [] + calcinfo.remote_symlink_list.append(( + self.inputs.species.computer.uuid, + self.inputs.species.get_remote_path(), + 'SPECIES' + )) + + #_DEFAULT_PARENT_CALC_FLDR_NAME = "parent_calc/" + if "parent_calc_folder" in self.inputs: + calcinfo.remote_symlink_list.append(( + self.inputs.parent_calc_folder.computer.uuid, + self.inputs.parent_calc_folder.get_remote_path(), + self._DEFAULT_PARENT_CALC_FLDR_NAME)) + + + # Dealing with land_use input namespace. + for _, value in self.inputs.land_use.items(): + file_path = value.get_remote_path() + calcinfo.remote_symlink_list.append((value.computer.uuid, file_path, pathlib.Path(file_path).name)) + + calcinfo.retrieve_list = ['grid_time_*.nc', 'aiida.out'] + + return calcinfo + + + + \ No newline at end of file diff --git a/aiida_flexpart/parsers/flexpart_ifs.py b/aiida_flexpart/parsers/flexpart_ifs.py new file mode 100644 index 0000000..a18248a --- /dev/null +++ b/aiida_flexpart/parsers/flexpart_ifs.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +""" +Parsers provided by aiida_flexpart. + +Register parsers via the "aiida.parsers" entry point in setup.json. +""" +from aiida.engine import ExitCode +from aiida.parsers.parser import Parser +from aiida.plugins import CalculationFactory +from aiida.common import exceptions +from aiida.orm import SinglefileData + +FlexpartCalculation = CalculationFactory('flexpart.ifs') + + +class FlexpartCosmoParser(Parser): + """ + Parser class for parsing output of calculation. + """ + def __init__(self, node): + """ + Initialize Parser instance + + Checks that the ProcessNode being passed was produced by a FlexpartCalculation. + + :param node: ProcessNode of calculation + :param type node: :class:`aiida.orm.ProcessNode` + """ + super().__init__(node) + if not issubclass(node.process_class, FlexpartCalculation): + raise exceptions.ParsingError('Can only parse FlexpartCalculation') + + def parse(self, **kwargs): + """ + Parse outputs, store results in database. + + :returns: an exit code, if parsing fails (or nothing if parsing succeeds) + """ + output_filename = self.node.get_option('output_filename') + + # Check that folder content is as expected + files_retrieved = self.retrieved.list_object_names() + files_expected = [output_filename] + # Note: set(A) <= set(B) checks whether A is a subset of B + if not set(files_expected) <= set(files_retrieved): + self.logger.error("Found files '{}', expected to find '{}'".format( + files_retrieved, files_expected)) + return self.exit_codes.ERROR_MISSING_OUTPUT_FILES + + # check aiida.out content + with self.retrieved.open(output_filename, 'r') as handle: + content=handle.read() + if 'CONGRATULATIONS' not in content: + return ExitCode(1) + # add output file + self.logger.info("Parsing '{}'".format(output_filename)) + with self.retrieved.open(output_filename, 'rb') as handle: + output_node = SinglefileData(file=handle) + self.out('output_file', output_node) + + return ExitCode(0) diff --git a/aiida_flexpart/templates/COMMAND_ifs.j2 b/aiida_flexpart/templates/COMMAND_ifs.j2 new file mode 100644 index 0000000..31fb6c7 --- /dev/null +++ b/aiida_flexpart/templates/COMMAND_ifs.j2 @@ -0,0 +1,190 @@ +******************************************************************************** +* * +* Input file for the Lagrangian particle dispersion model FLEXPART * +* Please select your options * +* * +******************************************************************************** + +1. __ 3X, I2 + {{ data.simulation_direction }} + LDIRECT 1 FOR FORWARD SIMULATION, -1 FOR BACKWARD SIMULATION + +2. ________ ______ 3X, I8, 1X, I6 + {{ data.simulation_beginning_date[0] }} {{ data.simulation_beginning_date[1] }} + YYYYMMDD HHMISS BEGINNING DATE OF SIMULATION + +3. ________ ______ 3X, I8, 1X, I6 + {{ data.simulation_ending_date[0] }} {{ data.simulation_ending_date[1] }} + YYYYMMDD HHMISS ENDING DATE OF SIMULATION + +4. _____ 3X, I5 + {{ data.output_every_seconds }} + SSSSS OUTPUT EVERY SSSSS SECONDS + +5. _____ 3X, I5 + {{ data.time_average_of_output_seconds }} + SSSSS TIME AVERAGE OF OUTPUT (IN SSSSS SECONDS) + +6. _____ 3X, I5 + {{ data.sampling_rate_of_output }} + SSSSS SAMPLING RATE OF OUTPUT (IN SSSSS SECONDS) + +7. _________ 3X, I9 + {{ data.particle_splitting_time_constant }} + SSSSSSSSS TIME CONSTANT FOR PARTICLE SPLITTING (IN SECONDS) + +8. _____ 3X, I5 + {{ data.synchronisation_interval }} + SSSSS SYNCHRONISATION INTERVAL OF FLEXPART (IN SECONDS) + +9. ---.-- 4X, F6.4 + {{ data.smaller_than_tl_factor }} + CTL FACTOR, BY WHICH TIME STEP MUST BE SMALLER THAN TL + +10. --- 4X, I3 + {{ data.vertical_motion_time_decrease }} + IFINE DECREASE OF TIME STEP FOR VERTICAL MOTION BY FACTOR IFINE + +11. - 4X, I1 + {{ data.concentration_output }} + IOUT 1 CONCENTRATION (RESIDENCE TIME FOR BACKWARD RUNS) OUTPUT, 2 MIXING RATIO OUTPUT, 3 BOTH,4 PLUME TRAJECT., 5=1+4 + +12. - 4X, I1 + {{ data.particle_dump }} + IPOUT PARTICLE DUMP: 0 NO, 1 EVERY OUTPUT INTERVAL, 2 ONLY AT END, 4 WHEN LEAVING DOMAIN + +13. _ 4X, I1 + {{ 1 if data.subgrid_terrain_effect_parameterization else 0 }} + LSUBGRID SUBGRID TERRAIN EFFECT PARAMETERIZATION: 1 YES, 0 NO + +14. _ 4X, I1 + {{ data.convection_parametrization }} + LCONVECTION CONVECTION: 2 TIEDTKE, 1 EMANUEL, 0 NO + +15. _ 4X, I1 + {{ 1 if data.age_spectra else 0 }} + LAGESPECTRA AGE SPECTRA: 1 YES, 0 NO + +16. _ 4X, I1 + {{ 1 if data.dumped_particle_data else 0 }} + IPIN CONTINUE SIMULATION WITH DUMPED PARTICLE DATA: 1 YES, 0 NO + +17. _ 4X,I1 + {{ 1 if data.output_for_each_release else 0 }} + IOFR IOUTPUTFOREACHREL CREATE AN OUPUT FILE FOR EACH RELEASE LOCATION: 1 YES, 0 NO + +18. _ 4X, I1 + {{ 1 if data.calculate_fluxes else 0 }} + IFLUX CALCULATE FLUXES: 1 YES, 0 NO + +19. _ 4X, I1 + {{ data.domain_filling_trajectory }} + MDOMAINFILL DOMAIN-FILLING TRAJECTORY OPTION: 1 YES, 0 NO, 2 STRAT. O3 TRACER + +20. _ 4X, I1 + {{ data.concentration_units_at_source }} + IND_SOURCE 1=MASS UNIT , 2=MASS MIXING RATIO UNIT + +21. _ 4X, I1 + {{ data.concentration_units_at_receptor }} + IND_RECEPTOR 1=MASS UNIT , 2=MASS MIXING RATIO UNIT + +22. _ 4X, I1 + {{ 1 if data.quasilagrangian_mode_to_track_particles else 0 }} + MQUASILAG QUASILAGRANGIAN MODE TO TRACK INDIVIDUAL PARTICLES: 1 YES, 0 NO + +23. _ 4X, I1 + {{ 1 if data.nested_output else 0 }} + NESTED_OUTPUT SHALL NESTED OUTPUT BE USED? 1 YES, 0 NO + +24. _ 4X, I1 + {{ data.linit_cond }} + LINIT_COND INITIAL COND. FOR BW RUNS: 0=NO,1=MASS UNIT,2=MASS MIXING RATIO UNIT + + +1. Simulation direction, 1 for forward, -1 for backward in time + (consult Seibert and Frank, 2004 for backward runs) + +2. Beginning date and time of simulation. Must be given in format + YYYYMMDD HHMISS, where YYYY is YEAR, MM is MONTH, DD is DAY, HH is HOUR, + MI is MINUTE and SS is SECOND. Current version utilizes UTC. + +3. Ending date and time of simulation. Same format as 3. + +4. Average concentrations are calculated every SSSSS seconds. + +5. The average concentrations are time averages of SSSSS seconds + duration. If SSSSS is 0, instantaneous concentrations are outputted. + +6. The concentrations are sampled every SSSSS seconds to calculate the time + average concentration. This period must be shorter than the averaging time. + +7. Time constant for particle splitting. Particles are split into two + after SSSSS seconds, 2xSSSSS seconds, 4xSSSSS seconds, and so on. + +8. All processes are synchronized with this time interval (lsynctime). + Therefore, all other time constants must be multiples of this value. + Output interval and time average of output must be at least twice lsynctime. + +9. CTL must be >1 for time steps shorter than the Lagrangian time scale + If CTL<0, a purely random walk simulation is done + +10.IFINE=Reduction factor for time step used for vertical wind + +11.IOUT determines how the output shall be made: concentration + (ng/m3, Bq/m3), mixing ratio (pptv), or both, or plume trajectory mode, + or concentration + plume trajectory mode. + In plume trajectory mode, output is in the form of average trajectories. + +12.IPOUT determines whether particle positions are outputted (in addition + to the gridded concentrations or mixing ratios) or not. + 0=no output, 1 output every output interval, 2 only at end of the + simulation + +13.Switch on/off subgridscale terrain parameterization (increase of + mixing heights due to subgridscale orographic variations) + +14.Switch on/off the convection parameterization + +15.Switch on/off the calculation of age spectra: if yes, the file AGECLASSES + must be available + +16. If IPIN=1, a file "partposit_end" from a previous run must be available in + the output directory. Particle positions are read in and previous simulation + is continued. If IPIN=0, no particles from a previous run are used + +17. IF IOUTPUTFOREACHRELEASE is set to 1, one output field for each location + in the RLEASE file is created. For backward calculation this should be + set to 1. For forward calculation both possibilities are applicable. + +18. If IFLUX is set to 1, fluxes of each species through each of the output + boxes are calculated. Six fluxes, corresponding to northward, southward, + eastward, westward, upward and downward are calculated for each grid cell of + the output grid. The control surfaces are placed in the middle of each + output grid cell. If IFLUX is set to 0, no fluxes are determined. + +19. If MDOMAINFILL is set to 1, the first box specified in file RELEASES is used + as the domain where domain-filling trajectory calculations are to be done. + Particles are initialized uniformly distributed (according to the air mass + distribution) in that domain at the beginning of the simulation, and are + created at the boundaries throughout the simulation period. + +20. IND_SOURCE switches between different units for concentrations at the source + NOTE that in backward simulations the release of computational particles + takes place at the "receptor" and the sampling of particles at the "source". + 1=mass units (for bwd-runs = concentration) + 2=mass mixing ratio units +21. IND_RECEPTOR switches between different units for concentrations at the receptor + 1=mass units (concentrations) + 2=mass mixing ratio units + +22. MQUASILAG indicates whether particles shall be numbered consecutively (1) or + with their release location number (0). The first option allows tracking of + individual particles using the partposit output files + +23. NESTED_OUTPUT decides whether model output shall be made also for a nested + output field (normally with higher resolution) + +24. LINIT_COND determines whether, for backward runs only, the sensitivity to initial + conditions shall be calculated and written to output files + 0=no output, 1 or 2 determines in which units the initial conditions are provided. \ No newline at end of file diff --git a/aiida_flexpart/templates/OUTGRID_NEST_ifs.j2 b/aiida_flexpart/templates/OUTGRID_NEST_ifs.j2 new file mode 100644 index 0000000..8dca87b --- /dev/null +++ b/aiida_flexpart/templates/OUTGRID_NEST_ifs.j2 @@ -0,0 +1,30 @@ +******************************************************************************** +* * +* Input file for the Lagrangian particle dispersion model FLEXPART * +* Please specify your NESTED output grid * +* Dominik Brunner: This version is for output on COSMO2 domain * +******************************************************************************** + +2. ------.---- 4X,F11.4 + {{ data.longitude_of_output_grid }} LONGITUDE OF LOWER LEFT CORNER OF OUTPUT GRID + OUTLONLEFT (left boundary of the first grid cell - not its centre) + +3. ------.---- 4X,F11.4 + {{ data.latitude_of_output_grid }} LATITUDE OF LOWER LEFT CORNER OF OUTPUT GRID + OUTLATLOWER (lower boundary of the first grid cell - not its centre) + +4. ----- 4X,I5 + {{ data.number_of_grid_points_x }} n NUMBER OF GRID POINTS IN X DIRECTION (= No. of cells + 1) + NUMXGRID + +5. ----- 4X,I5 + {{ data.number_of_grid_points_y }} NUMBER OF GRID POINTS IN Y DIRECTION (= No. of cells + 1) + NUMYGRID + +6. ------.--- 4X,F10.3 + {{ data.grid_distance_x }} GRID DISTANCE IN X DIRECTION + DXOUTLON + +7. ------.--- 4X,F10.3 + {{ data.grid_distance_y }} GRID DISTANCE IN Y DIRECTION + DYOUTLAT \ No newline at end of file diff --git a/aiida_flexpart/templates/OUTGRID_ifs.j2 b/aiida_flexpart/templates/OUTGRID_ifs.j2 new file mode 100644 index 0000000..8ce5ea3 --- /dev/null +++ b/aiida_flexpart/templates/OUTGRID_ifs.j2 @@ -0,0 +1,35 @@ +******************************************************************************** +* * +* Input file for the Lagrangian particle dispersion model FLEXPART * +* Please specify your output grid * +* * +******************************************************************************** + +2. ------.---- 4X,F11.4 + {{ data.longitude_of_output_grid }} LONGITUDE OF LOWER LEFT CORNER OF OUTPUT GRID + OUTLONLEFT (left boundary of the first grid cell - not its centre) + +3. ------.---- 4X,F11.4 + {{ data.latitude_of_output_grid }} LATITUDE OF LOWER LEFT CORNER OF OUTPUT GRID + OUTLATLOWER (lower boundary of the first grid cell - not its centre) + +4. ----- 4X,I5 + {{ data.number_of_grid_points_x }} NUMBER OF GRID POINTS IN X DIRECTION (= No. of cells + 1) + NUMXGRID + +5. ----- 4X,I5 + {{ data.number_of_grid_points_y }} NUMBER OF GRID POINTS IN Y DIRECTION (= No. of cells + 1) + NUMYGRID + +6. ------.--- 4X,F10.3 + {{ data.grid_distance_x }} GRID DISTANCE IN X DIRECTION + DXOUTLON + +7. ------.--- 4X,F10.3 + {{ data.grid_distance_y }} GRID DISTANCE IN Y DIRECTION + DYOUTLAT +{% for level in data.heights_of_levels %} +{{ loop.index + 7 }}. -----.- 4X, F7.1 + {{ level }} + LEVEL {{ loop.index }} HEIGHT OF LEVEL (UPPER BOUNDARY) +{% endfor %} \ No newline at end of file diff --git a/examples/example_ifs.py b/examples/example_ifs.py new file mode 100644 index 0000000..b0b7a23 --- /dev/null +++ b/examples/example_ifs.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import pathlib +import click +import yaml +from aiida import cmdline, engine, orm +from aiida.plugins import CalculationFactory +from aiida.common.datastructures import StashMode + +INPUT_DIR = pathlib.Path(__file__).resolve().parent / 'input_files' + +def read_yaml_data(data_filename: str, names=None) -> dict: + """Read in a YAML data file as a dictionary""" + data_path = pathlib.Path(data_filename) + with data_path.open('r', encoding='utf-8') as fp: + yaml_data = yaml.safe_load(fp) + + return {key: value + for key, value in yaml_data.items() + if key in names} if names else yaml_data + +def test_run(flexpart_code): + """Run a calculation on the localhost computer. + + Uses test helpers to create AiiDA Code on the fly. + """ + user_name="lfernand" + # Prepare input parameters + + command = orm.Dict( + dict=read_yaml_data('inputs/command.yml')) + + outgrid = orm.Dict( + dict=read_yaml_data('inputs/outgrid.yaml', names=['Europe'])['Europe']) + + outgrid_nest = orm.Dict( + dict=read_yaml_data('inputs/outgrid_nest.yaml', names=['Europe'])['Europe']) + + release_settings = orm.Dict( + dict=read_yaml_data('inputs/release.yml')) + + locations = orm.Dict( + dict=read_yaml_data('inputs/locations.yaml', names=['TEST_32', + 'TEST_200'])) + + # Links to the remote files/folders. + glc = orm.RemoteData(remote_path=f'/users/{user_name}/resources/flexpart/GLC2000', + computer=flexpart_code.computer) + species = orm.RemoteData( + remote_path=f'/users/{user_name}/resources/flexpart/SPECIES', + computer=flexpart_code.computer) + surfdata = orm.RemoteData( + remote_path=f'/users/{user_name}/resources/flexpart/surfdata.t', + computer=flexpart_code.computer) + surfdepo = orm.RemoteData( + remote_path=f'/users/{user_name}/resources/flexpart/surfdepo.t', + computer=flexpart_code.computer) + meteo_path = orm.RemoteData( + remote_path=f'/scratch/snx3000/{user_name}/FLEXPART_input/', + computer=flexpart_code.computer) + + # Set up calculation. + calc = CalculationFactory('flexpart.ifs') + builder = calc.get_builder() + builder.code = flexpart_code + builder.model_settings = { + 'release_settings': + release_settings, + 'locations': + locations, + 'command': + command, + } + builder.outgrid = outgrid + builder.outgrid_nest = outgrid_nest + builder.species = species + builder.meteo_path = meteo_path + builder.land_use = { + 'glc': glc, + 'surfdata': surfdata, + 'surfdepo': surfdepo, + } + + builder.metadata.description = 'Test job submission with the aiida_flexpart plugin' + builder.metadata.options.stash = { + 'source_list': ['aiida.out', 'grid_time_*.nc'], + 'target_base': f'/store/empa/em05/{user_name}/aiida_stash', + 'stash_mode': StashMode.COPY.value, + } + + # builder.metadata.dry_run = True + # builder.metadata.store_provenance = False + engine.run(builder) + # result = engine.submit(builder) # submit to aiida daemon + + +@click.command() +@cmdline.utils.decorators.with_dbenv() +@cmdline.params.options.CODE() +def cli(code): + """Run example. + + Example usage: $ ./example_01.py --code diff@localhost + + Alternative (creates diff@localhost-test code): $ ./example_01.py + + Help: $ ./example_01.py --help + """ + test_run(code) + + +if __name__ == '__main__': + cli() # pylint: disable=no-value-for-parameter \ No newline at end of file diff --git a/setup.json b/setup.json index 5e81226..cb33cfa 100644 --- a/setup.json +++ b/setup.json @@ -15,10 +15,12 @@ "version": "0.1.0a0", "entry_points": { "aiida.calculations": [ - "flexpart.cosmo = aiida_flexpart.calculations.flexpart_cosmo:FlexpartCosmoCalculation" + "flexpart.cosmo = aiida_flexpart.calculations.flexpart_cosmo:FlexpartCosmoCalculation", + "flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation" ], "aiida.parsers": [ - "flexpart.cosmo = aiida_flexpart.parsers.flexpart_cosmo:FlexpartCosmoParser" + "flexpart.cosmo = aiida_flexpart.parsers.flexpart_cosmo:FlexpartCosmoParser", + "flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser" ], "aiida.workflows": [ "flexpart.multi_dates = aiida_flexpart.workflows.multi_dates_workflow:FlexpartMultipleDatesWorkflow" From bf5166f6d09fecf74d787a4de11e8b315c655735 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 20 Oct 2023 09:43:28 +0000 Subject: [PATCH 02/22] new flexpart ifs, minor changes --- aiida_flexpart/parsers/flexpart_ifs.py | 2 +- examples/example_ifs.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/aiida_flexpart/parsers/flexpart_ifs.py b/aiida_flexpart/parsers/flexpart_ifs.py index a18248a..a3f83ea 100644 --- a/aiida_flexpart/parsers/flexpart_ifs.py +++ b/aiida_flexpart/parsers/flexpart_ifs.py @@ -13,7 +13,7 @@ FlexpartCalculation = CalculationFactory('flexpart.ifs') -class FlexpartCosmoParser(Parser): +class FlexpartIfsParser(Parser): """ Parser class for parsing output of calculation. """ diff --git a/examples/example_ifs.py b/examples/example_ifs.py index b0b7a23..9a32fc6 100644 --- a/examples/example_ifs.py +++ b/examples/example_ifs.py @@ -29,7 +29,7 @@ def test_run(flexpart_code): # Prepare input parameters command = orm.Dict( - dict=read_yaml_data('inputs/command.yml')) + dict=read_yaml_data('inputs/command.yaml')) outgrid = orm.Dict( dict=read_yaml_data('inputs/outgrid.yaml', names=['Europe'])['Europe']) @@ -38,14 +38,14 @@ def test_run(flexpart_code): dict=read_yaml_data('inputs/outgrid_nest.yaml', names=['Europe'])['Europe']) release_settings = orm.Dict( - dict=read_yaml_data('inputs/release.yml')) + dict=read_yaml_data('inputs/release.yaml')) locations = orm.Dict( dict=read_yaml_data('inputs/locations.yaml', names=['TEST_32', 'TEST_200'])) # Links to the remote files/folders. - glc = orm.RemoteData(remote_path=f'/users/{user_name}/resources/flexpart/GLC2000', + glc = orm.RemoteData(remote_path=f'/users/{user_name}/resources/flexpart/IGBP_int1.dat', computer=flexpart_code.computer) species = orm.RemoteData( remote_path=f'/users/{user_name}/resources/flexpart/SPECIES', @@ -57,7 +57,7 @@ def test_run(flexpart_code): remote_path=f'/users/{user_name}/resources/flexpart/surfdepo.t', computer=flexpart_code.computer) meteo_path = orm.RemoteData( - remote_path=f'/scratch/snx3000/{user_name}/FLEXPART_input/', + remote_path=f'/scratch/snx3000/{user_name}/FLEXPART_input/IFS_GL_05', computer=flexpart_code.computer) # Set up calculation. From 15a79e50869876727ade9badd478b101b07a6f3a Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 20 Oct 2023 13:48:57 +0000 Subject: [PATCH 03/22] added previous calc folder for ifs --- aiida_flexpart/calculations/flexpart_ifs.py | 13 +++++++++---- aiida_flexpart/utils.py | 8 +++++++- examples/example_ifs.py | 8 ++++++++ 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/aiida_flexpart/calculations/flexpart_ifs.py b/aiida_flexpart/calculations/flexpart_ifs.py index e491071..106006f 100644 --- a/aiida_flexpart/calculations/flexpart_ifs.py +++ b/aiida_flexpart/calculations/flexpart_ifs.py @@ -164,12 +164,17 @@ def prepare_for_submission(self, folder): 'SPECIES' )) - #_DEFAULT_PARENT_CALC_FLDR_NAME = "parent_calc/" if "parent_calc_folder" in self.inputs: + computer_uuid = self.inputs.parent_calc_folder.computer.uuid + remote_path = self.inputs.parent_calc_folder.get_remote_path() calcinfo.remote_symlink_list.append(( - self.inputs.parent_calc_folder.computer.uuid, - self.inputs.parent_calc_folder.get_remote_path(), - self._DEFAULT_PARENT_CALC_FLDR_NAME)) + computer_uuid, + remote_path+'/header', + 'header_previous')) + calcinfo.remote_symlink_list.append(( + computer_uuid, + remote_path+'/partposit_inst', + 'partposit_previous')) # Dealing with land_use input namespace. diff --git a/aiida_flexpart/utils.py b/aiida_flexpart/utils.py index 5371920..ef04241 100644 --- a/aiida_flexpart/utils.py +++ b/aiida_flexpart/utils.py @@ -176,7 +176,13 @@ def convert_input_to_namelist_entry(key, val, mapping=None): def fill_in_template_file(folder, fname, data): """Create an input file based on the standard templates.""" - with folder.open(fname, 'w') as infile: + + if 'ifs' in fname: + fname_=fname[:-4] + else: + fname_=fname + + with folder.open(fname_, 'w') as infile: template = jinja2.Template( importlib.resources.read_text('aiida_flexpart.templates', fname + '.j2')) diff --git a/examples/example_ifs.py b/examples/example_ifs.py index 9a32fc6..eb128ae 100644 --- a/examples/example_ifs.py +++ b/examples/example_ifs.py @@ -59,6 +59,11 @@ def test_run(flexpart_code): meteo_path = orm.RemoteData( remote_path=f'/scratch/snx3000/{user_name}/FLEXPART_input/IFS_GL_05', computer=flexpart_code.computer) + + #change path accordingly + previous_cosmo_calc = orm.RemoteData( + remote_path=f'/scratch/snx3000/{user_name}/aiida/9e/2c/5188-d166-4b20-a89f-b2559a83a6b1', + computer=flexpart_code.computer) # Set up calculation. calc = CalculationFactory('flexpart.ifs') @@ -82,6 +87,9 @@ def test_run(flexpart_code): 'surfdepo': surfdepo, } + #uncomment to use previous cosmo calculation + #builder.parent_calc_folder = previous_cosmo_calc + builder.metadata.description = 'Test job submission with the aiida_flexpart plugin' builder.metadata.options.stash = { 'source_list': ['aiida.out', 'grid_time_*.nc'], From 44841121425132d84d50789f4c07373f935ed38a Mon Sep 17 00:00:00 2001 From: LucR31 Date: Mon, 23 Oct 2023 13:06:30 +0000 Subject: [PATCH 04/22] combined ifs and cosmo workflow --- .../workflows/multi_dates_workflow.py | 103 ++++++++++- examples/example_workflow_combi.py | 166 ++++++++++++++++++ 2 files changed, 262 insertions(+), 7 deletions(-) create mode 100644 examples/example_workflow_combi.py diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index ac0644d..7504408 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -6,6 +6,7 @@ import datetime FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo') +FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs') def get_simulation_period(date, age_class_time, @@ -33,12 +34,27 @@ class FlexpartMultipleDatesWorkflow(engine.WorkChain): def define(cls, spec): """Specify inputs and outputs.""" super().define(spec) - # Basic Inputs + + #codes spec.input('fcosmo_code', valid_type=orm.AbstractCode) spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode) + spec.input('fifs_code', valid_type=orm.AbstractCode) + spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode) + + # Basic Inputs spec.input('simulation_dates', valid_type=orm.List, help='A list of the starting dates of the simulations') spec.input('model', valid_type=orm.Str, required=True) + spec.input('offline_integration_time', valid_type=orm.Int) + spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours') + spec.input("parent_calc_folder", + valid_type=orm.RemoteData, + required=False, + help="Working directory of a previously ran calculation to restart from." + ) + + #outline variables + spec.input('run_cosmo',valid_type=orm.Bool, required=True) #model settings spec.input('input_phy', valid_type=orm.Dict) @@ -52,11 +68,11 @@ def define(cls, spec): help='Meteo models input params.') spec.input('meteo_path', valid_type=orm.RemoteData, required=True, help='Path to the folder containing the meteorological input data.') + spec.input('meteo_path_ifs', valid_type=orm.RemoteData, + required=True, help='Path to the folder containing the meteorological input data.') spec.input('gribdir', valid_type=orm.Str, required=True) #others - spec.input('integration_time', valid_type=orm.Int, - help='Integration time in hours') spec.input('outgrid', valid_type=orm.Dict) spec.input('outgrid_nest', valid_type=orm.Dict, required=False) spec.input('species', valid_type=orm.RemoteData, required=True) @@ -65,6 +81,7 @@ def define(cls, spec): required=False, dynamic=True, help='#TODO') + spec.input_namespace('land_use_ifs', valid_type=orm.RemoteData, required=False, dynamic=True) spec.expose_inputs(FlexpartCalculation, include=['metadata.options'], @@ -77,15 +94,24 @@ def define(cls, spec): spec.outline( cls.setup, while_(cls.condition)( - if_(cls.prepare_meteo_folder)( - cls.run_simulation + if_(cls.run_cosmo)( + if_(cls.prepare_meteo_folder_cosmo)( + cls.run_cosmo_simulation ) + ).else_( + if_(cls.prepare_meteo_folder_ifs)( + cls.run_ifs_simulation + ) + ) ), cls.results, ) def condition(self): return True if self.ctx.index < len(self.ctx.simulation_dates) else False + + def run_cosmo(self): + return True if self.inputs.run_cosmo else False def setup(self): """Prepare a simulation.""" @@ -108,7 +134,25 @@ def setup(self): self.ctx.species = self.inputs.species self.ctx.land_use = self.inputs.land_use - def prepare_meteo_folder(self): + def prepare_meteo_folder_ifs(self): + e_date, s_date = get_simulation_period(self.ctx.simulation_dates[self.ctx.index], + self.inputs.integration_time.value * 3600, + self.ctx.command.get_dict()["release_duration"], + self.ctx.command.get_dict()["simulation_direction"]) + + self.report(f'prepare meteo from {s_date} to {e_date}') + + results, node = launch_shell_job( + self.inputs.check_meteo_ifs_code, + arguments=' -s {sdate} -e {edate} -a', + nodes={ + 'sdate': orm.Str(s_date), + 'edate': orm.Str(e_date), + }) + + return node.is_finished_ok + + def prepare_meteo_folder_cosmo(self): e_date, s_date = get_simulation_period(self.ctx.simulation_dates[self.ctx.index], self.inputs.integration_time.value * 3600, self.ctx.command.get_dict()["release_duration"], @@ -128,7 +172,7 @@ def prepare_meteo_folder(self): return node.is_finished_ok - def run_simulation(self): + def run_cosmo_simulation(self): """Run calculations for equation of state.""" self.report('starting flexpart cosmo') @@ -161,6 +205,51 @@ def run_simulation(self): builder.metadata.options = self.inputs.flexpart.metadata.options + # Ask the workflow to continue when the results are ready and store them in the context + running = self.submit(builder) + self.to_context(calculations=engine.append_(running)) + + self.ctx.index += 1 + + def run_ifs_simulation(self): + """Run calculations for equation of state.""" + # Set up calculation. + self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}') + builder = FlexpartIfsCalculation.get_builder() + builder.code = self.inputs.ifs_code + + #changes in the command file + new_dict = self.ctx.command.get_dict() + new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] + new_dict['dumped_particle_data'] = True + if self.ctx.offline_integration_time>0: + new_dict['age_class'] = self.ctx.offline_integration_time * 3600 + else: + new_dict['age_class'] = self.inputs.integration_time * 3600 + new_dict.update(self.inputs.meteo_inputs) + new_dict['convection_parametrization'] = 1 + + #model settings + builder.model_settings = { + 'release_settings': self.ctx.release_settings, + 'locations': self.ctx.locations, + 'command': orm.Dict(dict=new_dict), + } + + builder.outgrid = self.ctx.outgrid + builder.outgrid_nest = self.ctx.outgrid_nest + builder.species = self.ctx.species + builder.land_use = self.ctx.land_use_ifs + builder.meteo_path = self.inputs.meteo_path_ifs + + #remote folder from cosmo calc + if 'parent_calc_folder' in self.inputs: + builder.parent_calc_folder = self.ctx.parent_calc_folder + + # Walltime, memory, and resources. + builder.metadata.description = 'Test workflow to submit a flexpart calculation' + builder.metadata.options = self.inputs.flexpart.metadata.options + # Ask the workflow to continue when the results are ready and store them in the context running = self.submit(builder) self.to_context(calculations=engine.append_(running)) diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py new file mode 100644 index 0000000..a8087b8 --- /dev/null +++ b/examples/example_workflow_combi.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Run a multi dates workflow.""" + +import pathlib +import datetime +import click +import yaml +from aiida import orm, plugins, engine, cmdline +from aiida.common.datastructures import StashMode + + +def read_yaml_data(data_filename: str, names=None) -> dict: + """Read in a YAML data file as a dictionary""" + data_path = pathlib.Path(data_filename) + with data_path.open('r', encoding='utf-8') as fp: + yaml_data = yaml.safe_load(fp) + + return {key: value + for key, value in yaml_data.items() + if key in names} if names else yaml_data + + +def simulation_dates_parser(date_list: list) -> list: + """ + Parse a range of dates and returns a list of date strings. + + Examples: + 2021-01-02--2021-01-10 -> [2021-01-02 00:00:00, 2021-01-02 00:00:00, ..., 2021-01-10 00:00:00] + 2021-01-02, 2021-01-10 -> [2021-01-02 00:00:00, 2021-01-10 00:00:00] + 2021-01-02 -> [2021-01-02 00:00:00,] + """ + dates = [] + for date_string in date_list: + if ',' in date_string: + dates += [ + date.strip() + ' 00:00:00' for date in date_string.split(',') + ] + elif '--' in date_string: + date_start, date_end = list( + map(lambda date: datetime.datetime.strptime(date, '%Y-%m-%d'), + date_string.split('--'))) + dates += [ + date.strftime('%Y-%m-%d 00:00:00') for date in [ + date_start + datetime.timedelta(days=x) + for x in range(0, (date_end - date_start).days + 1) + ] + ] + else: + dates += [date_string.strip() + ' 00:00:00'] + + return orm.List(list=dates) + + +def test_run(flexpart_code): + """Run workflow.""" + + simulation_dates = simulation_dates_parser(['2021-01-02']) + username='lfernand' + users_address=f'/users/{username}/resources/flexpart/' + scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' + + + # Links to the remote files/folders. + glc = orm.RemoteData(remote_path = users_address+'GLC2000', + computer=flexpart_code.computer) + glc_ifs = orm.RemoteData(remote_path = users_address+'IGBP_int1.dat', + computer=flexpart_code.computer) + species = orm.RemoteData( + remote_path = users_address+'SPECIES', + computer=flexpart_code.computer) + surfdata = orm.RemoteData( + remote_path = users_address+'surfdata.t', + computer=flexpart_code.computer) + surfdepo = orm.RemoteData( + remote_path = users_address+'surfdepo.t', + computer=flexpart_code.computer) + meteo_path = orm.RemoteData( + remote_path=scratch_address+'cosmo7/', + computer = flexpart_code.computer) + meteo_path_ifs = orm.RemoteData( + remote_path = scratch_address+'IFS_GL_05', + computer=flexpart_code.computer) + + #builder starts + workflow = plugins.WorkflowFactory('flexpart.multi_dates') + builder = workflow.get_builder() + builder.fcosmo_code = flexpart_code + builder.fifs_code = orm.load_code('flexpart_ifs@daint') + builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') + builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') + + #basic settings + builder.simulation_dates = simulation_dates + builder.integration_time = orm.Int(24) + builder.run_cosmo = orm.Bool(False) + builder.offline_integration_time = orm.Int(0) + + #meteo realted settings + builder.model=orm.Str('cosmo7') + builder.meteo_path = meteo_path + builder.meteo_path_ifs = meteo_path_ifs + builder.gribdir=orm.Str(scratch_address) + builder.meteo_inputs = orm.Dict( + dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ + 'cosmo7', + ])['cosmo7']) + + #model settings + builder.command = orm.Dict( + dict=read_yaml_data('inputs/command.yaml')) #simulation date will be overwritten + builder.input_phy = orm.Dict( + dict=read_yaml_data('inputs/input_phy.yaml')) + builder.locations = orm.Dict( + dict=read_yaml_data('inputs/locations.yaml', names=[ + 'TEST_32', + ])) + builder.release_settings = orm.Dict( + dict=read_yaml_data('inputs/release.yaml')) + + #other + builder.outgrid = orm.Dict( + dict=read_yaml_data('inputs/outgrid.yaml', names=[ + 'Europe', + ])['Europe']) + builder.outgrid_nest = orm.Dict(dict=read_yaml_data( + 'inputs/outgrid_nest.yaml', names=[ + 'Europe', + ])['Europe']) + builder.species = species + builder.land_use = { + 'glc': glc, + 'surfdata': surfdata, + 'surfdepo': surfdepo, + } + builder.land_use_ifs = { + 'glc': glc_ifs, + 'surfdata': surfdata, + 'surfdepo': surfdepo, + } + + builder.flexpart.metadata.options.stash = { + 'source_list': ['aiida.out', 'grid_time_*.nc'], + 'target_base': f'/store/empa/em05/{username}/aiida_stash', + 'stash_mode': StashMode.COPY.value, + } + engine.run(builder) + + +@click.command() +@cmdline.utils.decorators.with_dbenv() +@cmdline.params.options.CODE() +def cli(code): + """Run example. + + Example usage: $ ./example_01.py --code diff@localhost + + Alternative (creates diff@localhost-test code): $ ./example_01.py + + Help: $ ./example_01.py --help + """ + test_run(code) + + +if __name__ == '__main__': + cli() # pylint: disable=no-value-for-parameter \ No newline at end of file From 9cc83854e1ab23133b276d9e1b0aa941320e45c3 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 27 Oct 2023 13:35:02 +0000 Subject: [PATCH 05/22] combined workflow --- .../workflows/multi_dates_workflow.py | 28 +++++++++++++++---- examples/example_ifs.py | 1 + examples/example_workflow_combi.py | 7 +++-- examples/inputs/command.yaml | 2 +- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 7504408..36e6415 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -86,6 +86,9 @@ def define(cls, spec): spec.expose_inputs(FlexpartCalculation, include=['metadata.options'], namespace='flexpart') + spec.expose_inputs(FlexpartIfsCalculation, + include=['metadata.options'], + namespace='flexpart') # Outputs #spec.output('output_file', valid_type=orm.SinglefileData) @@ -121,6 +124,7 @@ def setup(self): self.ctx.index = 0 self.ctx.simulation_dates = self.inputs.simulation_dates self.ctx.integration_time = self.inputs.integration_time + self.ctx.offline_integration_time = self.inputs.offline_integration_time #model settings self.ctx.release_settings = self.inputs.release_settings @@ -150,7 +154,13 @@ def prepare_meteo_folder_ifs(self): 'edate': orm.Str(e_date), }) - return node.is_finished_ok + if node.is_finished_ok: + self.report('meteo files transferred successfully') + return True + else: + self.report('failed to transfer meteo files') + self.ctx.index += 1 + return False def prepare_meteo_folder_cosmo(self): e_date, s_date = get_simulation_period(self.ctx.simulation_dates[self.ctx.index], @@ -170,7 +180,13 @@ def prepare_meteo_folder_cosmo(self): 'model': self.inputs.model }) - return node.is_finished_ok + if node.is_finished_ok: + self.report('meteo files transferred successfully') + return True + else: + self.report('failed to transfer meteo files') + self.ctx.index += 1 + return False def run_cosmo_simulation(self): """Run calculations for equation of state.""" @@ -216,14 +232,14 @@ def run_ifs_simulation(self): # Set up calculation. self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartIfsCalculation.get_builder() - builder.code = self.inputs.ifs_code + builder.code = self.inputs.fifs_code #changes in the command file new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - new_dict['dumped_particle_data'] = True if self.ctx.offline_integration_time>0: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 + new_dict['dumped_particle_data'] = True else: new_dict['age_class'] = self.inputs.integration_time * 3600 new_dict.update(self.inputs.meteo_inputs) @@ -239,12 +255,12 @@ def run_ifs_simulation(self): builder.outgrid = self.ctx.outgrid builder.outgrid_nest = self.ctx.outgrid_nest builder.species = self.ctx.species - builder.land_use = self.ctx.land_use_ifs + builder.land_use = self.inputs.land_use_ifs builder.meteo_path = self.inputs.meteo_path_ifs #remote folder from cosmo calc if 'parent_calc_folder' in self.inputs: - builder.parent_calc_folder = self.ctx.parent_calc_folder + builder.parent_calc_folder = self.inputs.parent_calc_folder # Walltime, memory, and resources. builder.metadata.description = 'Test workflow to submit a flexpart calculation' diff --git a/examples/example_ifs.py b/examples/example_ifs.py index eb128ae..b759f85 100644 --- a/examples/example_ifs.py +++ b/examples/example_ifs.py @@ -96,6 +96,7 @@ def test_run(flexpart_code): 'target_base': f'/store/empa/em05/{user_name}/aiida_stash', 'stash_mode': StashMode.COPY.value, } + #builder.metadata.options.max_wallclock_seconds = 2000 # builder.metadata.dry_run = True # builder.metadata.store_provenance = False diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index a8087b8..c36aaba 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -55,7 +55,7 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-02']) + simulation_dates = simulation_dates_parser(['2021-01-03']) username='lfernand' users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' @@ -81,6 +81,9 @@ def test_run(flexpart_code): meteo_path_ifs = orm.RemoteData( remote_path = scratch_address+'IFS_GL_05', computer=flexpart_code.computer) + parent_folder = orm.RemoteData( + remote_path = scratch_address+'/', + computer=flexpart_code.computer) #builder starts workflow = plugins.WorkflowFactory('flexpart.multi_dates') @@ -138,7 +141,7 @@ def test_run(flexpart_code): 'surfdata': surfdata, 'surfdepo': surfdepo, } - + #builder.parent_calc_folder = parent_folder builder.flexpart.metadata.options.stash = { 'source_list': ['aiida.out', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', diff --git a/examples/inputs/command.yaml b/examples/inputs/command.yaml index a29c2ab..44f74df 100644 --- a/examples/inputs/command.yaml +++ b/examples/inputs/command.yaml @@ -15,7 +15,7 @@ concentration_output: 9 # Determines how the output shall be made: concentratio # In plume trajectory mode, output is in the form of average trajectories. particle_dump: 4 # Particle dump: 0 no, 1 every output interval, 2 only at end, 4 when leaving domain. subgrid_terrain_effect_parameterization: True # Include ubgrid terrain effect parameterization. -convection_parametrization: 2 # Convection: 2 tiedtke, 1 emanuel, 0 no. +convection_parametrization: 1 # Convection: 2 tiedtke, 1 emanuel, 0 no. age_spectra: True # Switch on/off the calculation of age spectra: if yes, the file AGECLASSES must be available. dumped_particle_data: False # Continue simulation with dumped particle data. output_for_each_release: True # Create an ouput file for each release location. From 3be10fef8afbd902465b4655144e14656e6fb088 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 27 Oct 2023 14:02:41 +0000 Subject: [PATCH 06/22] minor change --- aiida_flexpart/workflows/multi_dates_workflow.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 36e6415..1cc251e 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -139,8 +139,11 @@ def setup(self): self.ctx.land_use = self.inputs.land_use def prepare_meteo_folder_ifs(self): + age_class_ = self.inputs.integration_time.value * 3600 + if self.ctx.offline_integration_time>0: + age_class_ = self.inputs.offline_integration_time.value * 3600 e_date, s_date = get_simulation_period(self.ctx.simulation_dates[self.ctx.index], - self.inputs.integration_time.value * 3600, + age_class_, self.ctx.command.get_dict()["release_duration"], self.ctx.command.get_dict()["simulation_direction"]) From 94868e89594b43ddeff5286aa79d3ad39bbbed65 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 27 Oct 2023 14:33:42 +0000 Subject: [PATCH 07/22] output_file problem fixed --- aiida_flexpart/parsers/flexpart_cosmo.py | 2 ++ aiida_flexpart/parsers/flexpart_ifs.py | 4 +++- aiida_flexpart/workflows/multi_dates_workflow.py | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/aiida_flexpart/parsers/flexpart_cosmo.py b/aiida_flexpart/parsers/flexpart_cosmo.py index c9a4653..6310c5b 100644 --- a/aiida_flexpart/parsers/flexpart_cosmo.py +++ b/aiida_flexpart/parsers/flexpart_cosmo.py @@ -49,8 +49,10 @@ def parse(self, **kwargs): # check aiida.out content with self.retrieved.open(output_filename, 'r') as handle: + output_node = SinglefileData(file=handle) content=handle.read() if 'CONGRATULATIONS' not in content: + self.out('output_file', output_node) return ExitCode(1) # add output file self.logger.info("Parsing '{}'".format(output_filename)) diff --git a/aiida_flexpart/parsers/flexpart_ifs.py b/aiida_flexpart/parsers/flexpart_ifs.py index a3f83ea..71e7fd0 100644 --- a/aiida_flexpart/parsers/flexpart_ifs.py +++ b/aiida_flexpart/parsers/flexpart_ifs.py @@ -49,8 +49,10 @@ def parse(self, **kwargs): # check aiida.out content with self.retrieved.open(output_filename, 'r') as handle: - content=handle.read() + content = handle.read() + output_node = SinglefileData(file=handle) if 'CONGRATULATIONS' not in content: + self.out('output_file', output_node) return ExitCode(1) # add output file self.logger.info("Parsing '{}'".format(output_filename)) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 1cc251e..86940c1 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -88,7 +88,7 @@ def define(cls, spec): namespace='flexpart') spec.expose_inputs(FlexpartIfsCalculation, include=['metadata.options'], - namespace='flexpart') + namespace='flexpartifs') # Outputs #spec.output('output_file', valid_type=orm.SinglefileData) @@ -267,7 +267,7 @@ def run_ifs_simulation(self): # Walltime, memory, and resources. builder.metadata.description = 'Test workflow to submit a flexpart calculation' - builder.metadata.options = self.inputs.flexpart.metadata.options + builder.metadata.options = self.inputs.flexpartifs.metadata.options # Ask the workflow to continue when the results are ready and store them in the context running = self.submit(builder) From 649878a518c63f671ff586816dde2a96e6a82388 Mon Sep 17 00:00:00 2001 From: ayanu Date: Fri, 3 Nov 2023 13:52:01 +0000 Subject: [PATCH 08/22] Add various input meteo settings --- examples/inputs/meteo_inputs.yaml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/examples/inputs/meteo_inputs.yaml b/examples/inputs/meteo_inputs.yaml index 18c8376..a228063 100644 --- a/examples/inputs/meteo_inputs.yaml +++ b/examples/inputs/meteo_inputs.yaml @@ -6,3 +6,23 @@ cosmo1: sampling_rate_of_output: 10 synchronisation_interval: 10 convection_parametrization: 0 +kenda1: + sampling_rate_of_output: 10 + synchronisation_interval: 10 + convection_parametrization: 0 +IFS_GL_1: + sampling_rate_of_output: 600 + synchronisation_interval: 600 + convection_parametrization: 1 +IFS_GL_05: + sampling_rate_of_output: 300 + synchronisation_interval: 300 + convection_parametrization: 1 +IFS_EU_02: + sampling_rate_of_output: 180 + synchronisation_interval: 180 + convection_parametrization: 1 +IFS_EU_01: + sampling_rate_of_output: 120 + synchronisation_interval: 120 + convection_parametrization: 1 From dcc9fdf43dbe7b96b910dff5ff41df3a9ac98523 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Mon, 6 Nov 2023 14:02:10 +0000 Subject: [PATCH 09/22] updated combined workflow --- .../workflows/multi_dates_workflow.py | 65 +++++++++++++------ examples/example_workflow_combi.py | 44 ++++++++----- 2 files changed, 74 insertions(+), 35 deletions(-) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 86940c1..38dac5e 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -8,6 +8,10 @@ FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo') FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs') +#possible models +cosmo_models = ['cosmo7', 'cosmo1', 'kenda1'] +ECMWF_models = ['IFS_GL_05', 'IFS_GL_1', 'IFS_EU_02', 'IFS_EU_01'] + def get_simulation_period(date, age_class_time, release_duration, @@ -45,6 +49,7 @@ def define(cls, spec): spec.input('simulation_dates', valid_type=orm.List, help='A list of the starting dates of the simulations') spec.input('model', valid_type=orm.Str, required=True) + spec.input('model_offline', valid_type=orm.Str, required=True) spec.input('offline_integration_time', valid_type=orm.Int) spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours') spec.input("parent_calc_folder", @@ -53,9 +58,6 @@ def define(cls, spec): help="Working directory of a previously ran calculation to restart from." ) - #outline variables - spec.input('run_cosmo',valid_type=orm.Bool, required=True) - #model settings spec.input('input_phy', valid_type=orm.Dict) spec.input('command', valid_type=orm.Dict) @@ -66,10 +68,12 @@ def define(cls, spec): #meteo related inputs spec.input('meteo_inputs', valid_type=orm.Dict, help='Meteo models input params.') + spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False, + help='Meteo models input params.') spec.input('meteo_path', valid_type=orm.RemoteData, required=True, help='Path to the folder containing the meteorological input data.') - spec.input('meteo_path_ifs', valid_type=orm.RemoteData, - required=True, help='Path to the folder containing the meteorological input data.') + spec.input('meteo_path_offline', valid_type=orm.RemoteData, + required=False, help='Path to the folder containing the meteorological input data.') spec.input('gribdir', valid_type=orm.Str, required=True) #others @@ -101,7 +105,13 @@ def define(cls, spec): if_(cls.prepare_meteo_folder_cosmo)( cls.run_cosmo_simulation ) - ).else_( + ), + if_(cls.run_ifs)( + if_(cls.prepare_meteo_folder_ifs)( + cls.run_ifs_simulation + ) + ), + if_(cls.run_offline)( if_(cls.prepare_meteo_folder_ifs)( cls.run_ifs_simulation ) @@ -114,7 +124,18 @@ def condition(self): return True if self.ctx.index < len(self.ctx.simulation_dates) else False def run_cosmo(self): - return True if self.inputs.run_cosmo else False + return True if self.inputs.model in cosmo_models else False + + def run_ifs(self): + return True if self.inputs.model in ECMWF_models else False + + def run_offline(self): + if self.inputs.model_offline in ECMWF_models and self.inputs.model is not None: + self.ctx.index-=1 + return True + elif self.inputs.model_offline in ECMWF_models and self.inputs.model is None: + return True + return False def setup(self): """Prepare a simulation.""" @@ -227,7 +248,7 @@ def run_cosmo_simulation(self): # Ask the workflow to continue when the results are ready and store them in the context running = self.submit(builder) self.to_context(calculations=engine.append_(running)) - + self.ctx.index += 1 def run_ifs_simulation(self): @@ -236,17 +257,28 @@ def run_ifs_simulation(self): self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartIfsCalculation.get_builder() builder.code = self.inputs.fifs_code - + #changes in the command file new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - if self.ctx.offline_integration_time>0: + + if self.inputs.model_offline in ECMWF_models: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 new_dict['dumped_particle_data'] = True + + if self.inputs.model is not None: + self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder + self.report(f'starting from: {self.ctx.parent_calc_folder}') + else: + self.ctx.parent_calc_folder = self.inputs.parent_calc_folder + + builder.meteo_path = self.inputs.meteo_path_offline + builder.parent_calc_folder = self.ctx.parent_calc_folder + new_dict.update(self.inputs.meteo_inputs_offline) else: new_dict['age_class'] = self.inputs.integration_time * 3600 - new_dict.update(self.inputs.meteo_inputs) - new_dict['convection_parametrization'] = 1 + builder.meteo_path = self.inputs.meteo_path + new_dict.update(self.inputs.meteo_inputs) #model settings builder.model_settings = { @@ -254,17 +286,12 @@ def run_ifs_simulation(self): 'locations': self.ctx.locations, 'command': orm.Dict(dict=new_dict), } - + builder.outgrid = self.ctx.outgrid builder.outgrid_nest = self.ctx.outgrid_nest builder.species = self.ctx.species builder.land_use = self.inputs.land_use_ifs - builder.meteo_path = self.inputs.meteo_path_ifs - - #remote folder from cosmo calc - if 'parent_calc_folder' in self.inputs: - builder.parent_calc_folder = self.inputs.parent_calc_folder - + # Walltime, memory, and resources. builder.metadata.description = 'Test workflow to submit a flexpart calculation' builder.metadata.options = self.inputs.flexpartifs.metadata.options diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index c36aaba..b6e0a7e 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -55,7 +55,9 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-03']) + simulation_dates = simulation_dates_parser(['2021-01-07, 2021-01-08']) + model = 'cosmo7' + model_offline = 'IFS_GL_05' username='lfernand' users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' @@ -75,14 +77,9 @@ def test_run(flexpart_code): surfdepo = orm.RemoteData( remote_path = users_address+'surfdepo.t', computer=flexpart_code.computer) - meteo_path = orm.RemoteData( - remote_path=scratch_address+'cosmo7/', - computer = flexpart_code.computer) - meteo_path_ifs = orm.RemoteData( - remote_path = scratch_address+'IFS_GL_05', - computer=flexpart_code.computer) + #parent_folder = orm.load_node(pk previous tsk) parent_folder = orm.RemoteData( - remote_path = scratch_address+'/', + remote_path = '/scratch/snx3000/lfernand/aiida/76/8d/cb2c-2fc6-46c4-b609-1d33fce0f60c', computer=flexpart_code.computer) #builder starts @@ -96,18 +93,33 @@ def test_run(flexpart_code): #basic settings builder.simulation_dates = simulation_dates builder.integration_time = orm.Int(24) - builder.run_cosmo = orm.Bool(False) - builder.offline_integration_time = orm.Int(0) + builder.offline_integration_time = orm.Int(48) #meteo realted settings - builder.model=orm.Str('cosmo7') + builder.model = orm.Str(model) + builder.model_offline = orm.Str(model_offline) + + meteo_path = orm.RemoteData( + remote_path=scratch_address+model+'/', + computer = flexpart_code.computer) builder.meteo_path = meteo_path - builder.meteo_path_ifs = meteo_path_ifs - builder.gribdir=orm.Str(scratch_address) builder.meteo_inputs = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - 'cosmo7', - ])['cosmo7']) + model, + ])[model]) + + if model_offline is not None: + meteo_path_offline = orm.RemoteData( + remote_path = scratch_address+model_offline, + computer=flexpart_code.computer) + builder.meteo_path_offline = meteo_path_offline + builder.meteo_inputs_offline = orm.Dict( + dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ + model_offline, + ])[model_offline]) + + builder.gribdir=orm.Str(scratch_address) + #model settings builder.command = orm.Dict( @@ -141,7 +153,7 @@ def test_run(flexpart_code): 'surfdata': surfdata, 'surfdepo': surfdepo, } - #builder.parent_calc_folder = parent_folder + builder.parent_calc_folder = parent_folder builder.flexpart.metadata.options.stash = { 'source_list': ['aiida.out', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', From 0bc5c6f33caa480daaad9b06f2d7661ceddf5533 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Tue, 7 Nov 2023 10:36:50 +0000 Subject: [PATCH 10/22] minor improvements --- .../workflows/multi_dates_workflow.py | 13 ++++++----- examples/example_workflow_combi.py | 23 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 38dac5e..0d06918 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -66,12 +66,12 @@ def define(cls, spec): help='Dictionary of locations properties.') #meteo related inputs - spec.input('meteo_inputs', valid_type=orm.Dict, + spec.input('meteo_inputs', valid_type=orm.Dict,required=False, help='Meteo models input params.') spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False, help='Meteo models input params.') spec.input('meteo_path', valid_type=orm.RemoteData, - required=True, help='Path to the folder containing the meteorological input data.') + required=False, help='Path to the folder containing the meteorological input data.') spec.input('meteo_path_offline', valid_type=orm.RemoteData, required=False, help='Path to the folder containing the meteorological input data.') spec.input('gribdir', valid_type=orm.Str, required=True) @@ -130,10 +130,10 @@ def run_ifs(self): return True if self.inputs.model in ECMWF_models else False def run_offline(self): - if self.inputs.model_offline in ECMWF_models and self.inputs.model is not None: + if self.inputs.model_offline in ECMWF_models and self.inputs.model != '': self.ctx.index-=1 return True - elif self.inputs.model_offline in ECMWF_models and self.inputs.model is None: + elif self.inputs.model_offline in ECMWF_models and self.inputs.model == '': return True return False @@ -141,6 +141,7 @@ def setup(self): """Prepare a simulation.""" self.report(f'starting setup') + self.report(f'model: {self.inputs.model.value}, model_offline: {self.inputs.model_offline.value}') self.ctx.index = 0 self.ctx.simulation_dates = self.inputs.simulation_dates @@ -261,12 +262,12 @@ def run_ifs_simulation(self): #changes in the command file new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - + if self.inputs.model_offline in ECMWF_models: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 new_dict['dumped_particle_data'] = True - if self.inputs.model is not None: + if self.inputs.model != '': self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder self.report(f'starting from: {self.ctx.parent_calc_folder}') else: diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index b6e0a7e..9496def 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -55,8 +55,8 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-07, 2021-01-08']) - model = 'cosmo7' + simulation_dates = simulation_dates_parser(['2021-01-07,2021-01-08']) + model = None model_offline = 'IFS_GL_05' username='lfernand' users_address=f'/users/{username}/resources/flexpart/' @@ -98,15 +98,16 @@ def test_run(flexpart_code): #meteo realted settings builder.model = orm.Str(model) builder.model_offline = orm.Str(model_offline) - - meteo_path = orm.RemoteData( - remote_path=scratch_address+model+'/', - computer = flexpart_code.computer) - builder.meteo_path = meteo_path - builder.meteo_inputs = orm.Dict( - dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - model, - ])[model]) + + if model is not None: + meteo_path = orm.RemoteData( + remote_path=scratch_address+model+'/', + computer = flexpart_code.computer) + builder.meteo_path = meteo_path + builder.meteo_inputs = orm.Dict( + dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ + model, + ])[model]) if model_offline is not None: meteo_path_offline = orm.RemoteData( From 632cbbfa73e18d800f0e02e08c5d5e37104a2c07 Mon Sep 17 00:00:00 2001 From: ayanu Date: Wed, 8 Nov 2023 16:59:49 +0000 Subject: [PATCH 11/22] Change parser such that FLEPXART stdout is interpreted correctly --- aiida_flexpart/parsers/flexpart_cosmo.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aiida_flexpart/parsers/flexpart_cosmo.py b/aiida_flexpart/parsers/flexpart_cosmo.py index 6310c5b..ad4dfa4 100644 --- a/aiida_flexpart/parsers/flexpart_cosmo.py +++ b/aiida_flexpart/parsers/flexpart_cosmo.py @@ -49,11 +49,12 @@ def parse(self, **kwargs): # check aiida.out content with self.retrieved.open(output_filename, 'r') as handle: - output_node = SinglefileData(file=handle) content=handle.read() + output_node = SinglefileData(file=handle) if 'CONGRATULATIONS' not in content: self.out('output_file', output_node) return ExitCode(1) + # add output file self.logger.info("Parsing '{}'".format(output_filename)) with self.retrieved.open(output_filename, 'rb') as handle: From 7064e8574c6e76cfdd30535a34cc8e1a6826fb83 Mon Sep 17 00:00:00 2001 From: ayanu Date: Wed, 8 Nov 2023 17:02:38 +0000 Subject: [PATCH 12/22] Rearrange some inputs for workflow --- examples/example_workflow_combi.py | 42 +++++++++++++++++------------- examples/inputs/outgrid.yaml | 14 ++++++++++ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 9496def..725b1d9 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -55,10 +55,15 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-07,2021-01-08']) - model = None + simulation_dates = simulation_dates_parser(['2021-07-01']) + model = 'kenda1' model_offline = 'IFS_GL_05' - username='lfernand' + username='shenne' + outgrid_main = 'Europe' + outgrid_nest = 'Switzerland' + integration_time = 24 + integration_time_offline = 48 + users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' @@ -77,23 +82,25 @@ def test_run(flexpart_code): surfdepo = orm.RemoteData( remote_path = users_address+'surfdepo.t', computer=flexpart_code.computer) - #parent_folder = orm.load_node(pk previous tsk) - parent_folder = orm.RemoteData( - remote_path = '/scratch/snx3000/lfernand/aiida/76/8d/cb2c-2fc6-46c4-b609-1d33fce0f60c', - computer=flexpart_code.computer) + + # parent_folder = orm.load_node(pk previous tsk) + # parent_folder = orm.RemoteData( + # remote_path = '/scratch/snx3000/lfernand/aiida/76/8d/cb2c-2fc6-46c4-b609-1d33fce0f60c', + # computer=flexpart_code.computer) + parent_folder = None #builder starts workflow = plugins.WorkflowFactory('flexpart.multi_dates') builder = workflow.get_builder() builder.fcosmo_code = flexpart_code builder.fifs_code = orm.load_code('flexpart_ifs@daint') - builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') - builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') + builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct') + builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct') #basic settings builder.simulation_dates = simulation_dates - builder.integration_time = orm.Int(24) - builder.offline_integration_time = orm.Int(48) + builder.integration_time = orm.Int(integration_time) + builder.offline_integration_time = orm.Int(integration_time_offline) #meteo realted settings builder.model = orm.Str(model) @@ -121,7 +128,6 @@ def test_run(flexpart_code): builder.gribdir=orm.Str(scratch_address) - #model settings builder.command = orm.Dict( dict=read_yaml_data('inputs/command.yaml')) #simulation date will be overwritten @@ -137,12 +143,12 @@ def test_run(flexpart_code): #other builder.outgrid = orm.Dict( dict=read_yaml_data('inputs/outgrid.yaml', names=[ - 'Europe', - ])['Europe']) + outgrid_main, + ])[outgrid_main]) builder.outgrid_nest = orm.Dict(dict=read_yaml_data( - 'inputs/outgrid_nest.yaml', names=[ - 'Europe', - ])['Europe']) + 'inputs/outgrid.yaml', names=[ + outgrid_nest, + ])[outgrid_nest]) builder.species = species builder.land_use = { 'glc': glc, @@ -156,7 +162,7 @@ def test_run(flexpart_code): } builder.parent_calc_folder = parent_folder builder.flexpart.metadata.options.stash = { - 'source_list': ['aiida.out', 'grid_time_*.nc'], + 'source_list': ['aiida.out', 'header*', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } diff --git a/examples/inputs/outgrid.yaml b/examples/inputs/outgrid.yaml index 80a37aa..c254b94 100644 --- a/examples/inputs/outgrid.yaml +++ b/examples/inputs/outgrid.yaml @@ -12,3 +12,17 @@ Europe: - 200.0 - 500.0 - 15000.0 +Switzerland: + output_grid_type: 0 # 1 for coos provided in rotated system 0 for geographical. + longitude_of_output_grid: 4.96 # Longitude of lower left corner of output grid (left boundary of the first grid cell - not its centre). + latitude_of_output_grid: 45.48 # Latitude of lower left corner of output grid (lower boundary of the first grid cell - not its centre). + number_of_grid_points_x: 305 # Number of grid points in x direction (= # of cells + 1). + number_of_grid_points_y: 205 # Number of grid points in y direction (= # of cells + 1). + grid_distance_x: 0.02 # Grid distance in x direction. + grid_distance_y: 0.015 # Grid distance in y direction. + heights_of_levels: # List of heights of leves (upper boundary). + - 50.0 + - 100.0 + - 200.0 + - 500.0 + - 15000.0 From a2b412036663977809184c3aaa9d89d14cf2261e Mon Sep 17 00:00:00 2001 From: ayanu Date: Wed, 8 Nov 2023 17:03:45 +0000 Subject: [PATCH 13/22] Add submission of model_offline to prepare_meteo_folder_ifs --- aiida_flexpart/workflows/multi_dates_workflow.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 0d06918..bd8e855 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -173,10 +173,12 @@ def prepare_meteo_folder_ifs(self): results, node = launch_shell_job( self.inputs.check_meteo_ifs_code, - arguments=' -s {sdate} -e {edate} -a', + arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', nodes={ 'sdate': orm.Str(s_date), 'edate': orm.Str(e_date), + 'gribdir': self.inputs.gribdir, + 'model' : self.inputs.model_offline }) if node.is_finished_ok: From 432efe9f566bb5fa0e9eb6b84f3b38c1a2823ebb Mon Sep 17 00:00:00 2001 From: ayanu Date: Wed, 8 Nov 2023 17:08:53 +0000 Subject: [PATCH 14/22] Delete outgrid_nest.yaml All outgrids now defined in outgrid.yaml --- examples/inputs/outgrid_nest.yaml | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 examples/inputs/outgrid_nest.yaml diff --git a/examples/inputs/outgrid_nest.yaml b/examples/inputs/outgrid_nest.yaml deleted file mode 100644 index 27ea86f..0000000 --- a/examples/inputs/outgrid_nest.yaml +++ /dev/null @@ -1,8 +0,0 @@ -Europe: - output_grid_type: 0 # 1 for coos provided in rotated system 0 for geographical. - longitude_of_output_grid: 4.96 # Longitude of lower left corner of output grid (left boundary of the first grid cell - not its centre). - latitude_of_output_grid: 45.48 # Latitude of lower left corner of output grid (lower boundary of the first grid cell - not its centre). - number_of_grid_points_x: 305 # Number of grid points in x direction (= # of cells + 1). - number_of_grid_points_y: 205 # Number of grid points in y direction (= # of cells + 1). - grid_distance_x: 0.02 # Grid distance in x direction. - grid_distance_y: 0.015 # Grid distance in y direction. From c86986c7e9b8ace72ebfdeadcb4dc5e37851bcfe Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 10 Nov 2023 13:57:43 +0000 Subject: [PATCH 15/22] location groups added --- aiida_flexpart/calculations/flexpart_ifs.py | 6 ++--- aiida_flexpart/parsers/flexpart_cosmo.py | 2 +- examples/example_workflow_combi.py | 28 ++++++++++++++++----- examples/inputs/location_groups.yaml | 2 ++ 4 files changed, 28 insertions(+), 10 deletions(-) create mode 100644 examples/inputs/location_groups.yaml diff --git a/aiida_flexpart/calculations/flexpart_ifs.py b/aiida_flexpart/calculations/flexpart_ifs.py index 106006f..2cb0896 100644 --- a/aiida_flexpart/calculations/flexpart_ifs.py +++ b/aiida_flexpart/calculations/flexpart_ifs.py @@ -143,9 +143,6 @@ def prepare_for_submission(self, folder): # Fill in the AGECLASSES file. fill_in_template_file(folder, 'AGECLASSES', int(age_class_time.total_seconds())) - # Fill in the COMMAND file. - fill_in_template_file(folder, 'COMMAND_ifs', command_dict) - # Fill in the OUTGRID_NEST file if the corresponding dictionary is present. if 'outgrid_nest' in self.inputs: command_dict['nested_output'] = True @@ -153,6 +150,9 @@ def prepare_for_submission(self, folder): else: command_dict['nested_output'] = False + # Fill in the COMMAND file. + fill_in_template_file(folder, 'COMMAND_ifs', command_dict) + # Fill in the OUTGRID file. fill_in_template_file(folder, 'OUTGRID_ifs', self.inputs.outgrid.get_dict()) diff --git a/aiida_flexpart/parsers/flexpart_cosmo.py b/aiida_flexpart/parsers/flexpart_cosmo.py index 6310c5b..e9010d3 100644 --- a/aiida_flexpart/parsers/flexpart_cosmo.py +++ b/aiida_flexpart/parsers/flexpart_cosmo.py @@ -49,8 +49,8 @@ def parse(self, **kwargs): # check aiida.out content with self.retrieved.open(output_filename, 'r') as handle: - output_node = SinglefileData(file=handle) content=handle.read() + output_node = SinglefileData(file=handle) if 'CONGRATULATIONS' not in content: self.out('output_file', output_node) return ExitCode(1) diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 9496def..6d6a05d 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -20,6 +20,13 @@ def read_yaml_data(data_filename: str, names=None) -> dict: for key, value in yaml_data.items() if key in names} if names else yaml_data +def make_locations_list(list_locations): + list_locations_ = read_yaml_data('inputs/location_groups.yaml',names=list_locations) + list_=[] + if list_locations_: + for i,j in list_locations_.items(): + list_+=j + return sorted(set(list_locations+list_)) def simulation_dates_parser(date_list: list) -> list: """ @@ -55,13 +62,16 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-07,2021-01-08']) - model = None + simulation_dates = simulation_dates_parser(['2021-01-09']) + model = 'cosmo7' model_offline = 'IFS_GL_05' username='lfernand' users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' + #list of locations and/or groups of locations + list_locations = ['TEST_32'] + # Links to the remote files/folders. glc = orm.RemoteData(remote_path = users_address+'GLC2000', @@ -127,10 +137,10 @@ def test_run(flexpart_code): dict=read_yaml_data('inputs/command.yaml')) #simulation date will be overwritten builder.input_phy = orm.Dict( dict=read_yaml_data('inputs/input_phy.yaml')) + builder.locations = orm.Dict( - dict=read_yaml_data('inputs/locations.yaml', names=[ - 'TEST_32', - ])) + dict=read_yaml_data('inputs/locations.yaml', + names = make_locations_list(list_locations))) builder.release_settings = orm.Dict( dict=read_yaml_data('inputs/release.yaml')) @@ -155,8 +165,14 @@ def test_run(flexpart_code): 'surfdepo': surfdepo, } builder.parent_calc_folder = parent_folder + builder.flexpart.metadata.options.stash = { - 'source_list': ['aiida.out', 'grid_time_*.nc'], + 'source_list': ['aiida.out','header*','partposit_inst', 'grid_time_*.nc'], + 'target_base': f'/store/empa/em05/{username}/aiida_stash', + 'stash_mode': StashMode.COPY.value, + } + builder.flexpartifs.metadata.options.stash = { + 'source_list': ['aiida.out','header*','partposit_inst', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } diff --git a/examples/inputs/location_groups.yaml b/examples/inputs/location_groups.yaml new file mode 100644 index 0000000..f0b5b76 --- /dev/null +++ b/examples/inputs/location_groups.yaml @@ -0,0 +1,2 @@ +group-name1: + - location1 \ No newline at end of file From 3e7c67b68b546e16bcc70e9be1a637a76bfc22cd Mon Sep 17 00:00:00 2001 From: LucR31 Date: Wed, 15 Nov 2023 14:11:09 +0000 Subject: [PATCH 16/22] multiple models implementation --- aiida_flexpart/calculations/flexpart_cosmo.py | 15 +-- aiida_flexpart/calculations/flexpart_ifs.py | 16 ++- .../workflows/multi_dates_workflow.py | 104 ++++++++++-------- examples/example_workflow_combi.py | 43 ++++---- examples/inputs/location_groups.yaml | 7 +- 5 files changed, 101 insertions(+), 84 deletions(-) diff --git a/aiida_flexpart/calculations/flexpart_cosmo.py b/aiida_flexpart/calculations/flexpart_cosmo.py index 494d192..9cb507a 100644 --- a/aiida_flexpart/calculations/flexpart_cosmo.py +++ b/aiida_flexpart/calculations/flexpart_cosmo.py @@ -51,7 +51,7 @@ def define(cls, spec): help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.' ) spec.input('species', valid_type=orm.RemoteData, required=True) - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=True, help='Path to the folder containing the meteorological input data.') spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO') @@ -101,16 +101,13 @@ def prepare_for_submission(self, folder): # pylint: disable=too-many-locals needed by the calculation. :return: `aiida.common.datastructures.CalcInfo` instance """ + meteo_string_list = ['./','./'] + for path in self.inputs.meteo_path: + meteo_string_list.append(f'{path}{os.sep}') + meteo_string_list.append(f'{path}/AVAILABLE') - meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path()) codeinfo = datastructures.CodeInfo() - codeinfo.cmdline_params = [ - './', # Folder containing the inputs. - './', # Folder containing the outputs. - f'{meteo_path}{os.sep}', - str(meteo_path / 'AVAILABLE'), - # File that lists all the individual input files that are available and assigns them a date - ] + codeinfo.cmdline_params = meteo_string_list codeinfo.code_uuid = self.inputs.code.uuid codeinfo.stdout_name = self.metadata.options.output_filename codeinfo.withmpi = self.inputs.metadata.options.withmpi diff --git a/aiida_flexpart/calculations/flexpart_ifs.py b/aiida_flexpart/calculations/flexpart_ifs.py index 2cb0896..93b89df 100644 --- a/aiida_flexpart/calculations/flexpart_ifs.py +++ b/aiida_flexpart/calculations/flexpart_ifs.py @@ -55,7 +55,7 @@ def define(cls, spec): spec.input('species', valid_type=orm.RemoteData, required=True) spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO') - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=True, help='Path to the folder containing the meteorological input data.') spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) spec.outputs.dynamic = True @@ -98,15 +98,13 @@ def _deal_with_time(cls, command_dict): def prepare_for_submission(self, folder): - meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path()) + meteo_string_list = ['./','./'] + for path in self.inputs.meteo_path: + meteo_string_list.append(f'{path}{os.sep}') + meteo_string_list.append(f'{path}/AVAILABLE') + codeinfo = datastructures.CodeInfo() - codeinfo.cmdline_params = [ - './', # Folder containing the inputs. - './', # Folder containing the outputs. - f'{meteo_path}{os.sep}', - str(meteo_path / 'AVAILABLE'), - # File that lists all the individual input files that are available and assigns them a date - ] + codeinfo.cmdline_params = meteo_string_list codeinfo.code_uuid = self.inputs.code.uuid codeinfo.stdout_name = self.metadata.options.output_filename codeinfo.withmpi = self.inputs.metadata.options.withmpi diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index bd8e855..3d90b30 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -48,8 +48,8 @@ def define(cls, spec): # Basic Inputs spec.input('simulation_dates', valid_type=orm.List, help='A list of the starting dates of the simulations') - spec.input('model', valid_type=orm.Str, required=True) - spec.input('model_offline', valid_type=orm.Str, required=True) + spec.input('model', valid_type=orm.List, required=True) + spec.input('model_offline', valid_type=orm.List, required=True) spec.input('offline_integration_time', valid_type=orm.Int) spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours') spec.input("parent_calc_folder", @@ -70,9 +70,9 @@ def define(cls, spec): help='Meteo models input params.') spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False, help='Meteo models input params.') - spec.input('meteo_path', valid_type=orm.RemoteData, + spec.input('meteo_path', valid_type=orm.List, required=False, help='Path to the folder containing the meteorological input data.') - spec.input('meteo_path_offline', valid_type=orm.RemoteData, + spec.input('meteo_path_offline', valid_type=orm.List, required=False, help='Path to the folder containing the meteorological input data.') spec.input('gribdir', valid_type=orm.Str, required=True) @@ -124,16 +124,22 @@ def condition(self): return True if self.ctx.index < len(self.ctx.simulation_dates) else False def run_cosmo(self): - return True if self.inputs.model in cosmo_models else False + if all(mod in cosmo_models for mod in self.inputs.model) and self.inputs.model: + return True + else: + return False def run_ifs(self): - return True if self.inputs.model in ECMWF_models else False + if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model: + return True + else: + return False def run_offline(self): - if self.inputs.model_offline in ECMWF_models and self.inputs.model != '': + if all(mod in ECMWF_models for mod in self.inputs.model_offline) and self.inputs.model and self.inputs.model_offline: self.ctx.index-=1 return True - elif self.inputs.model_offline in ECMWF_models and self.inputs.model == '': + elif all(mod in ECMWF_models for mod in self.inputs.model_offline) and not self.inputs.model: return True return False @@ -141,7 +147,6 @@ def setup(self): """Prepare a simulation.""" self.report(f'starting setup') - self.report(f'model: {self.inputs.model.value}, model_offline: {self.inputs.model_offline.value}') self.ctx.index = 0 self.ctx.simulation_dates = self.inputs.simulation_dates @@ -169,23 +174,32 @@ def prepare_meteo_folder_ifs(self): self.ctx.command.get_dict()["release_duration"], self.ctx.command.get_dict()["simulation_direction"]) - self.report(f'prepare meteo from {s_date} to {e_date}') - - results, node = launch_shell_job( - self.inputs.check_meteo_ifs_code, - arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', - nodes={ - 'sdate': orm.Str(s_date), - 'edate': orm.Str(e_date), - 'gribdir': self.inputs.gribdir, - 'model' : self.inputs.model_offline - }) - - if node.is_finished_ok: - self.report('meteo files transferred successfully') + self.report(f'preparing meteo from {s_date} to {e_date}') + + if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model: + model_list = self.inputs.model + else: + model_list = self.inputs.model_offline + + node_list=[] + for mod in model_list: + self.report(f'transfering {mod} meteo') + results, node = launch_shell_job( + self.inputs.check_meteo_ifs_code, + arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', + nodes={ + 'sdate': orm.Str(s_date), + 'edate': orm.Str(e_date), + 'gribdir': self.inputs.gribdir, + 'model' : orm.Str(mod) + }) + node_list.append(node) + + if all(node.is_finished_ok for node in node_list): + self.report('ALL meteo OK') return True else: - self.report('failed to transfer meteo files') + self.report('FAILED to transfer meteo') self.ctx.index += 1 return False @@ -195,30 +209,34 @@ def prepare_meteo_folder_cosmo(self): self.ctx.command.get_dict()["release_duration"], self.ctx.command.get_dict()["simulation_direction"]) - self.report(f'prepare meteo from {s_date} to {e_date}') - - results, node = launch_shell_job( - self.inputs.check_meteo_cosmo_code, - arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', - nodes={ - 'sdate': orm.Str(s_date), - 'edate': orm.Str(e_date), - 'gribdir': self.inputs.gribdir, - 'model': self.inputs.model - }) - - if node.is_finished_ok: - self.report('meteo files transferred successfully') + self.report(f'preparing meteo from {s_date} to {e_date}') + + node_list=[] + for mod in self.inputs.model: + self.report(f'transfering {mod} meteo') + results, node = launch_shell_job( + self.inputs.check_meteo_cosmo_code, + arguments=' -s {sdate} -e {edate} -g {gribdir} -m {model} -a', + nodes={ + 'sdate': orm.Str(s_date), + 'edate': orm.Str(e_date), + 'gribdir': self.inputs.gribdir, + 'model': orm.Str(mod) + }) + node_list.append(node) + + if all(node.is_finished_ok for node in node_list): + self.report('ALL meteo OK') return True else: - self.report('failed to transfer meteo files') + self.report('FAILED to transfer meteo') self.ctx.index += 1 return False def run_cosmo_simulation(self): """Run calculations for equation of state.""" - self.report('starting flexpart cosmo') + self.report(f'starting flexpart cosmo {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartCalculation.get_builder() builder.code = self.inputs.fcosmo_code @@ -257,7 +275,7 @@ def run_cosmo_simulation(self): def run_ifs_simulation(self): """Run calculations for equation of state.""" # Set up calculation. - self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}') + self.report(f'running flexpart ifs for {self.ctx.simulation_dates[self.ctx.index]}') builder = FlexpartIfsCalculation.get_builder() builder.code = self.inputs.fifs_code @@ -265,11 +283,11 @@ def run_ifs_simulation(self): new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - if self.inputs.model_offline in ECMWF_models: + if self.inputs.model_offline[0] in ECMWF_models: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 new_dict['dumped_particle_data'] = True - if self.inputs.model != '': + if self.inputs.model: self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder self.report(f'starting from: {self.ctx.parent_calc_folder}') else: diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 66b29db..30d0ac5 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -62,20 +62,20 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2021-01-09']) - model = 'cosmo7' - model_offline = 'IFS_GL_05' + simulation_dates = simulation_dates_parser(['2020-09-01']) + model = ['cosmo7'] + model_offline = [] username='lfernand' outgrid_main = 'Europe' outgrid_nest = 'Switzerland' integration_time = 24 - integration_time_offline = 48 + integration_time_offline = 24 users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' #list of locations and/or groups of locations - list_locations = ['TEST_32'] + list_locations = ['group-name-1'] # Links to the remote files/folders. @@ -104,8 +104,8 @@ def test_run(flexpart_code): builder = workflow.get_builder() builder.fcosmo_code = flexpart_code builder.fifs_code = orm.load_code('flexpart_ifs@daint') - builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct') - builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct') + builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') + builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') #basic settings builder.simulation_dates = simulation_dates @@ -113,28 +113,24 @@ def test_run(flexpart_code): builder.offline_integration_time = orm.Int(integration_time_offline) #meteo realted settings - builder.model = orm.Str(model) - builder.model_offline = orm.Str(model_offline) + builder.model = orm.List(model) + builder.model_offline = orm.List(model_offline) - if model is not None: - meteo_path = orm.RemoteData( - remote_path=scratch_address+model+'/', - computer = flexpart_code.computer) + if model: + meteo_path = orm.List([scratch_address+mod for mod in model]) builder.meteo_path = meteo_path builder.meteo_inputs = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - model, - ])[model]) + model[-1], + ])[model[-1]]) - if model_offline is not None: - meteo_path_offline = orm.RemoteData( - remote_path = scratch_address+model_offline, - computer=flexpart_code.computer) + if model_offline: + meteo_path_offline = orm.List([scratch_address+mod for mod in model_offline]) builder.meteo_path_offline = meteo_path_offline builder.meteo_inputs_offline = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ - model_offline, - ])[model_offline]) + model_offline[-1], + ])[model_offline[-1]]) builder.gribdir=orm.Str(scratch_address) @@ -182,6 +178,11 @@ def test_run(flexpart_code): 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } + + #change wall time for cosom and ifs in seconds + builder.flexpart.metadata.options.max_wallclock_seconds = 1800 + #builder.flexpartifs.metadata.options.max_wallclock_seconds = 2700 + engine.run(builder) diff --git a/examples/inputs/location_groups.yaml b/examples/inputs/location_groups.yaml index f0b5b76..7223075 100644 --- a/examples/inputs/location_groups.yaml +++ b/examples/inputs/location_groups.yaml @@ -1,2 +1,5 @@ -group-name1: - - location1 \ No newline at end of file +group-name-1: + - TEST_32 +group-name-2: + - TEST_32 + - TEST_200 \ No newline at end of file From da97f02bf63512ac4352b731f72e9c1a2e065e73 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 24 Nov 2023 10:31:47 +0000 Subject: [PATCH 17/22] added post processing external code --- .../calculations/post_processing.py | 62 +++++++++++++++++++ aiida_flexpart/parsers/flexpart_post.py | 56 +++++++++++++++++ .../workflows/multi_dates_workflow.py | 26 +++++++- config/code_post_processing.yaml | 24 +++++++ examples/example_workflow_combi.py | 8 ++- setup.json | 6 +- 6 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 aiida_flexpart/calculations/post_processing.py create mode 100644 aiida_flexpart/parsers/flexpart_post.py create mode 100644 config/code_post_processing.yaml diff --git a/aiida_flexpart/calculations/post_processing.py b/aiida_flexpart/calculations/post_processing.py new file mode 100644 index 0000000..713a276 --- /dev/null +++ b/aiida_flexpart/calculations/post_processing.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +Calculations provided by aiida_flexpart. +Register calculations via the "aiida.calculations" entry point in setup.json. +""" +import os +import importlib +import pathlib + +from aiida import orm +from aiida.common import datastructures +from aiida.engine import CalcJob + +class PostProcessingCalculation(CalcJob): + """AiiDA calculation plugin for post processing.""" + @classmethod + def define(cls, spec): + """Define inputs and outputs of the calculation.""" + # yapf: disable + super().define(spec) + + # set default values for AiiDA options + spec.inputs['metadata']['options']['resources'].default = { + 'num_machines': 1, + 'num_mpiprocs_per_machine': 1, + } + spec.input('metadata.options.max_wallclock_seconds', valid_type = int, default=1800) + + #INPUTS + spec.input("input_dir", valid_type = orm.RemoteData, required=True, + help = "main FLEXPART output dir") + spec.input("input_offline_dir", valid_type = orm.RemoteData, required=False, + help = "offline-nested FLEXPART output dir") + spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) + #exit codes + spec.outputs.dynamic = True + spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.') + + def prepare_for_submission(self, folder): + + params = ['-m',self.inputs.input_dir.get_remote_path(), + '-r','./' + ] + if 'input_offline_dir' in self.inputs: + params += ['-n',self.inputs.input_offline_dir.get_remote_path()] + + codeinfo = datastructures.CodeInfo() + codeinfo.cmdline_params = params + codeinfo.code_uuid = self.inputs.code.uuid + codeinfo.stdout_name = self.metadata.options.output_filename + codeinfo.withmpi = self.inputs.metadata.options.withmpi + + # Prepare a `CalcInfo` to be returned to the engine + calcinfo = datastructures.CalcInfo() + calcinfo.codes_info = [codeinfo] + calcinfo.retrieve_list = ['grid_time_*.nc', 'boundary_sensitivity_*.nc', 'aiida.out'] + + return calcinfo + + + + \ No newline at end of file diff --git a/aiida_flexpart/parsers/flexpart_post.py b/aiida_flexpart/parsers/flexpart_post.py new file mode 100644 index 0000000..1f8fb3b --- /dev/null +++ b/aiida_flexpart/parsers/flexpart_post.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +""" +Parsers provided by aiida_flexpart. + +Register parsers via the "aiida.parsers" entry point in setup.json. +""" +from aiida.engine import ExitCode +from aiida.parsers.parser import Parser +from aiida.plugins import CalculationFactory +from aiida.common import exceptions +from aiida.orm import SinglefileData + +FlexpartCalculation = CalculationFactory('flexpart.post') + + +class FlexpartPostParser(Parser): + """ + Parser class for parsing output of calculation. + """ + def __init__(self, node): + """ + Initialize Parser instance + + Checks that the ProcessNode being passed was produced by a FlexpartCalculation. + + :param node: ProcessNode of calculation + :param type node: :class:`aiida.orm.ProcessNode` + """ + super().__init__(node) + if not issubclass(node.process_class, FlexpartCalculation): + raise exceptions.ParsingError('Can only parse FlexpartCalculation') + + def parse(self, **kwargs): + """ + Parse outputs, store results in database. + + :returns: an exit code, if parsing fails (or nothing if parsing succeeds) + """ + output_filename = self.node.get_option('output_filename') + + # Check that folder content is as expected + files_retrieved = self.retrieved.list_object_names() + files_expected = [output_filename] + # Note: set(A) <= set(B) checks whether A is a subset of B + if not set(files_expected) <= set(files_retrieved): + self.logger.error("Found files '{}', expected to find '{}'".format( + files_retrieved, files_expected)) + return self.exit_codes.ERROR_MISSING_OUTPUT_FILES + + # add output file + self.logger.info("Parsing '{}'".format(output_filename)) + with self.retrieved.open(output_filename, 'rb') as handle: + output_node = SinglefileData(file=handle) + self.out('output_file', output_node) + + return ExitCode(0) \ No newline at end of file diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 3d90b30..b54bff2 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -7,6 +7,7 @@ FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo') FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs') +FlexpartPostCalculation = plugins.CalculationFactory('flexpart.post') #possible models cosmo_models = ['cosmo7', 'cosmo1', 'kenda1'] @@ -44,6 +45,7 @@ def define(cls, spec): spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode) spec.input('fifs_code', valid_type=orm.AbstractCode) spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode) + spec.input('post_processing_code', valid_type=orm.AbstractCode) # Basic Inputs spec.input('simulation_dates', valid_type=orm.List, @@ -93,6 +95,9 @@ def define(cls, spec): spec.expose_inputs(FlexpartIfsCalculation, include=['metadata.options'], namespace='flexpartifs') + spec.expose_inputs(FlexpartPostCalculation, + include=['metadata.options'], + namespace='flexpartpost') # Outputs #spec.output('output_file', valid_type=orm.SinglefileData) @@ -115,7 +120,8 @@ def define(cls, spec): if_(cls.prepare_meteo_folder_ifs)( cls.run_ifs_simulation ) - ) + ), + cls.post_processing, ), cls.results, ) @@ -232,6 +238,24 @@ def prepare_meteo_folder_cosmo(self): self.report('FAILED to transfer meteo') self.ctx.index += 1 return False + + def post_processing(self): + + self.report('starting post-processsing') + builder = FlexpartPostCalculation.get_builder() + builder.code = self.inputs.post_processing_code + builder.input_dir = self.ctx.calculations[-1].outputs.remote_folder + + if self.ctx.offline_integration_time>0: + self.report(f'main: {self.ctx.calculations[-2].outputs.remote_folder}') + self.report(f'offline: {self.ctx.calculations[-1].outputs.remote_folder}') + builder.input_dir = self.ctx.calculations[-2].outputs.remote_folder + builder.input_offline_dir = self.ctx.calculations[-1].outputs.remote_folder + + builder.metadata.options = self.inputs.flexpartpost.metadata.options + + running = self.submit(builder) + self.to_context(calculations=engine.append_(running)) def run_cosmo_simulation(self): """Run calculations for equation of state.""" diff --git a/config/code_post_processing.yaml b/config/code_post_processing.yaml new file mode 100644 index 0000000..115f1fd --- /dev/null +++ b/config/code_post_processing.yaml @@ -0,0 +1,24 @@ +--- +label: 'post-processing' +description: '' +default_calc_job_plugin: flexpart.post +filepath_executable: '/users/shenne/progs/Rflexpart/exec/FLEXPART.totalFootprint.only.R' +computer: daint +prepend_text: | + #SBATCH --job-name=Rpost + #SBATCH --partition=prepost + #SBATCH --nodes=1 + #SBATCH --ntasks-per-core=2 + #SBATCH --ntasks-per-node=1 + #SBATCH --cpus-per-task=72 + #SBATCH --time=00:30:00 + + module load daint-mc + module switch PrgEnv-cray PrgEnv-gnu + module switch gcc/11.2.0 gcc/9.3.0 + module load cray-netcdf + module use /store/empa/em05/shenne/easybuild/modules/all + module load ecCodes/2.19.0-CrayGNU-21.09 + module load cray-R/3.6.3.1 + export R_LIBS=/users/shenne/R/x86_64-suse-linux-gnu-library/3.6 +append_text: "" \ No newline at end of file diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 30d0ac5..52f06a1 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -64,7 +64,7 @@ def test_run(flexpart_code): simulation_dates = simulation_dates_parser(['2020-09-01']) model = ['cosmo7'] - model_offline = [] + model_offline = ['IFS_GL_05'] username='lfernand' outgrid_main = 'Europe' outgrid_nest = 'Switzerland' @@ -106,6 +106,7 @@ def test_run(flexpart_code): builder.fifs_code = orm.load_code('flexpart_ifs@daint') builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') + builder.post_processing_code = orm.load_code('post-processing@daint') #basic settings builder.simulation_dates = simulation_dates @@ -178,6 +179,11 @@ def test_run(flexpart_code): 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } + builder.flexpartpost.metadata.options.stash = { + 'source_list': ['aiida.out','boundary_sensitivity_*.nc', 'grid_time_*.nc'], + 'target_base': f'/store/empa/em05/{username}/aiida_stash', + 'stash_mode': StashMode.COPY.value, + } #change wall time for cosom and ifs in seconds builder.flexpart.metadata.options.max_wallclock_seconds = 1800 diff --git a/setup.json b/setup.json index cb33cfa..a1b9a58 100644 --- a/setup.json +++ b/setup.json @@ -16,11 +16,13 @@ "entry_points": { "aiida.calculations": [ "flexpart.cosmo = aiida_flexpart.calculations.flexpart_cosmo:FlexpartCosmoCalculation", - "flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation" + "flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation", + "flexpart.post = aiida_flexpart.calculations.post_processing:PostProcessingCalculation" ], "aiida.parsers": [ "flexpart.cosmo = aiida_flexpart.parsers.flexpart_cosmo:FlexpartCosmoParser", - "flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser" + "flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser", + "flexpart.post = aiida_flexpart.parsers.flexpart_post:FlexpartPostParser" ], "aiida.workflows": [ "flexpart.multi_dates = aiida_flexpart.workflows.multi_dates_workflow:FlexpartMultipleDatesWorkflow" From 9e87c39a05e7ad799152b4f1a226f2492e596b42 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Tue, 28 Nov 2023 08:36:48 +0000 Subject: [PATCH 18/22] locations reformat --- .../calculations/post_processing.py | 1 - aiida_flexpart/utils.py | 20 ++++++++++++++++++ examples/example_workflow_combi.py | 21 +++++++++++-------- examples/inputs/location_groups.yaml | 4 ++-- examples/inputs/locations.yaml | 11 ++++++++++ 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/aiida_flexpart/calculations/post_processing.py b/aiida_flexpart/calculations/post_processing.py index 713a276..0b78970 100644 --- a/aiida_flexpart/calculations/post_processing.py +++ b/aiida_flexpart/calculations/post_processing.py @@ -58,5 +58,4 @@ def prepare_for_submission(self, folder): return calcinfo - \ No newline at end of file diff --git a/aiida_flexpart/utils.py b/aiida_flexpart/utils.py index ef04241..8f1bce0 100644 --- a/aiida_flexpart/utils.py +++ b/aiida_flexpart/utils.py @@ -187,3 +187,23 @@ def fill_in_template_file(folder, fname, data): importlib.resources.read_text('aiida_flexpart.templates', fname + '.j2')) infile.write(template.render(data=data)) + +def reformat_locations(dict_, model): + for key in dict_.keys(): + if 'longitude' in dict_[key]: + dict_[key]['longitude_of_lower_left_corner'] = dict_[key]['longitude'] + dict_[key]['longitude_of_upper_right_corner'] = dict_[key]['longitude'] + dict_[key]['latitude_of_lower_left_corner'] = dict_[key]['latitude'] + dict_[key]['latitude_of_upper_right_corner'] = dict_[key]['latitude'] + + if model in dict_[key]['level']: + dict_[key]['lower_z_level'] = dict_[key]['level'][model] + dict_[key]['upper_z_level'] = dict_[key]['level'][model] + else: + dict_[key]['lower_z_level'] = dict_[key]['level']['default'] + dict_[key]['upper_z_level'] = dict_[key]['level']['default'] + + dict_[key].pop('longitude') + dict_[key].pop('latitude') + dict_[key].pop('level') + return dict_ diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 52f06a1..53ad467 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -8,6 +8,7 @@ import yaml from aiida import orm, plugins, engine, cmdline from aiida.common.datastructures import StashMode +from aiida_flexpart.utils import reformat_locations def read_yaml_data(data_filename: str, names=None) -> dict: @@ -62,20 +63,20 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2020-09-01']) + simulation_dates = simulation_dates_parser(['2020-10-01']) model = ['cosmo7'] - model_offline = ['IFS_GL_05'] + model_offline = [] username='lfernand' outgrid_main = 'Europe' outgrid_nest = 'Switzerland' integration_time = 24 - integration_time_offline = 24 + integration_time_offline = 0 users_address=f'/users/{username}/resources/flexpart/' scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' #list of locations and/or groups of locations - list_locations = ['group-name-1'] + list_locations = ['group-name-2'] # Links to the remote files/folders. @@ -141,9 +142,11 @@ def test_run(flexpart_code): builder.input_phy = orm.Dict( dict=read_yaml_data('inputs/input_phy.yaml')) - builder.locations = orm.Dict( - dict=read_yaml_data('inputs/locations.yaml', - names = make_locations_list(list_locations))) + dict_=read_yaml_data('inputs/locations.yaml', + names = make_locations_list(list_locations)) + reformated_dict_locations = reformat_locations(dict_, model[0]) + builder.locations = orm.Dict(dict=reformated_dict_locations) + builder.release_settings = orm.Dict( dict=read_yaml_data('inputs/release.yaml')) @@ -175,7 +178,7 @@ def test_run(flexpart_code): 'stash_mode': StashMode.COPY.value, } builder.flexpartifs.metadata.options.stash = { - 'source_list': ['aiida.out','header*','partposit_inst', 'grid_time_*.nc'], + 'source_list': ['aiida.out','header*','partposit_inst*', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } @@ -185,7 +188,7 @@ def test_run(flexpart_code): 'stash_mode': StashMode.COPY.value, } - #change wall time for cosom and ifs in seconds + #change wall time for cosmo and ifs in seconds builder.flexpart.metadata.options.max_wallclock_seconds = 1800 #builder.flexpartifs.metadata.options.max_wallclock_seconds = 2700 diff --git a/examples/inputs/location_groups.yaml b/examples/inputs/location_groups.yaml index 7223075..7dad9ec 100644 --- a/examples/inputs/location_groups.yaml +++ b/examples/inputs/location_groups.yaml @@ -1,5 +1,5 @@ group-name-1: - - TEST_32 + - TEST_210 group-name-2: - TEST_32 - - TEST_200 \ No newline at end of file + - TEST_210 \ No newline at end of file diff --git a/examples/inputs/locations.yaml b/examples/inputs/locations.yaml index 0d188c8..56c2a4b 100644 --- a/examples/inputs/locations.yaml +++ b/examples/inputs/locations.yaml @@ -6,6 +6,7 @@ TEST_32: lower_z_level: 00032.000 upper_z_level: 00032.000 level_type: 1 # 1 for m above ground, 2 for m above sea level + TEST_200: longitude_of_lower_left_corner: 0007.2480 latitude_of_lower_left_corner: 0047.0536 @@ -14,3 +15,13 @@ TEST_200: lower_z_level: 00200.000 upper_z_level: 00200.000 level_type: 1 # 1 for m above ground, 2 for m above sea level + +TEST_210: + longitude: 7.2480 + latitude: 47.0536 + level: + cosmo7: 200.000 + cosmo1: 100.000 + default: 150.000 + level_type: 1 + From 96f6b272c36b05fb6e8ee29a8873d4026334be06 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Tue, 28 Nov 2023 09:01:28 +0000 Subject: [PATCH 19/22] minor change --- examples/example_workflow_combi.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 53ad467..c1e7619 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -144,7 +144,7 @@ def test_run(flexpart_code): dict_=read_yaml_data('inputs/locations.yaml', names = make_locations_list(list_locations)) - reformated_dict_locations = reformat_locations(dict_, model[0]) + reformated_dict_locations = reformat_locations(dict_, model[-1]) builder.locations = orm.Dict(dict=reformated_dict_locations) builder.release_settings = orm.Dict( From 3dde1e7256cd2175c1c5f7a3c0cb06f4be22443d Mon Sep 17 00:00:00 2001 From: LucR31 Date: Thu, 7 Dec 2023 13:57:28 +0000 Subject: [PATCH 20/22] locations level type --- .../calculations/post_processing.py | 1 - aiida_flexpart/utils.py | 23 ++++++ .../workflows/multi_dates_workflow.py | 67 +++++------------ examples/example_workflow_combi.py | 16 ++--- examples/inputs/locations.yaml | 72 ++++++++++++------- 5 files changed, 95 insertions(+), 84 deletions(-) diff --git a/aiida_flexpart/calculations/post_processing.py b/aiida_flexpart/calculations/post_processing.py index 0b78970..b5cd72d 100644 --- a/aiida_flexpart/calculations/post_processing.py +++ b/aiida_flexpart/calculations/post_processing.py @@ -24,7 +24,6 @@ def define(cls, spec): 'num_machines': 1, 'num_mpiprocs_per_machine': 1, } - spec.input('metadata.options.max_wallclock_seconds', valid_type = int, default=1800) #INPUTS spec.input("input_dir", valid_type = orm.RemoteData, required=True, diff --git a/aiida_flexpart/utils.py b/aiida_flexpart/utils.py index 8f1bce0..113efa2 100644 --- a/aiida_flexpart/utils.py +++ b/aiida_flexpart/utils.py @@ -2,6 +2,7 @@ """Utilties to convert between python and fortran data types and formats.""" import numbers +import datetime import importlib import numpy import jinja2 @@ -203,7 +204,29 @@ def reformat_locations(dict_, model): dict_[key]['lower_z_level'] = dict_[key]['level']['default'] dict_[key]['upper_z_level'] = dict_[key]['level']['default'] + dict_[key]['level_type'] = dict_[key]['level_type'][model] + dict_[key].pop('longitude') dict_[key].pop('latitude') dict_[key].pop('level') return dict_ + +def get_simulation_period(date, + age_class_time, + release_duration, + simulation_direction + ): + """Dealing with simulation times.""" + #initial values + simulation_beginning_date = datetime.datetime.strptime(date,'%Y-%m-%d %H:%M:%S') + age_class_time = datetime.timedelta(seconds=age_class_time) + release_duration = datetime.timedelta(seconds=release_duration+3600) + + if simulation_direction>0: #forward + simulation_ending_date=simulation_beginning_date+release_duration+age_class_time + else: #backward + simulation_ending_date=release_duration+simulation_beginning_date + simulation_beginning_date-=age_class_time + + return datetime.datetime.strftime(simulation_ending_date,'%Y%m%d%H'), datetime.datetime.strftime(simulation_beginning_date,'%Y%m%d%H') + diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index b54bff2..439e103 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -3,9 +3,10 @@ from aiida import engine, plugins, orm from aiida_shell import launch_shell_job from aiida.engine import calcfunction, while_, if_ -import datetime +from aiida_flexpart.utils import get_simulation_period -FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo') +#plugins +FlexpartCosmoCalculation = plugins.CalculationFactory('flexpart.cosmo') FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs') FlexpartPostCalculation = plugins.CalculationFactory('flexpart.post') @@ -13,25 +14,6 @@ cosmo_models = ['cosmo7', 'cosmo1', 'kenda1'] ECMWF_models = ['IFS_GL_05', 'IFS_GL_1', 'IFS_EU_02', 'IFS_EU_01'] -def get_simulation_period(date, - age_class_time, - release_duration, - simulation_direction - ): - """Dealing with simulation times.""" - #initial values - simulation_beginning_date = datetime.datetime.strptime(date,'%Y-%m-%d %H:%M:%S') - age_class_time = datetime.timedelta(seconds=age_class_time) - release_duration = datetime.timedelta(seconds=release_duration+3600) - - if simulation_direction>0: #forward - simulation_ending_date=simulation_beginning_date+release_duration+age_class_time - else: #backward - simulation_ending_date=release_duration+simulation_beginning_date - simulation_beginning_date-=age_class_time - - return datetime.datetime.strftime(simulation_ending_date,'%Y%m%d%H'), datetime.datetime.strftime(simulation_beginning_date,'%Y%m%d%H') - class FlexpartMultipleDatesWorkflow(engine.WorkChain): """Flexpart multi-dates workflow""" @@ -89,9 +71,9 @@ def define(cls, spec): help='#TODO') spec.input_namespace('land_use_ifs', valid_type=orm.RemoteData, required=False, dynamic=True) - spec.expose_inputs(FlexpartCalculation, + spec.expose_inputs(FlexpartCosmoCalculation, include=['metadata.options'], - namespace='flexpart') + namespace='flexpartcosmo') spec.expose_inputs(FlexpartIfsCalculation, include=['metadata.options'], namespace='flexpartifs') @@ -116,11 +98,6 @@ def define(cls, spec): cls.run_ifs_simulation ) ), - if_(cls.run_offline)( - if_(cls.prepare_meteo_folder_ifs)( - cls.run_ifs_simulation - ) - ), cls.post_processing, ), cls.results, @@ -136,18 +113,14 @@ def run_cosmo(self): return False def run_ifs(self): - if all(mod in ECMWF_models for mod in self.inputs.model) and self.inputs.model: + if (all(mod in ECMWF_models for mod in self.inputs.model) or + all(mod in ECMWF_models for mod in self.inputs.model_offline) and + self.inputs.model and + self.inputs.model_offline + ): return True else: return False - - def run_offline(self): - if all(mod in ECMWF_models for mod in self.inputs.model_offline) and self.inputs.model and self.inputs.model_offline: - self.ctx.index-=1 - return True - elif all(mod in ECMWF_models for mod in self.inputs.model_offline) and not self.inputs.model: - return True - return False def setup(self): """Prepare a simulation.""" @@ -262,7 +235,7 @@ def run_cosmo_simulation(self): self.report(f'starting flexpart cosmo {self.ctx.simulation_dates[self.ctx.index]}') - builder = FlexpartCalculation.get_builder() + builder = FlexpartCosmoCalculation.get_builder() builder.code = self.inputs.fcosmo_code #update command file @@ -287,14 +260,14 @@ def run_cosmo_simulation(self): # Walltime, memory, and resources. builder.metadata.description = 'Test workflow to submit a flexpart calculation' - builder.metadata.options = self.inputs.flexpart.metadata.options + builder.metadata.options = self.inputs.flexpartcosmo.metadata.options # Ask the workflow to continue when the results are ready and store them in the context running = self.submit(builder) self.to_context(calculations=engine.append_(running)) - - self.ctx.index += 1 + if self.ctx.offline_integration_time == 0: + self.ctx.index += 1 def run_ifs_simulation(self): """Run calculations for equation of state.""" @@ -307,19 +280,17 @@ def run_ifs_simulation(self): new_dict = self.ctx.command.get_dict() new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index] - if self.inputs.model_offline[0] in ECMWF_models: + if self.ctx.offline_integration_time > 0: new_dict['age_class'] = self.ctx.offline_integration_time * 3600 new_dict['dumped_particle_data'] = True - if self.inputs.model: - self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder - self.report(f'starting from: {self.ctx.parent_calc_folder}') - else: - self.ctx.parent_calc_folder = self.inputs.parent_calc_folder + self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder + builder.parent_calc_folder = self.ctx.parent_calc_folder + self.report(f'starting from: {self.ctx.parent_calc_folder}') builder.meteo_path = self.inputs.meteo_path_offline - builder.parent_calc_folder = self.ctx.parent_calc_folder new_dict.update(self.inputs.meteo_inputs_offline) + else: new_dict['age_class'] = self.inputs.integration_time * 3600 builder.meteo_path = self.inputs.meteo_path diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index c1e7619..2720334 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -63,8 +63,8 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2020-10-01']) - model = ['cosmo7'] + simulation_dates = simulation_dates_parser(['2020-10-01,2020-10-02']) + model = ['cosmo7', 'cosmo1'] model_offline = [] username='lfernand' outgrid_main = 'Europe' @@ -118,10 +118,10 @@ def test_run(flexpart_code): builder.model = orm.List(model) builder.model_offline = orm.List(model_offline) - if model: - meteo_path = orm.List([scratch_address+mod for mod in model]) - builder.meteo_path = meteo_path - builder.meteo_inputs = orm.Dict( + + meteo_path = orm.List([scratch_address+mod for mod in model]) + builder.meteo_path = meteo_path + builder.meteo_inputs = orm.Dict( dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[ model[-1], ])[model[-1]]) @@ -172,7 +172,7 @@ def test_run(flexpart_code): } builder.parent_calc_folder = parent_folder - builder.flexpart.metadata.options.stash = { + builder.flexpartcosmo.metadata.options.stash = { 'source_list': ['aiida.out','header*','partposit_inst', 'grid_time_*.nc'], 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, @@ -189,7 +189,7 @@ def test_run(flexpart_code): } #change wall time for cosmo and ifs in seconds - builder.flexpart.metadata.options.max_wallclock_seconds = 1800 + builder.flexpartcosmo.metadata.options.max_wallclock_seconds = 1800 #builder.flexpartifs.metadata.options.max_wallclock_seconds = 2700 engine.run(builder) diff --git a/examples/inputs/locations.yaml b/examples/inputs/locations.yaml index 56c2a4b..3a46dee 100644 --- a/examples/inputs/locations.yaml +++ b/examples/inputs/locations.yaml @@ -1,27 +1,45 @@ -TEST_32: - longitude_of_lower_left_corner: 0007.2480 - latitude_of_lower_left_corner: 0047.0536 - longitude_of_upper_right_corner: 0007.2480 - latitude_of_upper_right_corner: 0047.0536 - lower_z_level: 00032.000 - upper_z_level: 00032.000 - level_type: 1 # 1 for m above ground, 2 for m above sea level - -TEST_200: - longitude_of_lower_left_corner: 0007.2480 - latitude_of_lower_left_corner: 0047.0536 - longitude_of_upper_right_corner: 0007.2480 - latitude_of_upper_right_corner: 0047.0536 - lower_z_level: 00200.000 - upper_z_level: 00200.000 - level_type: 1 # 1 for m above ground, 2 for m above sea level - -TEST_210: - longitude: 7.2480 - latitude: 47.0536 - level: - cosmo7: 200.000 - cosmo1: 100.000 - default: 150.000 - level_type: 1 - +JFJ_5magl: + longitude: 7.9851 + latitude: 46.5475 + level: + IFS_EU_01: 3062.0 + IFS_EU_02: 2929.0 + cosmo7: 3121.0 + IFS_GL_05: 3085.0 + IFS_GL_1: 2600.0 + level_type: + IFS_EU_01: 2 + IFS_EU_02: 2 + cosmo7: 2 + IFS_GL_05: 2 + IFS_GL_1: 2 +MHD_10magl: + longitude: -9.9046 + latitude: 53.3267 + level: + IFS_EU_01: 10.0 + IFS_EU_02: 10.0 + cosmo7: 10.0 + IFS_GL_05: 10.0 + IFS_GL_1: 10.0 + level_type: + IFS_EU_01: 1 + IFS_EU_02: 1 + cosmo7: 1 + IFS_GL_05: 1 + IFS_GL_1: 1 +KIT_200magl: + longitude: 8.4249 + latitude: 49.0915 + level: + IFS_EU_02: 200.0 + cosmo7: 200.0 + IFS_EU_01: 200.0 + IFS_GL_05: 200.0 + IFS_GL_1: 200.0 + level_type: + IFS_EU_02: 1 + cosmo7: 1 + IFS_EU_01: 1 + IFS_GL_05: 1 + IFS_GL_1: 1 From da90adc09a15600c12797f6478a97e017f0e044b Mon Sep 17 00:00:00 2001 From: LucR31 Date: Wed, 20 Dec 2023 13:44:13 +0000 Subject: [PATCH 21/22] parser name --- aiida_flexpart/calculations/post_processing.py | 1 + examples/example_workflow_combi.py | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/aiida_flexpart/calculations/post_processing.py b/aiida_flexpart/calculations/post_processing.py index b5cd72d..85503ec 100644 --- a/aiida_flexpart/calculations/post_processing.py +++ b/aiida_flexpart/calculations/post_processing.py @@ -26,6 +26,7 @@ def define(cls, spec): } #INPUTS + spec.input('metadata.options.parser_name', valid_type=str, default='flexpart.post') spec.input("input_dir", valid_type = orm.RemoteData, required=True, help = "main FLEXPART output dir") spec.input("input_offline_dir", valid_type = orm.RemoteData, required=False, diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 2720334..990627b 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -63,8 +63,8 @@ def simulation_dates_parser(date_list: list) -> list: def test_run(flexpart_code): """Run workflow.""" - simulation_dates = simulation_dates_parser(['2020-10-01,2020-10-02']) - model = ['cosmo7', 'cosmo1'] + simulation_dates = simulation_dates_parser(['2020-10-01']) + model = ['cosmo7'] model_offline = [] username='lfernand' outgrid_main = 'Europe' @@ -76,7 +76,7 @@ def test_run(flexpart_code): scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/' #list of locations and/or groups of locations - list_locations = ['group-name-2'] + list_locations = ['KIT_200magl'] # Links to the remote files/folders. From a47d2dd38ff358022717b9af9f399d56d2aad111 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Wed, 20 Dec 2023 13:53:50 +0000 Subject: [PATCH 22/22] improve parse --- aiida_flexpart/parsers/flexpart_cosmo.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/aiida_flexpart/parsers/flexpart_cosmo.py b/aiida_flexpart/parsers/flexpart_cosmo.py index ef1f58b..9d2a307 100644 --- a/aiida_flexpart/parsers/flexpart_cosmo.py +++ b/aiida_flexpart/parsers/flexpart_cosmo.py @@ -46,19 +46,14 @@ def parse(self, **kwargs): self.logger.error("Found files '{}', expected to find '{}'".format( files_retrieved, files_expected)) return self.exit_codes.ERROR_MISSING_OUTPUT_FILES - - # check aiida.out content + + # add output file + self.logger.info(f"Parsing '{output_filename}'") with self.retrieved.open(output_filename, 'r') as handle: - content=handle.read() output_node = SinglefileData(file=handle) + self.out('output_file', output_node) + content=handle.read() if 'CONGRATULATIONS' not in content: - self.out('output_file', output_node) return ExitCode(1) - # add output file - self.logger.info("Parsing '{}'".format(output_filename)) - with self.retrieved.open(output_filename, 'rb') as handle: - output_node = SinglefileData(file=handle) - self.out('output_file', output_node) - return ExitCode(0)