diff --git a/src/nomad_simulations/schema_packages/utils/__init__.py b/src/nomad_simulations/schema_packages/utils/__init__.py index 52d9ca22..a602cf97 100644 --- a/src/nomad_simulations/schema_packages/utils/__init__.py +++ b/src/nomad_simulations/schema_packages/utils/__init__.py @@ -1,5 +1,6 @@ from .utils import ( RussellSaundersState, + extract_all_simulation_subsections, get_composition, get_sibling_section, get_variables, diff --git a/src/nomad_simulations/schema_packages/utils/utils.py b/src/nomad_simulations/schema_packages/utils/utils.py index 1d40aa4a..e61bc1eb 100644 --- a/src/nomad_simulations/schema_packages/utils/utils.py +++ b/src/nomad_simulations/schema_packages/utils/utils.py @@ -8,8 +8,13 @@ from typing import Optional from nomad.datamodel.data import ArchiveSection + from nomad.datamodel.datamodel import EntryArchive from structlog.stdlib import BoundLogger + from nomad_simulations.schema_packages.model_method import ModelMethod + from nomad_simulations.schema_packages.model_system import ModelSystem + from nomad_simulations.schema_packages.outputs import Outputs + configuration = config.get_plugin_entry_point( 'nomad_simulations.schema_packages:nomad_simulations_plugin' ) @@ -154,3 +159,40 @@ def get_composition(children_names: 'list[str]') -> str: children_count_tup = np.unique(children_names, return_counts=True) formula = ''.join([f'{name}({count})' for name, count in zip(*children_count_tup)]) return formula if formula else None + + +def extract_all_simulation_subsections( + archive: 'EntryArchive', + i_system: int = 0, + i_method: int = -1, + i_output: int = -1, +) -> 'tuple[ModelSystem, ModelMethod, Outputs]': + """ + Extracts the simulation sub-sections for `ModelSystem`, `ModelMethod`, and `Outputs` from the archive. The specific + element of the section returned is specified by the indices `i_system`, `i_method`, and `i_output`. 
+ + This utility function is useful when extracting the initial `ModelSystem` structure, the `ModelMethod` used in + the simulation, and the last `Outputs` section generated by the simulation. + + Args: + archive (EntryArchive): The archive to extract the simulation sub-sections from. + i_system (int, optional): The index of the `ModelSystem` to extract. Defaults to 0. + i_method (int, optional): The index of the `ModelMethod` to extract. Defaults to -1. + i_output (int, optional): The index of the `Outputs` to extract. Defaults to -1. + + Returns: + tuple[ModelSystem, ModelMethod, Outputs]: The extracted `ModelSystem`, `ModelMethod`, and `Outputs` sections. + """ + if ( + not archive.m_xpath('data.model_system') + or not archive.m_xpath('data.model_method') + or not archive.m_xpath('data.outputs') + ): + return None, None, None + try: + system = archive.data.model_system[i_system] + method = archive.data.model_method[i_method] + output = archive.data.outputs[i_output] + return system, method, output + except IndexError: + return None, None, None diff --git a/src/nomad_simulations/schema_packages/workflow/__init__.py b/src/nomad_simulations/schema_packages/workflow/__init__.py new file mode 100644 index 00000000..dadfc62d --- /dev/null +++ b/src/nomad_simulations/schema_packages/workflow/__init__.py @@ -0,0 +1,3 @@ +from .base_workflows import BeyondDFT, BeyondDFTMethod, SimulationWorkflow +from .dft_plus_tb import DFTPlusTB, DFTPlusTBMethod +from .single_point import SinglePoint diff --git a/src/nomad_simulations/schema_packages/workflow/base_workflows.py b/src/nomad_simulations/schema_packages/workflow/base_workflows.py new file mode 100644 index 00000000..47ccfd51 --- /dev/null +++ b/src/nomad_simulations/schema_packages/workflow/base_workflows.py @@ -0,0 +1,143 @@ +from functools import wraps +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from nomad.datamodel.datamodel import EntryArchive + from structlog.stdlib import BoundLogger + +from 
nomad.datamodel.data import ArchiveSection +from nomad.datamodel.metainfo.workflow import TaskReference, Workflow +from nomad.metainfo import SubSection + +from nomad_simulations.schema_packages.model_method import BaseModelMethod +from nomad_simulations.schema_packages.outputs import Outputs + + +def check_n_tasks(n_tasks: Optional[int] = None): + """ + Check if the `tasks` of a workflow exist. If the `n_tasks` input specified, it checks whether `tasks` + is of the same length as `n_tasks`. + + Args: + n_tasks (Optional[int], optional): The length of the `tasks` needs to be checked if set to an integer. Defaults to None. + """ + + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + if not self.tasks: + return None + if n_tasks is not None and len(self.tasks) != n_tasks: + return None + + return func(self, *args, **kwargs) + + return wrapper + + return decorator + + +class SimulationWorkflow(Workflow): + """ + A base section used to define the workflows of a simulation with references to specific `tasks`, `inputs`, and `outputs`. The + normalize function checks the definition of these sections and sets the name of the workflow. + + A `SimulationWorkflow` will be composed of: + - a `method` section containing methodological parameters used specifically during the workflow, + - a list of `inputs` with references to the `ModelSystem` and, optionally, `ModelMethod` input sections, + - a list of `outputs` with references to the `Outputs` section, + - a list of `tasks` containing references to the activity `Simulation` used in the workflow, + """ + + method = SubSection( + sub_section=BaseModelMethod.m_def, + description=""" + Methodological parameters used during the workflow. 
+ """, + ) + + # TODO implement sorting of tasks in terms of `time_step`/`time` (this makes ParallelWorkflow and SerialWorkflow irrelevant) + + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) + + +class BeyondDFTMethod(ArchiveSection): + """ + An abstract section used to store references to the `ModelMethod` sections of each of the + archives defining the `tasks` and used to build the standard `BeyondDFT` workflow. This section needs to be + inherit and the method references need to be defined for each specific case (see, e.g., dft_plus_tb.py module). + """ + + pass + + +class BeyondDFT(SimulationWorkflow): + """ + A base section used to represent a beyond-DFT workflow and containing a `method` section which uses references + to the specific tasks `ModelMethod` sections. + """ + + method = SubSection( + sub_section=BeyondDFTMethod.m_def, + description=""" + Abstract sub section used to populate the `method` of a `BeyondDFT` workflow with references + to the corresponding `SinglePoint` entries and their `ModelMethod` sections. + """, + ) + + @check_n_tasks() + def resolve_all_outputs(self) -> list[Outputs]: + """ + Resolves all the `Outputs` sections from the `tasks` in the workflow. This is useful when + the workflow is composed of multiple tasks and the outputs need to be stored in a list + for further manipulation, e.g., to plot multiple band structures in a DFT+TB workflow. + + Returns: + list[Outputs]: A list of all the `Outputs` sections from the `tasks`. + """ + # Populate the list of outputs from the last element in `tasks` + all_outputs = [] + for task in self.tasks: + if not task.outputs: + continue + all_outputs.append(task.outputs[-1]) + return all_outputs + + @check_n_tasks() + def resolve_method_refs( + self, tasks: list[TaskReference], tasks_names: list[str] + ) -> list[BaseModelMethod]: + """ + Resolve the references to the `BaseModelMethod` sections in the list of `tasks`. 
This is useful + when defining the `method` section of the `BeyondDFT` workflow. + + Args: + tasks (list[TaskReference]): The list of tasks from which resolve the `BaseModelMethod` sections. + tasks_names (list[str]): The list of names for each of the tasks forming the BeyondDFT workflow. + + Returns: + list[BaseModelMethod]: The list of resolved `BaseModelMethod` sections. + """ + # Initial check on the inputs + if len(tasks) != len(tasks_names): + return [] + + method_refs = [] + for i, task in enumerate(tasks): + # Define names of the tasks + task.name = tasks_names[i] + + # Check if task.inputs or task.outputs do not exists for any of the 2 tasks + if not task.m_xpath('task.inputs'): + continue + + # Resolve the method of each task.inputs + for input in task.task.inputs: + if isinstance(input.section, BaseModelMethod): + method_refs.append(input.section) + break + return method_refs + + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) diff --git a/src/nomad_simulations/schema_packages/workflow/dft_plus_tb.py b/src/nomad_simulations/schema_packages/workflow/dft_plus_tb.py new file mode 100644 index 00000000..651c988f --- /dev/null +++ b/src/nomad_simulations/schema_packages/workflow/dft_plus_tb.py @@ -0,0 +1,157 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from nomad.datamodel.datamodel import EntryArchive + from structlog.stdlib import BoundLogger + +from nomad.datamodel.metainfo.workflow import Link, TaskReference +from nomad.metainfo import Quantity, Reference + +from nomad_simulations.schema_packages.model_method import DFT, TB +from nomad_simulations.schema_packages.workflow import BeyondDFT, BeyondDFTMethod +from nomad_simulations.schema_packages.workflow.base_workflows import check_n_tasks + +from .single_point import SinglePoint + + +class DFTPlusTBMethod(BeyondDFTMethod): + """ + Section used to reference the `DFT` and `TB` `ModelMethod` sections in each of the archives + 
comprising a DFT+TB simulation workflow.
+ + The archive.workflow2 section is: + - name = 'DFT+TB' + - method = DFTPlusTBMethod( + dft_method_ref=dft_archive.data.model_method[-1], + tb_method_ref=tb_archive.data.model_method[-1], + ) + - inputs = [ + Link(name='Input Model System', section=dft_archive.data.model_system[0]), + ] + - outputs = [ + Link(name='Output TB Data', section=tb_archive.data.outputs[-1]), + ] + - tasks = [ + TaskReference( + name='DFT SinglePoint Task', + task=dft_archive.workflow2 + inputs=[ + Link(name='Input Model System', section=dft_archive.data.model_system[0]), + ], + outputs=[ + Link(name='Output DFT Data', section=dft_archive.data.outputs[-1]), + ] + ), + TaskReference( + name='TB SinglePoint Task', + task=tb_archive.workflow2, + inputs=[ + Link(name='Output DFT Data', section=dft_archive.data.outputs[-1]), + ], + outputs=[ + Link(name='Output tb Data', section=tb_archive.data.outputs[-1]), + ] + ), + ] + """ + + @check_n_tasks(n_tasks=2) + def link_task_inputs_outputs( + self, tasks: list[TaskReference], logger: 'BoundLogger' + ) -> None: + if not self.inputs or not self.outputs: + logger.warning( + 'The `DFTPlusTB` workflow needs to have `inputs` and `outputs` defined in order to link with the `tasks`.' 
+ ) + return None + + dft_task = tasks[0] + tb_task = tasks[1] + + # Initial check + if not dft_task.m_xpath('task.outputs'): + return None + + # Input of DFT Task is the ModelSystem + dft_task.inputs = [ + Link(name='Input Model System', section=self.inputs[0]), + ] + # Output of DFT Task is the output section of the DFT entry + dft_task.outputs = [ + Link(name='Output DFT Data', section=dft_task.task.outputs[-1]), + ] + # Input of TB Task is the output of the DFT task + tb_task.inputs = [ + Link(name='Output DFT Data', section=dft_task.task.outputs[-1]), + ] + # Output of TB Task is the output section of the TB entry + tb_task.outputs = [ + Link(name='Output TB Data', section=self.outputs[-1]), + ] + + # TODO check if implementing overwritting the FermiLevel.value in the TB entry from the DFT entry + + @check_n_tasks(n_tasks=2) + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) + + # Check if `tasks` are not SinglePoints + for task in self.tasks: + if not task.task: + logger.error( + 'A `DFTPlusTB` workflow must have two `SinglePoint` tasks references.' + ) + return + if not isinstance(task.task, SinglePoint): + logger.error( + 'The referenced tasks in the `DFTPlusTB` workflow must be of type `SinglePoint`.' 
+ ) + return + + # Define name of the workflow + self.name = 'DFT+TB' + + # Resolve `method` + method_refs = self.resolve_method_refs( + tasks=self.tasks, + tasks_names=['DFT SinglePoint Task', 'TB SinglePoint Task'], + ) + if method_refs is not None: + method_workflow = DFTPlusTBMethod() + for method in method_refs: + if isinstance(method, DFT): + method_workflow.dft_method_ref = method + elif isinstance(method, TB): + method_workflow.tb_method_ref = method + self.method = method_workflow + + # Resolve `tasks[*].inputs` and `tasks[*].outputs` + self.link_task_inputs_outputs(tasks=self.tasks, logger=logger) diff --git a/src/nomad_simulations/schema_packages/workflow/single_point.py b/src/nomad_simulations/schema_packages/workflow/single_point.py new file mode 100644 index 00000000..e2b9d669 --- /dev/null +++ b/src/nomad_simulations/schema_packages/workflow/single_point.py @@ -0,0 +1,73 @@ +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from nomad.datamodel.datamodel import EntryArchive + from structlog.stdlib import BoundLogger + +from nomad.datamodel.metainfo.workflow import Link +from nomad.metainfo import Quantity + +from nomad_simulations.schema_packages.outputs import SCFOutputs +from nomad_simulations.schema_packages.utils import extract_all_simulation_subsections +from nomad_simulations.schema_packages.workflow import SimulationWorkflow + + +class SinglePoint(SimulationWorkflow): + """ + A base section used to represent a single point calculation workflow. The `SinglePoint` + workflow is the minimum workflow required to represent a simulation. The self-consistent steps of + scf simulation are represented inside the `SinglePoint` workflow. + + The section only needs to be instantiated, and everything else will be extracted from the `normalize` function. + The archive needs to have `archive.data` sub-sections (model_sytem, model_method, outputs) populated. 
+ + The archive.workflow2 section is: + - name = 'SinglePoint' + - inputs = [ + Link(name='Input Model System', section=archive.data.model_system[0]), + Link(name='Input Model Method', section=archive.data.model_method[-1]), + ] + - outputs = [ + Link(name='Output Data', section=archive.data.outputs[-1]), + ] + - tasks = [] + """ + + # ? is this necessary? + n_scf_steps = Quantity( + type=np.int32, + default=1, + description=""" + The number of self-consistent field (SCF) steps in the simulation. This is calculated + in the normalizer by storing the length of the `SCFOutputs` section in archive.data. Defaults + to 1. + """, + ) + + def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: + super().normalize(archive, logger) + + # Define name + self.name = 'SinglePoint' + + # Define `inputs` and `outputs` + input_model_system, input_model_method, output = ( + extract_all_simulation_subsections(archive=archive) + ) + if not input_model_system or not input_model_method or not output: + logger.warning( + 'Could not find the ModelSystem, ModelMethod, or Outputs section in the archive.data section of the SinglePoint entry.' 
+ ) + return + self.inputs = [ + Link(name='Input Model System', section=input_model_system), + Link(name='Input Model Method', section=input_model_method), + ] + self.outputs = [Link(name='Output Data', section=output)] + + # Resolve the `n_scf_steps` if the output is of `SCFOutputs` type + if isinstance(output, SCFOutputs): + if output.scf_steps is not None and len(output.scf_steps) > 0: + self.n_scf_steps = len(output.scf_steps) diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index a50978f6..46dd4d33 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -1,11 +1,16 @@ import pytest +from nomad.datamodel.datamodel import EntryArchive +from nomad_simulations.schema_packages.general import Simulation +from nomad_simulations.schema_packages.model_method import ModelMethod from nomad_simulations.schema_packages.model_system import ( AtomicCell, ModelSystem, Symmetry, ) +from nomad_simulations.schema_packages.outputs import Outputs from nomad_simulations.schema_packages.utils import ( + extract_all_simulation_subsections, get_sibling_section, get_variables, is_not_representative, @@ -84,3 +89,82 @@ def test_get_variables(variables: list, result: list, result_length: int): assert len(energies) == result_length for i, energy in enumerate(energies): # asserting energies == result does not work assert energy.n_points == result[i].n_points + + +@pytest.mark.parametrize( + 'archive, subsection_indices, result', + [ + # no data section + ( + EntryArchive(), + [0, -1, -1], + [None, None, None], + ), + # no subsections + ( + EntryArchive(data=Simulation()), + [0, -1, -1], + [None, None, None], + ), + # no model_method and outputs + ( + EntryArchive(data=Simulation(model_system=[ModelSystem()])), + [0, -1, -1], + [None, None, None], + ), + # no outputs + ( + EntryArchive( + data=Simulation( + model_system=[ModelSystem()], model_method=[ModelMethod()] + ) + ), + [0, -1, -1], + [None, None, None], + ), + # all subsections + ( + EntryArchive( 
+ data=Simulation( + model_system=[ModelSystem()], + model_method=[ModelMethod()], + outputs=[Outputs()], + ) + ), + [0, -1, -1], + [ModelSystem(), ModelMethod(), Outputs()], + ), + # wrong index for model_system + ( + EntryArchive( + data=Simulation( + model_system=[ModelSystem()], + model_method=[ModelMethod()], + outputs=[Outputs()], + ) + ), + [2, -1, -1], + [None, None, None], + ), + ], +) +def test_extract_all_simulation_subsections( + archive: EntryArchive, subsection_indices: list, result: list +): + """ + Test the `extract_all_simulation_subsections` utility function. + """ + system, method, output = extract_all_simulation_subsections( + archive=archive, + i_system=subsection_indices[0], + i_method=subsection_indices[1], + i_output=subsection_indices[2], + ) + if result[0] is not None: + assert ( + isinstance(system, ModelSystem) + and isinstance(method, ModelMethod) + and isinstance(output, Outputs) + ) + else: + assert system == result[0] and method == result[1] and output == result[2] diff --git a/tests/workflow/__init__.py b/tests/workflow/__init__.py new file mode 100644 index 00000000..5cdfd197 --- /dev/null +++ b/tests/workflow/__init__.py @@ -0,0 +1,3 @@ +from nomad import utils + +logger = utils.get_logger(__name__) diff --git a/tests/workflow/test_base_workflows.py b/tests/workflow/test_base_workflows.py new file mode 100644 index 00000000..2ca65d0b --- /dev/null +++ b/tests/workflow/test_base_workflows.py @@ -0,0 +1,155 @@ +import pytest +from nomad.datamodel.metainfo.workflow import Link, Task, TaskReference + +from nomad_simulations.schema_packages.model_method import ( + DFT, + TB, + ModelMethod, +) +from nomad_simulations.schema_packages.model_system import ModelSystem +from nomad_simulations.schema_packages.outputs import Outputs, SCFOutputs +from nomad_simulations.schema_packages.workflow import BeyondDFT, SinglePoint + + +class TestBeyondDFT: + @pytest.mark.parametrize( + 'tasks, result', + [ + # no task + (None, None), + # empty task + 
([Task()], []), + # no outputs + ([Task(name='task')], []), + # one task with one output + ([Task(outputs=[Link(section=Outputs())])], [Outputs]), + # one task with two outputs (last one is SCF type) + ( + [Task(outputs=[Link(section=Outputs()), Link(section=SCFOutputs())])], + [SCFOutputs], + ), + # two tasks with one output each + ( + [ + Task(outputs=[Link(section=Outputs())]), + Task(outputs=[Link(section=SCFOutputs())]), + ], + [Outputs, SCFOutputs], + ), + # two tasks with two outputs each (note order of the last outputs types) + ( + [ + Task(outputs=[Link(section=Outputs()), Link(section=SCFOutputs())]), + Task(outputs=[Link(section=SCFOutputs()), Link(section=Outputs())]), + ], + [SCFOutputs, Outputs], + ), + ], + ) + def test_resolve_all_outputs(self, tasks: list[Task], result: list[Outputs]): + """ + Test the `resolve_all_outputs` method of the `BeyondDFT` section. + """ + workflow = BeyondDFT() + workflow.tasks = tasks + all_outputs = workflow.resolve_all_outputs() + if not result: + assert all_outputs == result + else: + # ! 
comparing directly does not work because
comparing directly does not work because
Model System')], + outputs=[Link(name='Output DFT Data')], + ), + TaskReference( + task=SinglePoint(), + inputs=[Link(name='Output DFT Data')], + outputs=[Link(name='Output TB Data')], + ), + ], + ), + ], + ) + def test_link_task_inputs_outputs( + self, + inputs: list[Link], + outputs: list[Link], + tasks: list[TaskReference], + result_tasks: list[TaskReference], + ): + """ + Test the `link_task_inputs_outputs` method of the `DFTPlusTB` section. + """ + workflow = DFTPlusTB() + workflow.tasks = tasks + workflow.inputs = inputs + workflow.outputs = outputs + + workflow.link_task_inputs_outputs(tasks=workflow.tasks, logger=logger) + + if not result_tasks: + assert not workflow.m_xpath('tasks[0].inputs') and not workflow.m_xpath( + 'tasks[0].outputs' + ) + assert not workflow.m_xpath('tasks[1].inputs') and not workflow.m_xpath( + 'tasks[1].outputs' + ) + else: + for i, task in enumerate(workflow.tasks): + assert task.inputs[0].name == result_tasks[i].inputs[0].name + assert task.outputs[0].name == result_tasks[i].outputs[0].name + + @pytest.mark.parametrize( + 'inputs, outputs, tasks, result_name, result_methods, result_tasks', + [ + # all none + (None, None, None, None, None, []), + # only one task + (None, None, [TaskReference()], None, None, []), + # two empty tasks + (None, None, [TaskReference(), TaskReference()], None, None, []), + # only one task has a task + ( + None, + None, + [TaskReference(task=SinglePoint()), TaskReference()], + None, + None, + [], + ), + # both tasks with empty task sections, one is not SinglePoint + ( + None, + None, + [TaskReference(task=DFTPlusTB()), TaskReference(task=SinglePoint())], + None, + None, + [], + ), + # both tasks with empty SinglePoint task sections; name is resolved + ( + None, + None, + [TaskReference(task=SinglePoint()), TaskReference(task=SinglePoint())], + 'DFT+TB', + None, + [], + ), + # both tasks have input for ModelSystem + ( + None, + None, + [ + TaskReference( + task=SinglePoint( + inputs=[Link(name='input 
system', section=ModelSystem())] + ) + ), + TaskReference( + task=SinglePoint( + inputs=[Link(name='input system', section=ModelSystem())] + ) + ), + ], + 'DFT+TB', + None, + [], + ), + # one task has an input with a ref to DFT section + ( + None, + None, + [ + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='dft method', section=DFT()), + ] + ) + ), + TaskReference( + task=SinglePoint( + inputs=[Link(name='input system', section=ModelSystem())] + ) + ), + ], + 'DFT+TB', + [DFT], + [], + ), + # both tasks have inputs with refs to DFT and TB sections + ( + None, + None, + [ + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='dft method', section=DFT()), + ] + ) + ), + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='tb method', section=TB()), + ] + ) + ), + ], + 'DFT+TB', + [DFT, TB], + [], + ), + # one task has an output, but the workflow inputs and outputs are empty + ( + None, + None, + [ + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='dft method', section=DFT()), + ], + outputs=[Link(name='output dft', section=Outputs())], + ) + ), + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='tb method', section=TB()), + ], + ) + ), + ], + 'DFT+TB', + [DFT, TB], + [], + ), + # positive testing + ( + [Link(name='input system')], + [Link(name='output tb')], + [ + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='dft method', section=DFT()), + ], + outputs=[Link(name='output dft', section=Outputs())], + ) + ), + TaskReference( + task=SinglePoint( + inputs=[ + Link(name='input system', section=ModelSystem()), + Link(name='tb method', section=TB()), + ], + outputs=[Link(name='output tb', section=Outputs())], + ) 
+ ), + ], + 'DFT+TB', + [DFT, TB], + [ + TaskReference( + task=SinglePoint(outputs=[Link(name='output dft')]), + inputs=[Link(name='Input Model System')], + outputs=[Link(name='Output DFT Data')], + ), + TaskReference( + task=SinglePoint(), + inputs=[Link(name='Output DFT Data')], + outputs=[Link(name='Output TB Data')], + ), + ], + ), + ], + ) + def test_normalize( + self, + inputs: list[Link], + outputs: list[Link], + tasks: list[TaskReference], + result_name: Optional[str], + result_methods: Optional[list[ModelMethod]], + result_tasks: Optional[list[TaskReference]], + ): + """ + Test the `normalize` method of the `DFTPlusTB` section. + """ + archive = EntryArchive() + + # Add `Simulation` to archive + simulation = generate_simulation( + model_system=ModelSystem(), model_method=ModelMethod(), outputs=Outputs() + ) + archive.data = simulation + + # Add `SinglePoint` to archive + workflow = DFTPlusTB() + workflow.inputs = inputs + workflow.outputs = outputs + workflow.tasks = tasks + archive.workflow2 = workflow + + workflow.normalize(archive=archive, logger=logger) + + # Test `name` of the workflow + assert workflow.name == result_name + + # Test `method` of the workflow + if len(result_tasks) > 0: + assert workflow.tasks[0].name == 'DFT SinglePoint Task' + assert workflow.tasks[1].name == 'TB SinglePoint Task' + if not result_methods: + assert not workflow.m_xpath( + 'method.dft_method_ref' + ) and not workflow.m_xpath('method.tb_method_ref') + else: + # ! 
comparing directly does not work because
import logger + + +class TestBeyondDFT: + @pytest.mark.parametrize( + 'model_system, model_method, outputs, result_inputs, result_outputs, result_n_scf_steps', + [ + # no task + (None, None, None, [], [], 1), + (ModelSystem(), None, None, [], [], 1), + (ModelSystem(), ModelMethod(), None, [], [], 1), + ( + ModelSystem(), + ModelMethod(), + Outputs(), + [ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + [Link(name='Output Data', section=Outputs())], + 1, + ), + ( + ModelSystem(), + ModelMethod(), + SCFOutputs(), + [ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + [Link(name='Output Data', section=SCFOutputs())], + 1, + ), + ( + ModelSystem(), + ModelMethod(), + SCFOutputs(scf_steps=[Outputs(), Outputs(), Outputs()]), + [ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + [ + Link( + name='Output Data', + section=SCFOutputs(scf_steps=[Outputs(), Outputs(), Outputs()]), + ) + ], + 3, + ), + ], + ) + def test_resolve_all_outputs( + self, + model_system: Optional[ModelSystem], + model_method: Optional[ModelMethod], + outputs: Optional[Outputs], + result_inputs, + result_outputs, + result_n_scf_steps: Optional[int], + ): + """ + Test the `resolve_all_outputs` method of the `BeyondDFT` section. + """ + archive = EntryArchive() + + # Add `Simulation` to archive + simulation = generate_simulation( + model_system=model_system, model_method=model_method, outputs=outputs + ) + archive.data = simulation + + # Add `SinglePoint` to archive + workflow = SinglePoint() + archive.workflow2 = workflow + + workflow.normalize(archive=archive, logger=logger) + + assert workflow.name == 'SinglePoint' + if not result_inputs: + assert workflow.inputs == result_inputs + assert workflow.outputs == result_outputs + else: + # ! 
comparing directly does not work because