From da97f02bf63512ac4352b731f72e9c1a2e065e73 Mon Sep 17 00:00:00 2001 From: LucR31 Date: Fri, 24 Nov 2023 10:31:47 +0000 Subject: [PATCH] added post processing external code --- .../calculations/post_processing.py | 62 +++++++++++++++++++ aiida_flexpart/parsers/flexpart_post.py | 56 +++++++++++++++++ .../workflows/multi_dates_workflow.py | 26 +++++++- config/code_post_processing.yaml | 24 +++++++ examples/example_workflow_combi.py | 8 ++- setup.json | 6 +- 6 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 aiida_flexpart/calculations/post_processing.py create mode 100644 aiida_flexpart/parsers/flexpart_post.py create mode 100644 config/code_post_processing.yaml diff --git a/aiida_flexpart/calculations/post_processing.py b/aiida_flexpart/calculations/post_processing.py new file mode 100644 index 0000000..713a276 --- /dev/null +++ b/aiida_flexpart/calculations/post_processing.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +""" +Calculations provided by aiida_flexpart. +Register calculations via the "aiida.calculations" entry point in setup.json. +""" +import os +import importlib +import pathlib + +from aiida import orm +from aiida.common import datastructures +from aiida.engine import CalcJob + +class PostProcessingCalculation(CalcJob): + """AiiDA calculation plugin for post processing.""" + @classmethod + def define(cls, spec): + """Define inputs and outputs of the calculation.""" + # yapf: disable + super().define(spec) + + # set default values for AiiDA options + spec.inputs['metadata']['options']['resources'].default = { + 'num_machines': 1, + 'num_mpiprocs_per_machine': 1, + } + spec.input('metadata.options.max_wallclock_seconds', valid_type = int, default=1800) + + #INPUTS + spec.input("input_dir", valid_type = orm.RemoteData, required=True, + help = "main FLEXPART output dir") + spec.input("input_offline_dir", valid_type = orm.RemoteData, required=False, + help = "offline-nested FLEXPART output dir") + spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True) + #exit codes + spec.outputs.dynamic = True + spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.') + + def prepare_for_submission(self, folder): + + params = ['-m',self.inputs.input_dir.get_remote_path(), + '-r','./' + ] + if 'input_offline_dir' in self.inputs: + params += ['-n',self.inputs.input_offline_dir.get_remote_path()] + + codeinfo = datastructures.CodeInfo() + codeinfo.cmdline_params = params + codeinfo.code_uuid = self.inputs.code.uuid + codeinfo.stdout_name = self.metadata.options.output_filename + codeinfo.withmpi = self.inputs.metadata.options.withmpi + + # Prepare a `CalcInfo` to be returned to the engine + calcinfo = datastructures.CalcInfo() + calcinfo.codes_info = [codeinfo] + calcinfo.retrieve_list = ['grid_time_*.nc', 'boundary_sensitivity_*.nc', 'aiida.out'] + + return calcinfo + + + + \ No newline at end of file diff --git a/aiida_flexpart/parsers/flexpart_post.py b/aiida_flexpart/parsers/flexpart_post.py new file mode 100644 index 0000000..1f8fb3b --- /dev/null +++ b/aiida_flexpart/parsers/flexpart_post.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +""" +Parsers provided by aiida_flexpart. + +Register parsers via the "aiida.parsers" entry point in setup.json. +""" +from aiida.engine import ExitCode +from aiida.parsers.parser import Parser +from aiida.plugins import CalculationFactory +from aiida.common import exceptions +from aiida.orm import SinglefileData + +FlexpartCalculation = CalculationFactory('flexpart.post') + + +class FlexpartPostParser(Parser): + """ + Parser class for parsing output of calculation. + """ + def __init__(self, node): + """ + Initialize Parser instance + + Checks that the ProcessNode being passed was produced by a FlexpartCalculation. + + :param node: ProcessNode of calculation + :param type node: :class:`aiida.orm.ProcessNode` + """ + super().__init__(node) + if not issubclass(node.process_class, FlexpartCalculation): + raise exceptions.ParsingError('Can only parse FlexpartCalculation') + + def parse(self, **kwargs): + """ + Parse outputs, store results in database. + + :returns: an exit code, if parsing fails (or nothing if parsing succeeds) + """ + output_filename = self.node.get_option('output_filename') + + # Check that folder content is as expected + files_retrieved = self.retrieved.list_object_names() + files_expected = [output_filename] + # Note: set(A) <= set(B) checks whether A is a subset of B + if not set(files_expected) <= set(files_retrieved): + self.logger.error("Found files '{}', expected to find '{}'".format( + files_retrieved, files_expected)) + return self.exit_codes.ERROR_MISSING_OUTPUT_FILES + + # add output file + self.logger.info("Parsing '{}'".format(output_filename)) + with self.retrieved.open(output_filename, 'rb') as handle: + output_node = SinglefileData(file=handle) + self.out('output_file', output_node) + + return ExitCode(0) \ No newline at end of file diff --git a/aiida_flexpart/workflows/multi_dates_workflow.py b/aiida_flexpart/workflows/multi_dates_workflow.py index 3d90b30..b54bff2 100644 --- a/aiida_flexpart/workflows/multi_dates_workflow.py +++ b/aiida_flexpart/workflows/multi_dates_workflow.py @@ -7,6 +7,7 @@ FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo') FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs') +FlexpartPostCalculation = plugins.CalculationFactory('flexpart.post') #possible models cosmo_models = ['cosmo7', 'cosmo1', 'kenda1'] @@ -44,6 +45,7 @@ def define(cls, spec): spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode) spec.input('fifs_code', valid_type=orm.AbstractCode) spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode) + spec.input('post_processing_code', valid_type=orm.AbstractCode) # Basic Inputs spec.input('simulation_dates', valid_type=orm.List, @@ -93,6 +95,9 @@ def define(cls, spec): spec.expose_inputs(FlexpartIfsCalculation, include=['metadata.options'], namespace='flexpartifs') + spec.expose_inputs(FlexpartPostCalculation, + include=['metadata.options'], + namespace='flexpartpost') # Outputs #spec.output('output_file', valid_type=orm.SinglefileData) @@ -115,7 +120,8 @@ def define(cls, spec): if_(cls.prepare_meteo_folder_ifs)( cls.run_ifs_simulation ) - ) + ), + cls.post_processing, ), cls.results, ) @@ -232,6 +238,24 @@ def prepare_meteo_folder_cosmo(self): self.report('FAILED to transfer meteo') self.ctx.index += 1 return False + + def post_processing(self): + + self.report('starting post-processsing') + builder = FlexpartPostCalculation.get_builder() + builder.code = self.inputs.post_processing_code + builder.input_dir = self.ctx.calculations[-1].outputs.remote_folder + + if self.ctx.offline_integration_time>0: + self.report(f'main: {self.ctx.calculations[-2].outputs.remote_folder}') + self.report(f'offline: {self.ctx.calculations[-1].outputs.remote_folder}') + builder.input_dir = self.ctx.calculations[-2].outputs.remote_folder + builder.input_offline_dir = self.ctx.calculations[-1].outputs.remote_folder + + builder.metadata.options = self.inputs.flexpartpost.metadata.options + + running = self.submit(builder) + self.to_context(calculations=engine.append_(running)) def run_cosmo_simulation(self): """Run calculations for equation of state.""" diff --git a/config/code_post_processing.yaml b/config/code_post_processing.yaml new file mode 100644 index 0000000..115f1fd --- /dev/null +++ b/config/code_post_processing.yaml @@ -0,0 +1,24 @@ +--- +label: 'post-processing' +description: '' +default_calc_job_plugin: flexpart.post +filepath_executable: '/users/shenne/progs/Rflexpart/exec/FLEXPART.totalFootprint.only.R' +computer: daint +prepend_text: | + #SBATCH --job-name=Rpost + #SBATCH --partition=prepost + #SBATCH --nodes=1 + #SBATCH --ntasks-per-core=2 + #SBATCH --ntasks-per-node=1 + #SBATCH --cpus-per-task=72 + #SBATCH --time=00:30:00 + + module load daint-mc + module switch PrgEnv-cray PrgEnv-gnu + module switch gcc/11.2.0 gcc/9.3.0 + module load cray-netcdf + module use /store/empa/em05/shenne/easybuild/modules/all + module load ecCodes/2.19.0-CrayGNU-21.09 + module load cray-R/3.6.3.1 + export R_LIBS=/users/shenne/R/x86_64-suse-linux-gnu-library/3.6 +append_text: "" \ No newline at end of file diff --git a/examples/example_workflow_combi.py b/examples/example_workflow_combi.py index 30d0ac5..52f06a1 100644 --- a/examples/example_workflow_combi.py +++ b/examples/example_workflow_combi.py @@ -64,7 +64,7 @@ def test_run(flexpart_code): simulation_dates = simulation_dates_parser(['2020-09-01']) model = ['cosmo7'] - model_offline = [] + model_offline = ['IFS_GL_05'] username='lfernand' outgrid_main = 'Europe' outgrid_nest = 'Switzerland' @@ -106,6 +106,7 @@ def test_run(flexpart_code): builder.fifs_code = orm.load_code('flexpart_ifs@daint') builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106') builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106') + builder.post_processing_code = orm.load_code('post-processing@daint') #basic settings builder.simulation_dates = simulation_dates @@ -178,6 +179,11 @@ def test_run(flexpart_code): 'target_base': f'/store/empa/em05/{username}/aiida_stash', 'stash_mode': StashMode.COPY.value, } + builder.flexpartpost.metadata.options.stash = { + 'source_list': ['aiida.out','boundary_sensitivity_*.nc', 'grid_time_*.nc'], + 'target_base': f'/store/empa/em05/{username}/aiida_stash', + 'stash_mode': StashMode.COPY.value, + } #change wall time for cosom and ifs in seconds builder.flexpart.metadata.options.max_wallclock_seconds = 1800 diff --git a/setup.json b/setup.json index cb33cfa..a1b9a58 100644 --- a/setup.json +++ b/setup.json @@ -16,11 +16,13 @@ "entry_points": { "aiida.calculations": [ "flexpart.cosmo = aiida_flexpart.calculations.flexpart_cosmo:FlexpartCosmoCalculation", - "flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation" + "flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation", + "flexpart.post = aiida_flexpart.calculations.post_processing:PostProcessingCalculation" ], "aiida.parsers": [ "flexpart.cosmo = aiida_flexpart.parsers.flexpart_cosmo:FlexpartCosmoParser", - "flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser" + "flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser", + "flexpart.post = aiida_flexpart.parsers.flexpart_post:FlexpartPostParser" ], "aiida.workflows": [ "flexpart.multi_dates = aiida_flexpart.workflows.multi_dates_workflow:FlexpartMultipleDatesWorkflow"