Skip to content

Commit

Permalink
added post processing external code
Browse files Browse the repository at this point in the history
  • Loading branch information
LucR31 committed Nov 24, 2023
1 parent 3e7c67b commit da97f02
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 4 deletions.
62 changes: 62 additions & 0 deletions aiida_flexpart/calculations/post_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# -*- coding: utf-8 -*-
"""
Calculations provided by aiida_flexpart.
Register calculations via the "aiida.calculations" entry point in setup.json.
"""
import os
import importlib
import pathlib

from aiida import orm
from aiida.common import datastructures
from aiida.engine import CalcJob

class PostProcessingCalculation(CalcJob):
"""AiiDA calculation plugin for post processing."""
@classmethod
def define(cls, spec):
"""Define inputs and outputs of the calculation."""
# yapf: disable
super().define(spec)

# set default values for AiiDA options
spec.inputs['metadata']['options']['resources'].default = {
'num_machines': 1,
'num_mpiprocs_per_machine': 1,
}
spec.input('metadata.options.max_wallclock_seconds', valid_type = int, default=1800)

#INPUTS
spec.input("input_dir", valid_type = orm.RemoteData, required=True,
help = "main FLEXPART output dir")
spec.input("input_offline_dir", valid_type = orm.RemoteData, required=False,
help = "offline-nested FLEXPART output dir")
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
#exit codes
spec.outputs.dynamic = True
spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.')

def prepare_for_submission(self, folder):

params = ['-m',self.inputs.input_dir.get_remote_path(),
'-r','./'
]
if 'input_offline_dir' in self.inputs:
params += ['-n',self.inputs.input_offline_dir.get_remote_path()]

codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = params
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi

# Prepare a `CalcInfo` to be returned to the engine
calcinfo = datastructures.CalcInfo()
calcinfo.codes_info = [codeinfo]
calcinfo.retrieve_list = ['grid_time_*.nc', 'boundary_sensitivity_*.nc', 'aiida.out']

return calcinfo




56 changes: 56 additions & 0 deletions aiida_flexpart/parsers/flexpart_post.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
"""
Parsers provided by aiida_flexpart.
Register parsers via the "aiida.parsers" entry point in setup.json.
"""
from aiida.engine import ExitCode
from aiida.parsers.parser import Parser
from aiida.plugins import CalculationFactory
from aiida.common import exceptions
from aiida.orm import SinglefileData

FlexpartCalculation = CalculationFactory('flexpart.post')


class FlexpartPostParser(Parser):
"""
Parser class for parsing output of calculation.
"""
def __init__(self, node):
"""
Initialize Parser instance
Checks that the ProcessNode being passed was produced by a FlexpartCalculation.
:param node: ProcessNode of calculation
:param type node: :class:`aiida.orm.ProcessNode`
"""
super().__init__(node)
if not issubclass(node.process_class, FlexpartCalculation):
raise exceptions.ParsingError('Can only parse FlexpartCalculation')

def parse(self, **kwargs):
"""
Parse outputs, store results in database.
:returns: an exit code, if parsing fails (or nothing if parsing succeeds)
"""
output_filename = self.node.get_option('output_filename')

# Check that folder content is as expected
files_retrieved = self.retrieved.list_object_names()
files_expected = [output_filename]
# Note: set(A) <= set(B) checks whether A is a subset of B
if not set(files_expected) <= set(files_retrieved):
self.logger.error("Found files '{}', expected to find '{}'".format(
files_retrieved, files_expected))
return self.exit_codes.ERROR_MISSING_OUTPUT_FILES

# add output file
self.logger.info("Parsing '{}'".format(output_filename))
with self.retrieved.open(output_filename, 'rb') as handle:
output_node = SinglefileData(file=handle)
self.out('output_file', output_node)

return ExitCode(0)
26 changes: 25 additions & 1 deletion aiida_flexpart/workflows/multi_dates_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo')
FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs')
FlexpartPostCalculation = plugins.CalculationFactory('flexpart.post')

#possible models
cosmo_models = ['cosmo7', 'cosmo1', 'kenda1']
Expand Down Expand Up @@ -44,6 +45,7 @@ def define(cls, spec):
spec.input('check_meteo_cosmo_code', valid_type=orm.AbstractCode)
spec.input('fifs_code', valid_type=orm.AbstractCode)
spec.input('check_meteo_ifs_code', valid_type=orm.AbstractCode)
spec.input('post_processing_code', valid_type=orm.AbstractCode)

# Basic Inputs
spec.input('simulation_dates', valid_type=orm.List,
Expand Down Expand Up @@ -93,6 +95,9 @@ def define(cls, spec):
spec.expose_inputs(FlexpartIfsCalculation,
include=['metadata.options'],
namespace='flexpartifs')
spec.expose_inputs(FlexpartPostCalculation,
include=['metadata.options'],
namespace='flexpartpost')

# Outputs
#spec.output('output_file', valid_type=orm.SinglefileData)
Expand All @@ -115,7 +120,8 @@ def define(cls, spec):
if_(cls.prepare_meteo_folder_ifs)(
cls.run_ifs_simulation
)
)
),
cls.post_processing,
),
cls.results,
)
Expand Down Expand Up @@ -232,6 +238,24 @@ def prepare_meteo_folder_cosmo(self):
self.report('FAILED to transfer meteo')
self.ctx.index += 1
return False

def post_processing(self):

self.report('starting post-processsing')
builder = FlexpartPostCalculation.get_builder()
builder.code = self.inputs.post_processing_code
builder.input_dir = self.ctx.calculations[-1].outputs.remote_folder

if self.ctx.offline_integration_time>0:
self.report(f'main: {self.ctx.calculations[-2].outputs.remote_folder}')
self.report(f'offline: {self.ctx.calculations[-1].outputs.remote_folder}')
builder.input_dir = self.ctx.calculations[-2].outputs.remote_folder
builder.input_offline_dir = self.ctx.calculations[-1].outputs.remote_folder

builder.metadata.options = self.inputs.flexpartpost.metadata.options

running = self.submit(builder)
self.to_context(calculations=engine.append_(running))

def run_cosmo_simulation(self):
"""Run calculations for equation of state."""
Expand Down
24 changes: 24 additions & 0 deletions config/code_post_processing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
label: 'post-processing'
description: ''
default_calc_job_plugin: flexpart.post
filepath_executable: '/users/shenne/progs/Rflexpart/exec/FLEXPART.totalFootprint.only.R'
computer: daint
prepend_text: |
#SBATCH --job-name=Rpost
#SBATCH --partition=prepost
#SBATCH --nodes=1
#SBATCH --ntasks-per-core=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=72
#SBATCH --time=00:30:00
module load daint-mc
module switch PrgEnv-cray PrgEnv-gnu
module switch gcc/11.2.0 gcc/9.3.0
module load cray-netcdf
module use /store/empa/em05/shenne/easybuild/modules/all
module load ecCodes/2.19.0-CrayGNU-21.09
module load cray-R/3.6.3.1
export R_LIBS=/users/shenne/R/x86_64-suse-linux-gnu-library/3.6
append_text: ""
8 changes: 7 additions & 1 deletion examples/example_workflow_combi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_run(flexpart_code):

simulation_dates = simulation_dates_parser(['2020-09-01'])
model = ['cosmo7']
model_offline = []
model_offline = ['IFS_GL_05']
username='lfernand'
outgrid_main = 'Europe'
outgrid_nest = 'Switzerland'
Expand Down Expand Up @@ -106,6 +106,7 @@ def test_run(flexpart_code):
builder.fifs_code = orm.load_code('flexpart_ifs@daint')
builder.check_meteo_ifs_code = orm.load_code('check-ifs-data@daint-direct-106')
builder.check_meteo_cosmo_code = orm.load_code('check-cosmo-data@daint-direct-106')
builder.post_processing_code = orm.load_code('post-processing@daint')

#basic settings
builder.simulation_dates = simulation_dates
Expand Down Expand Up @@ -178,6 +179,11 @@ def test_run(flexpart_code):
'target_base': f'/store/empa/em05/{username}/aiida_stash',
'stash_mode': StashMode.COPY.value,
}
builder.flexpartpost.metadata.options.stash = {
'source_list': ['aiida.out','boundary_sensitivity_*.nc', 'grid_time_*.nc'],
'target_base': f'/store/empa/em05/{username}/aiida_stash',
'stash_mode': StashMode.COPY.value,
}

#change wall time for cosom and ifs in seconds
builder.flexpart.metadata.options.max_wallclock_seconds = 1800
Expand Down
6 changes: 4 additions & 2 deletions setup.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
"entry_points": {
"aiida.calculations": [
"flexpart.cosmo = aiida_flexpart.calculations.flexpart_cosmo:FlexpartCosmoCalculation",
"flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation"
"flexpart.ifs = aiida_flexpart.calculations.flexpart_ifs:FlexpartIfsCalculation",
"flexpart.post = aiida_flexpart.calculations.post_processing:PostProcessingCalculation"
],
"aiida.parsers": [
"flexpart.cosmo = aiida_flexpart.parsers.flexpart_cosmo:FlexpartCosmoParser",
"flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser"
"flexpart.ifs = aiida_flexpart.parsers.flexpart_ifs:FlexpartIfsParser",
"flexpart.post = aiida_flexpart.parsers.flexpart_post:FlexpartPostParser"
],
"aiida.workflows": [
"flexpart.multi_dates = aiida_flexpart.workflows.multi_dates_workflow:FlexpartMultipleDatesWorkflow"
Expand Down

0 comments on commit da97f02

Please sign in to comment.