diff --git a/src/nomad_simulations/schema_packages/workflow/single_point.py b/src/nomad_simulations/schema_packages/workflow/single_point.py index ef299044..15592046 100644 --- a/src/nomad_simulations/schema_packages/workflow/single_point.py +++ b/src/nomad_simulations/schema_packages/workflow/single_point.py @@ -83,18 +83,30 @@ def resolve_n_scf_steps(self) -> int: Returns: int: The number of SCF steps. """ + # Initial check + if not self.outputs: + return 1 for output in self.outputs: - if not isinstance(output, SCFOutputs): + # Check if `self.outputs` has a `section` + if not output.section: continue - if output.scf_steps is not None: - return len(output.scf_steps) + # Check if the section is `SCFOutputs` + if not isinstance(output.section, SCFOutputs): + continue + scf_output = output.section + # Check if there are `scf_steps` + if not scf_output.scf_steps: + continue + return len(scf_output.scf_steps) return 1 def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None: super().normalize(archive, logger) + # SinglePoint can only have one task; if it has more, delete the `tasks` if self.tasks is not None and len(self.tasks) > 1: logger.error('A `SinglePoint` workflow must have only one task.') + self.tasks = None return # Generate the `tasks` section if this does not exist diff --git a/tests/workflow/test_single_point.py b/tests/workflow/test_single_point.py index 0ccf3bfe..d43b8da2 100644 --- a/tests/workflow/test_single_point.py +++ b/tests/workflow/test_single_point.py @@ -20,11 +20,11 @@ import pytest from nomad.datamodel import EntryArchive -from nomad.datamodel.metainfo.workflow import Link, Task, Workflow +from nomad.datamodel.metainfo.workflow import Link, Task -from nomad_simulations.schema_packages.model_method import BaseModelMethod, ModelMethod +from nomad_simulations.schema_packages.model_method import ModelMethod from nomad_simulations.schema_packages.model_system import ModelSystem -from nomad_simulations.schema_packages.outputs import Outputs +from nomad_simulations.schema_packages.outputs import Outputs, SCFOutputs from nomad_simulations.schema_packages.workflow import SinglePoint from ..conftest import generate_simulation @@ -87,14 +87,179 @@ def test_generate_task( assert single_point_task.inputs[1].name == result.inputs[1].name assert single_point_task.outputs[0].name == result.outputs[0].name - def test_resolve_n_scf_steps(): + @pytest.mark.parametrize( + 'scf_output, result', + [ + # no outputs + (None, 1), + # output is not of type SCFOutputs + (Outputs(), 1), + # SCFOutputs without scf_steps + (SCFOutputs(), 1), + # 3 scf_steps + (SCFOutputs(scf_steps=[Outputs(), Outputs(), Outputs()]), 3), + ], + ) + def test_resolve_n_scf_steps(self, scf_output: Outputs, result: int): """ Test the `resolve_n_scf_steps` method of the `SinglePoint` section. """ - assert True + archive = EntryArchive() + simulation = generate_simulation( + model_system=ModelSystem(), model_method=ModelMethod(), outputs=scf_output + ) + archive.data = simulation + workflow = SinglePoint() + archive.workflow2 = workflow + + # Add the scf output to the workflow.outputs + if scf_output is not None: + workflow.outputs = [ + Link(name='SCF Output Data', section=archive.data.outputs[-1]) + ] + + n_scf_steps = workflow.resolve_n_scf_steps() + assert n_scf_steps == result - def test_normalize(): + @pytest.mark.parametrize( + 'model_system, model_method, outputs, tasks, result_task, result_n_scf_steps', + [ + # multiple tasks being stored in SinglePoint + ( + ModelSystem(), + ModelMethod(), + Outputs(), + [Task(name='task 1'), Task(name='task 2')], + [], + None, + ), + # only one task is being stored in SinglePoint + ( + ModelSystem(), + ModelMethod(), + Outputs(), + [Task(name='parsed task')], + [Task(name='parsed task')], + 1, + ), + # no archive sections (empty generated task) + (None, None, None, None, [Task(name='generated task')], 1), + # only one section in archive.data + (ModelSystem(), None, None, None, [Task(name='generated task')], 1), + # another section in archive.data + (None, ModelMethod(), None, None, [Task(name='generated task')], 1), + # only two sections in archive.data + ( + ModelSystem(), + ModelMethod(), + None, + None, + [Task(name='generated task')], + 1, + ), + # all sections in archive.data, so generated task has inputs and outputs + ( + ModelSystem(), + ModelMethod(), + Outputs(), + None, + [ + Task( + name='generated task', + inputs=[ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + outputs=[ + Link(name='Output Data', section=Outputs()), + ], + ) + ], + 1, + ), + # Outputs is SCFOutputs but no scf_steps + ( + ModelSystem(), + ModelMethod(), + SCFOutputs(), + None, + [ + Task( + name='generated task', + inputs=[ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + outputs=[ + Link(name='Output Data', section=SCFOutputs()), + ], + ) + ], + 1, + ), + # 3 scf_steps + ( + ModelSystem(), + ModelMethod(), + SCFOutputs(scf_steps=[Outputs(), Outputs(), Outputs()]), + None, + [ + Task( + name='generated task', + inputs=[ + Link(name='Input Model System', section=ModelSystem()), + Link(name='Input Model Method', section=ModelMethod()), + ], + outputs=[ + Link( + name='Output Data', + section=SCFOutputs( + scf_steps=[Outputs(), Outputs(), Outputs()] + ), + ), + ], + ) + ], + 3, + ), + ], + ) + def test_normalize( + self, + model_system: Optional[ModelSystem], + model_method: Optional[ModelMethod], + outputs: Optional[Outputs], + tasks: list[Task], + result_task: list[Task], + result_n_scf_steps: int, + ): """ Test the `normalize` method of the `SinglePoint` section. """ - assert True + archive = EntryArchive() + simulation = generate_simulation( + model_system=model_system, model_method=model_method, outputs=outputs + ) + archive.data = simulation + workflow = SinglePoint() + archive.workflow2 = workflow + + if tasks is not None: + workflow.tasks = tasks + + workflow.normalize(archive=archive, logger=logger) + + if not result_task: + assert workflow.tasks == result_task + else: + single_point_task = workflow.tasks[0] + if not result_task[0].inputs: + assert isinstance(single_point_task, Task) + assert not single_point_task.inputs and not single_point_task.outputs + else: + assert single_point_task.inputs[0].name == result_task[0].inputs[0].name + assert single_point_task.inputs[1].name == result_task[0].inputs[1].name + assert ( + single_point_task.outputs[0].name == result_task[0].outputs[0].name + ) + assert workflow.n_scf_steps == result_n_scf_steps