Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporary PR with all on-going changes. #14

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions aiida_flexpart/calculations/flexpart_cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def define(cls, spec):
help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.'
)
spec.input('species', valid_type=orm.RemoteData, required=True)
spec.input('meteo_path', valid_type=orm.RemoteData,
spec.input('meteo_path', valid_type=orm.List,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO')
Expand Down Expand Up @@ -101,16 +101,13 @@ def prepare_for_submission(self, folder): # pylint: disable=too-many-locals
needed by the calculation.
:return: `aiida.common.datastructures.CalcInfo` instance
"""
meteo_string_list = ['./','./']
for path in self.inputs.meteo_path:
meteo_string_list.append(f'{path}{os.sep}')
meteo_string_list.append(f'{path}/AVAILABLE')

meteo_path = pathlib.Path(self.inputs.meteo_path.get_remote_path())
codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = [
'./', # Folder containing the inputs.
'./', # Folder containing the outputs.
f'{meteo_path}{os.sep}',
str(meteo_path / 'AVAILABLE'),
# File that lists all the individual input files that are available and assigns them a date
]
codeinfo.cmdline_params = meteo_string_list
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi
Expand Down
189 changes: 189 additions & 0 deletions aiida_flexpart/calculations/flexpart_ifs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# -*- coding: utf-8 -*-
"""
Calculations provided by aiida_flexpart.

Register calculations via the "aiida.calculations" entry point in setup.json.
"""
import os
import importlib
import datetime
import pathlib
import jinja2

from aiida import orm
from aiida.common import datastructures
from aiida.engine import CalcJob
from aiida_flexpart.utils import convert_input_to_namelist_entry

from ..utils import fill_in_template_file


class FlexpartIfsCalculation(CalcJob):
"""AiiDA calculation plugin wrapping the FLEXPART IFS executable."""
@classmethod
def define(cls, spec):
"""Define inputs and outputs of the calculation."""
# yapf: disable
super().define(spec)

# set default values for AiiDA options
spec.inputs['metadata']['options']['resources'].default = {
'num_machines': 1,
'num_mpiprocs_per_machine': 1,
}

spec.input('metadata.options.max_wallclock_seconds', valid_type=int, default=1800)
spec.input('metadata.options.parser_name', valid_type=str, default='flexpart.ifs')

spec.input(
"parent_calc_folder",
valid_type=orm.RemoteData,
required=False,
help="Working directory of a previously ran calculation to restart from."
)

# Model settings
spec.input_namespace('model_settings')
spec.input('model_settings.release_settings', valid_type=orm.Dict, required=True)
spec.input('model_settings.locations', valid_type=orm.Dict, required=True)
spec.input('model_settings.command', valid_type=orm.Dict, required=True)

spec.input('outgrid', valid_type=orm.Dict, help='Input file for the Lagrangian particle dispersion model FLEXPART.')
spec.input('outgrid_nest', valid_type=orm.Dict, required=False,
help='Input file for the Lagrangian particle dispersion model FLEXPART. Nested output grid.'
)
spec.input('species', valid_type=orm.RemoteData, required=True)
spec.input_namespace('land_use', valid_type=orm.RemoteData, required=False, dynamic=True, help='#TODO')

spec.input('meteo_path', valid_type=orm.List,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
spec.outputs.dynamic = True

#exit codes
spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.')

@classmethod
def _deal_with_time(cls, command_dict):
"""Dealing with simulation times."""
#initial values
simulation_beginning_date = datetime.datetime.strptime(command_dict.pop('simulation_date'),'%Y-%m-%d %H:%M:%S')
age_class_time = datetime.timedelta(seconds=command_dict.pop('age_class'))
release_chunk = datetime.timedelta(seconds=command_dict.pop('release_chunk'))
release_duration = datetime.timedelta(seconds=command_dict.pop('release_duration'))

#releases start and end times
release_beginning_date=simulation_beginning_date
release_ending_date=release_beginning_date+release_duration

if command_dict['simulation_direction']>0: #forward
simulation_ending_date=release_ending_date+age_class_time
else: #backward
simulation_ending_date=release_ending_date
simulation_beginning_date-=age_class_time

command_dict['simulation_beginning_date'] = [
f'{simulation_beginning_date:%Y%m%d}',
f'{simulation_beginning_date:%H%M%S}'
]
command_dict['simulation_ending_date'] = [
f'{simulation_ending_date:%Y%m%d}',
f'{simulation_ending_date:%H%M%S}'
]
return {
'beginning_date': release_beginning_date,
'ending_date': release_ending_date,
'chunk': release_chunk
} , age_class_time

def prepare_for_submission(self, folder):

meteo_string_list = ['./','./']
for path in self.inputs.meteo_path:
meteo_string_list.append(f'{path}{os.sep}')
meteo_string_list.append(f'{path}/AVAILABLE')

codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = meteo_string_list
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi

# Prepare a `CalcInfo` to be returned to the engine
calcinfo = datastructures.CalcInfo()
calcinfo.codes_info = [codeinfo]


command_dict = self.inputs.model_settings.command.get_dict()

# Deal with simulation times.
release, age_class_time = self._deal_with_time(command_dict)

# Fill in the releases file.
with folder.open('RELEASES', 'w') as infile:
time_chunks = []
current_time = release['beginning_date'] + release['chunk']
while current_time <= release['ending_date']:
time_chunks.append({
'begin': [f'{current_time-release["chunk"]:%Y%m%d}', f'{current_time-release["chunk"]:%H%M%S}'],
'end': [f'{current_time:%Y%m%d}', f'{current_time:%H%M%S}'],
})
current_time += release['chunk']

template = jinja2.Template(importlib.resources.read_text('aiida_flexpart.templates', 'RELEASES.j2'))
infile.write(template.render(
time_chunks=time_chunks,
locations=self.inputs.model_settings.locations.get_dict(),
release_settings=self.inputs.model_settings.release_settings.get_dict()
)
)

# Fill in the AGECLASSES file.
fill_in_template_file(folder, 'AGECLASSES', int(age_class_time.total_seconds()))

# Fill in the OUTGRID_NEST file if the corresponding dictionary is present.
if 'outgrid_nest' in self.inputs:
command_dict['nested_output'] = True
fill_in_template_file(folder, 'OUTGRID_NEST_ifs', self.inputs.outgrid_nest.get_dict())
else:
command_dict['nested_output'] = False

# Fill in the COMMAND file.
fill_in_template_file(folder, 'COMMAND_ifs', command_dict)

# Fill in the OUTGRID file.
fill_in_template_file(folder, 'OUTGRID_ifs', self.inputs.outgrid.get_dict())


calcinfo.remote_symlink_list = []
calcinfo.remote_symlink_list.append((
self.inputs.species.computer.uuid,
self.inputs.species.get_remote_path(),
'SPECIES'
))

if "parent_calc_folder" in self.inputs:
computer_uuid = self.inputs.parent_calc_folder.computer.uuid
remote_path = self.inputs.parent_calc_folder.get_remote_path()
calcinfo.remote_symlink_list.append((
computer_uuid,
remote_path+'/header',
'header_previous'))
calcinfo.remote_symlink_list.append((
computer_uuid,
remote_path+'/partposit_inst',
'partposit_previous'))


# Dealing with land_use input namespace.
for _, value in self.inputs.land_use.items():
file_path = value.get_remote_path()
calcinfo.remote_symlink_list.append((value.computer.uuid, file_path, pathlib.Path(file_path).name))

calcinfo.retrieve_list = ['grid_time_*.nc', 'aiida.out']

return calcinfo




61 changes: 61 additions & 0 deletions aiida_flexpart/calculations/post_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
"""
Calculations provided by aiida_flexpart.
Register calculations via the "aiida.calculations" entry point in setup.json.
"""
import os
import importlib
import pathlib

from aiida import orm
from aiida.common import datastructures
from aiida.engine import CalcJob

class PostProcessingCalculation(CalcJob):
"""AiiDA calculation plugin for post processing."""
@classmethod
def define(cls, spec):
"""Define inputs and outputs of the calculation."""
# yapf: disable
super().define(spec)

# set default values for AiiDA options
spec.inputs['metadata']['options']['resources'].default = {
'num_machines': 1,
'num_mpiprocs_per_machine': 1,
}

#INPUTS
spec.input('metadata.options.parser_name', valid_type=str, default='flexpart.post')
spec.input("input_dir", valid_type = orm.RemoteData, required=True,
help = "main FLEXPART output dir")
spec.input("input_offline_dir", valid_type = orm.RemoteData, required=False,
help = "offline-nested FLEXPART output dir")
spec.input('metadata.options.output_filename', valid_type=str, default='aiida.out', required=True)
#exit codes
spec.outputs.dynamic = True
spec.exit_code(300, 'ERROR_MISSING_OUTPUT_FILES', message='Calculation did not produce all expected output files.')

def prepare_for_submission(self, folder):

params = ['-m',self.inputs.input_dir.get_remote_path(),
'-r','./'
]
if 'input_offline_dir' in self.inputs:
params += ['-n',self.inputs.input_offline_dir.get_remote_path()]

codeinfo = datastructures.CodeInfo()
codeinfo.cmdline_params = params
codeinfo.code_uuid = self.inputs.code.uuid
codeinfo.stdout_name = self.metadata.options.output_filename
codeinfo.withmpi = self.inputs.metadata.options.withmpi

# Prepare a `CalcInfo` to be returned to the engine
calcinfo = datastructures.CalcInfo()
calcinfo.codes_info = [codeinfo]
calcinfo.retrieve_list = ['grid_time_*.nc', 'boundary_sensitivity_*.nc', 'aiida.out']

return calcinfo



14 changes: 6 additions & 8 deletions aiida_flexpart/parsers/flexpart_cosmo.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,14 @@ def parse(self, **kwargs):
self.logger.error("Found files '{}', expected to find '{}'".format(
files_retrieved, files_expected))
return self.exit_codes.ERROR_MISSING_OUTPUT_FILES

# check aiida.out content

# add output file
self.logger.info(f"Parsing '{output_filename}'")
with self.retrieved.open(output_filename, 'r') as handle:
output_node = SinglefileData(file=handle)
self.out('output_file', output_node)
content=handle.read()
if 'CONGRATULATIONS' not in content:
return ExitCode(1)
# add output file
self.logger.info("Parsing '{}'".format(output_filename))
with self.retrieved.open(output_filename, 'rb') as handle:
output_node = SinglefileData(file=handle)
self.out('output_file', output_node)


return ExitCode(0)
63 changes: 63 additions & 0 deletions aiida_flexpart/parsers/flexpart_ifs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""
Parsers provided by aiida_flexpart.

Register parsers via the "aiida.parsers" entry point in setup.json.
"""
from aiida.engine import ExitCode
from aiida.parsers.parser import Parser
from aiida.plugins import CalculationFactory
from aiida.common import exceptions
from aiida.orm import SinglefileData

FlexpartCalculation = CalculationFactory('flexpart.ifs')


class FlexpartIfsParser(Parser):
"""
Parser class for parsing output of calculation.
"""
def __init__(self, node):
"""
Initialize Parser instance

Checks that the ProcessNode being passed was produced by a FlexpartCalculation.

:param node: ProcessNode of calculation
:param type node: :class:`aiida.orm.ProcessNode`
"""
super().__init__(node)
if not issubclass(node.process_class, FlexpartCalculation):
raise exceptions.ParsingError('Can only parse FlexpartCalculation')

def parse(self, **kwargs):
"""
Parse outputs, store results in database.

:returns: an exit code, if parsing fails (or nothing if parsing succeeds)
"""
output_filename = self.node.get_option('output_filename')

# Check that folder content is as expected
files_retrieved = self.retrieved.list_object_names()
files_expected = [output_filename]
# Note: set(A) <= set(B) checks whether A is a subset of B
if not set(files_expected) <= set(files_retrieved):
self.logger.error("Found files '{}', expected to find '{}'".format(
files_retrieved, files_expected))
return self.exit_codes.ERROR_MISSING_OUTPUT_FILES

# check aiida.out content
with self.retrieved.open(output_filename, 'r') as handle:
content = handle.read()
output_node = SinglefileData(file=handle)
if 'CONGRATULATIONS' not in content:
self.out('output_file', output_node)
return ExitCode(1)
# add output file
self.logger.info("Parsing '{}'".format(output_filename))
with self.retrieved.open(output_filename, 'rb') as handle:
output_node = SinglefileData(file=handle)
self.out('output_file', output_node)

return ExitCode(0)
Loading
Loading