Initial definitions for simulation workflow #121

Open · wants to merge 25 commits into base: develop

Commits (25)
3f388cd  Added SimulationWorkflow, SinglePoint (JosePizarro3, Jul 22, 2024)
cd5cf6d  Added testing for SimulationWorkflow (JosePizarro3, Sep 17, 2024)
ce004d5  Move testing to subfolders to mimic structure in src (JosePizarro3, Sep 18, 2024)
c3f21ef  Rename testing file (JosePizarro3, Sep 18, 2024)
d3042fe  Changed name to BeyondDFT (JosePizarro3, Sep 18, 2024)
03df03d  Add testing for BeyondDFT workflow (JosePizarro3, Sep 18, 2024)
f520a9a  Fix resolve_inputs_outputs method (JosePizarro3, Sep 18, 2024)
61bef1b  Add testing SinglePoint.generate_task method (JosePizarro3, Sep 18, 2024)
64d85f6  Added testing for SinglePoint methods (JosePizarro3, Sep 19, 2024)
6a03e4c  Fix types (JosePizarro3, Sep 19, 2024)
c34705f  Added check_n_tasks decorator (JosePizarro3, Sep 19, 2024)
e313bb1  Added testing for link_tasks (JosePizarro3, Sep 19, 2024)
8e80dde  Added todo for testing overwrite_fermi_level once this is under control (JosePizarro3, Sep 19, 2024)
8eff916  Initial idea (equivalent to the workflow-task schema (JosePizarro3, Sep 19, 2024)
4213e99  Added utils extract_simulation_subsections (JosePizarro3, Sep 19, 2024)
c64c894  Fix base_workflows and testing (JosePizarro3, Sep 19, 2024)
cc7a8ee  Change name to extract_all_simulation_subsections (JosePizarro3, Sep 19, 2024)
d39c525  Fix single_point and testing (JosePizarro3, Sep 19, 2024)
304d344  Fix imports (JosePizarro3, Sep 19, 2024)
95fdf8d  Add resolve_method_refs method to BeyondDFT (JosePizarro3, Sep 19, 2024)
a3b95bf  Fix dft_plus_tb and testing (JosePizarro3, Sep 19, 2024)
d2d57ff  Added more testing and comments (JosePizarro3, Sep 20, 2024)
6a7668b  Rebase and delete copyright notice (JosePizarro3, Oct 2, 2024)
14b982b  Deleting copyright text (JosePizarro3, Oct 2, 2024)
ad055ee  Added comments (JosePizarro3, Oct 9, 2024)
1 change: 1 addition & 0 deletions src/nomad_simulations/schema_packages/utils/__init__.py
@@ -1,5 +1,6 @@
from .utils import (
    RussellSaundersState,
    extract_all_simulation_subsections,
    get_composition,
    get_sibling_section,
    get_variables,
42 changes: 42 additions & 0 deletions src/nomad_simulations/schema_packages/utils/utils.py
@@ -8,8 +8,13 @@
from typing import Optional

from nomad.datamodel.data import ArchiveSection
from nomad.datamodel.datamodel import EntryArchive
from structlog.stdlib import BoundLogger

from nomad_simulations.schema_packages.model_method import ModelMethod
from nomad_simulations.schema_packages.model_system import ModelSystem
from nomad_simulations.schema_packages.outputs import Outputs

configuration = config.get_plugin_entry_point(
    'nomad_simulations.schema_packages:nomad_simulations_plugin'
)
@@ -154,3 +159,40 @@ def get_composition(children_names: 'list[str]') -> str:
    children_count_tup = np.unique(children_names, return_counts=True)
    formula = ''.join([f'{name}({count})' for name, count in zip(*children_count_tup)])
    return formula if formula else None


def extract_all_simulation_subsections(
    archive: 'EntryArchive',
    i_system: int = 0,
    i_method: int = -1,
    i_output: int = -1,
) -> 'tuple[ModelSystem, ModelMethod, Outputs]':
    """
    Extracts the simulation sub-sections for `ModelSystem`, `ModelMethod`, and `Outputs` from the archive. The specific
    element of the section returned is specified by the indices `i_system`, `i_method`, and `i_output`.

    This utility function is useful when extracting the initial `ModelSystem` structure, the `ModelMethod` used in
    the simulation, and the last `Outputs` section generated by the simulation.

    Args:
        archive (EntryArchive): The archive to extract the simulation sub-sections from.
        i_system (int, optional): The index of the `ModelSystem` to extract. Defaults to 0.
        i_method (int, optional): The index of the `ModelMethod` to extract. Defaults to -1.
        i_output (int, optional): The index of the `Outputs` to extract. Defaults to -1.

    Returns:
        tuple[ModelSystem, ModelMethod, Outputs]: The extracted `ModelSystem`, `ModelMethod`, and `Outputs` sections.
    """
    if (
        not archive.m_xpath('data.model_system')
        or not archive.m_xpath('data.model_method')
        or not archive.m_xpath('data.outputs')
    ):
        return None, None, None
    try:
        system = archive.data.model_system[i_system]
        method = archive.data.model_method[i_method]
        output = archive.data.outputs[i_output]
        return system, method, output
    except IndexError:
        return None, None, None
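A minimal usage sketch of this utility (not part of the diff). It assumes the top-level `Simulation` section lives in `nomad_simulations.schema_packages.general` and that metainfo sections accept repeated sub-sections as constructor keyword arguments; both are assumptions made for illustration.

from nomad.datamodel.datamodel import EntryArchive

from nomad_simulations.schema_packages.general import Simulation  # assumed import path
from nomad_simulations.schema_packages.model_method import DFT
from nomad_simulations.schema_packages.model_system import ModelSystem
from nomad_simulations.schema_packages.outputs import Outputs
from nomad_simulations.schema_packages.utils import extract_all_simulation_subsections

# A small archive with one system, one method, and two outputs sections
archive = EntryArchive(
    data=Simulation(
        model_system=[ModelSystem()],
        model_method=[DFT()],
        outputs=[Outputs(), Outputs()],
    )
)

# Defaults pick the first `ModelSystem` and the last `ModelMethod`/`Outputs`
system, method, output = extract_all_simulation_subsections(archive=archive)
assert output is archive.data.outputs[-1]

# Indices can be overridden; missing sections or out-of-range indices yield (None, None, None)
_, _, first_output = extract_all_simulation_subsections(archive=archive, i_output=0)
assert extract_all_simulation_subsections(archive=EntryArchive()) == (None, None, None)

The defaults match the common case described in the docstring: the initial structure, the method used in the simulation, and the last generated outputs.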
3 changes: 3 additions & 0 deletions src/nomad_simulations/schema_packages/workflow/__init__.py
@@ -0,0 +1,3 @@
from .base_workflows import BeyondDFT, BeyondDFTMethod, SimulationWorkflow
from .dft_plus_tb import DFTPlusTB, DFTPlusTBMethod
from .single_point import SinglePoint
143 changes: 143 additions & 0 deletions src/nomad_simulations/schema_packages/workflow/base_workflows.py
@@ -0,0 +1,143 @@
from functools import wraps
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from nomad.datamodel.datamodel import EntryArchive
    from structlog.stdlib import BoundLogger

from nomad.datamodel.data import ArchiveSection
from nomad.datamodel.metainfo.workflow import TaskReference, Workflow
from nomad.metainfo import SubSection

from nomad_simulations.schema_packages.model_method import BaseModelMethod
from nomad_simulations.schema_packages.outputs import Outputs


def check_n_tasks(n_tasks: Optional[int] = None):
"""
Check if the `tasks` of a workflow exist. If the `n_tasks` input specified, it checks whether `tasks`
is of the same length as `n_tasks`.

Args:
n_tasks (Optional[int], optional): The length of the `tasks` needs to be checked if set to an integer. Defaults to None.
"""

def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
if not self.tasks:
return None
if n_tasks is not None and len(self.tasks) != n_tasks:
return None

return func(self, *args, **kwargs)

return wrapper

return decorator


class SimulationWorkflow(Workflow):
    """
    A base section used to define the workflows of a simulation with references to specific `tasks`, `inputs`, and
    `outputs`. The normalize function checks the definition of these sections and sets the name of the workflow.

    A `SimulationWorkflow` is composed of:
        - a `method` section containing methodological parameters used specifically during the workflow,
        - a list of `inputs` with references to the `ModelSystem` and, optionally, `ModelMethod` input sections,
        - a list of `outputs` with references to the `Outputs` section,
        - a list of `tasks` containing references to the activity `Simulation` used in the workflow.
    """

    method = SubSection(
        sub_section=BaseModelMethod.m_def,
        description="""
        Methodological parameters used during the workflow.
        """,
    )

    # TODO implement sorting of tasks in terms of `time_step`/`time` (this makes ParallelWorkflow and SerialWorkflow irrelevant)

    def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None:
        super().normalize(archive, logger)

class BeyondDFTMethod(ArchiveSection):
    """
    An abstract section used to store references to the `ModelMethod` sections of each of the
    archives defining the `tasks` and used to build the standard `BeyondDFT` workflow. This section needs to be
    inherited from, and the method references need to be defined for each specific case (see, e.g., the dft_plus_tb.py module).
    """

    pass

Review comment on the `BeyondDFTMethod` class:
Collaborator: No base class for method?
JosePizarro3 (Author): Well, this is just a placeholder for referencing DFT, TB, DMFT, GW... sections in the archives of my workflows.


class BeyondDFT(SimulationWorkflow):
    """
    A base section used to represent a beyond-DFT workflow, containing a `method` section with references
    to the `ModelMethod` sections of the specific tasks.
    """

    method = SubSection(
        sub_section=BeyondDFTMethod.m_def,
        description="""
        Abstract sub section used to populate the `method` of a `BeyondDFT` workflow with references
        to the corresponding `SinglePoint` entries and their `ModelMethod` sections.
        """,
    )

    @check_n_tasks()
    def resolve_all_outputs(self) -> list[Outputs]:
        """
        Resolves all the `Outputs` sections from the `tasks` in the workflow. This is useful when
        the workflow is composed of multiple tasks and the outputs need to be stored in a list
        for further manipulation, e.g., to plot multiple band structures in a DFT+TB workflow.

        Returns:
            list[Outputs]: A list of all the `Outputs` sections from the `tasks`.
        """
        # Populate the list of outputs from the last element in `tasks`
        all_outputs = []
        for task in self.tasks:
            if not task.outputs:
                continue
            all_outputs.append(task.outputs[-1])
        return all_outputs

    @check_n_tasks()
    def resolve_method_refs(
        self, tasks: list[TaskReference], tasks_names: list[str]
    ) -> list[BaseModelMethod]:
        """
        Resolve the references to the `BaseModelMethod` sections in the list of `tasks`. This is useful
        when defining the `method` section of the `BeyondDFT` workflow.

        Args:
            tasks (list[TaskReference]): The list of tasks from which to resolve the `BaseModelMethod` sections.
            tasks_names (list[str]): The list of names for each of the tasks forming the BeyondDFT workflow.

        Returns:
            list[BaseModelMethod]: The list of resolved `BaseModelMethod` sections.
        """
        # Initial check on the inputs
        if len(tasks) != len(tasks_names):
            return []

        method_refs = []
        for i, task in enumerate(tasks):
            # Define names of the tasks
            task.name = tasks_names[i]

            # Skip the task if the referenced entry has no `inputs`
            if not task.m_xpath('task.inputs'):
                continue

            # Resolve the method from each `task.inputs`
            for input in task.task.inputs:
                if isinstance(input.section, BaseModelMethod):
                    method_refs.append(input.section)
                    break
        return method_refs

    def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None:
        super().normalize(archive, logger)
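A brief, hypothetical sketch (not part of the diff) of how the pieces above interact: the `@check_n_tasks()` guard makes a decorated method short-circuit to `None` when `tasks` is missing (or has the wrong length, if `n_tasks` is given), while `resolve_all_outputs` collects the last output link of each task. The in-memory `TaskReference`/`Link` wiring and the keyword-style construction are illustrative only.

from nomad.datamodel.metainfo.workflow import Link, TaskReference

from nomad_simulations.schema_packages.outputs import Outputs
from nomad_simulations.schema_packages.workflow import BeyondDFT

# Two referenced tasks, each exposing a single output link
workflow = BeyondDFT(
    tasks=[
        TaskReference(outputs=[Link(name='Output DFT Data', section=Outputs())]),
        TaskReference(outputs=[Link(name='Output TB Data', section=Outputs())]),
    ]
)

# `tasks` is populated, so the @check_n_tasks() guard lets the method run
all_outputs = workflow.resolve_all_outputs()
assert len(all_outputs) == 2

# Without any `tasks`, the decorator short-circuits and the method returns None
assert BeyondDFT().resolve_all_outputs() is None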
157 changes: 157 additions & 0 deletions src/nomad_simulations/schema_packages/workflow/dft_plus_tb.py
@@ -0,0 +1,157 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from nomad.datamodel.datamodel import EntryArchive
    from structlog.stdlib import BoundLogger

from nomad.datamodel.metainfo.workflow import Link, TaskReference
from nomad.metainfo import Quantity, Reference

from nomad_simulations.schema_packages.model_method import DFT, TB
from nomad_simulations.schema_packages.workflow import BeyondDFT, BeyondDFTMethod
from nomad_simulations.schema_packages.workflow.base_workflows import check_n_tasks

from .single_point import SinglePoint


class DFTPlusTBMethod(BeyondDFTMethod):
    """
    Section used to reference the `DFT` and `TB` `ModelMethod` sections in each of the archives
    composing a DFT+TB simulation workflow.
    """

    dft_method_ref = Quantity(
        type=Reference(DFT),
        description="""
        Reference to the DFT `ModelMethod` section in the DFT task.
        """,
    )
    tb_method_ref = Quantity(
        type=Reference(TB),
        description="""
        Reference to the TB `ModelMethod` section in the TB task.
        """,
    )


class DFTPlusTB(BeyondDFT):
    """
    A base section used to represent a DFT+TB calculation workflow. The `DFTPlusTB` workflow is composed of
    two tasks: the initial DFT calculation + the final TB projection.

    The section only needs to be populated with (everything else is handled by the `normalize` function):
        i. The `tasks` as `TaskReference` sections, adding `task` to the specific archive.workflow2 sections.
        ii. The `inputs` and `outputs` as `Link` sections pointing to the specific archives.

    Note 1: the `inputs[0]` of the `DFTPlusTB` coincides with the `inputs[0]` of the DFT task (`ModelSystem` section).
    Note 2: the `outputs[-1]` of the `DFTPlusTB` coincides with the `outputs[-1]` of the TB task (`Outputs` section).
    Note 3: the `outputs[-1]` of the DFT task is used as `inputs[0]` of the TB task.

    The archive.workflow2 section is:
        - name = 'DFT+TB'
        - method = DFTPlusTBMethod(
            dft_method_ref=dft_archive.data.model_method[-1],
            tb_method_ref=tb_archive.data.model_method[-1],
        )
        - inputs = [
            Link(name='Input Model System', section=dft_archive.data.model_system[0]),
        ]
        - outputs = [
            Link(name='Output TB Data', section=tb_archive.data.outputs[-1]),
        ]
        - tasks = [
            TaskReference(
                name='DFT SinglePoint Task',
                task=dft_archive.workflow2,
                inputs=[
                    Link(name='Input Model System', section=dft_archive.data.model_system[0]),
                ],
                outputs=[
                    Link(name='Output DFT Data', section=dft_archive.data.outputs[-1]),
                ],
            ),
            TaskReference(
                name='TB SinglePoint Task',
                task=tb_archive.workflow2,
                inputs=[
                    Link(name='Output DFT Data', section=dft_archive.data.outputs[-1]),
                ],
                outputs=[
                    Link(name='Output TB Data', section=tb_archive.data.outputs[-1]),
                ],
            ),
        ]
    """

    @check_n_tasks(n_tasks=2)
    def link_task_inputs_outputs(
        self, tasks: list[TaskReference], logger: 'BoundLogger'
    ) -> None:
        if not self.inputs or not self.outputs:
            logger.warning(
                'The `DFTPlusTB` workflow needs to have `inputs` and `outputs` defined in order to link with the `tasks`.'
            )
            return None

        dft_task = tasks[0]
        tb_task = tasks[1]

        # Initial check
        if not dft_task.m_xpath('task.outputs'):
            return None

        # Input of the DFT task is the `ModelSystem`
        dft_task.inputs = [
            Link(name='Input Model System', section=self.inputs[0]),
        ]
        # Output of the DFT task is the output section of the DFT entry
        dft_task.outputs = [
            Link(name='Output DFT Data', section=dft_task.task.outputs[-1]),
        ]
        # Input of the TB task is the output of the DFT task
        tb_task.inputs = [
            Link(name='Output DFT Data', section=dft_task.task.outputs[-1]),
        ]
        # Output of the TB task is the output section of the TB entry
        tb_task.outputs = [
            Link(name='Output TB Data', section=self.outputs[-1]),
        ]

        # TODO check whether to implement overwriting `FermiLevel.value` in the TB entry from the DFT entry

Review comment on the `link_task_inputs_outputs` signature:
ladinesa (Collaborator, Oct 8, 2024): I thought you already figured out how to handle loggers. Since you are using decorators, maybe create a decorator for error handling as Nathan suggested.
JosePizarro3 (Author): Well, the loggers issue comes after this one. I can change to the class __init__; I find it more convenient to use self.logger for all methods.

Review comment on the `dft_task.inputs` assignment:
Collaborator: Not sure if I understood this right, but it should be the other way around, right? I.e., you assign self.inputs/outputs from the DFT and TB task inputs and outputs.
JosePizarro3 (Author): After our discussion, this is what was confusing me with the need of defining inputs/outputs in a TaskReference. I think I can improve this in a new schema as specified in https://gitlab.mpcdf.mpg.de/nomad-lab/nomad-FAIR/-/merge_requests/2143 with some minor tweaks.

    @check_n_tasks(n_tasks=2)
    def normalize(self, archive: 'EntryArchive', logger: 'BoundLogger') -> None:
        super().normalize(archive, logger)

        # Check that the referenced `tasks` are `SinglePoint` workflows
        for task in self.tasks:
            if not task.task:
                logger.error(
                    'A `DFTPlusTB` workflow must have two `SinglePoint` task references.'
                )
                return
            if not isinstance(task.task, SinglePoint):
                logger.error(
                    'The referenced tasks in the `DFTPlusTB` workflow must be of type `SinglePoint`.'
                )
                return

        # Define name of the workflow
        self.name = 'DFT+TB'

        # Resolve `method`
        method_refs = self.resolve_method_refs(
            tasks=self.tasks,
            tasks_names=['DFT SinglePoint Task', 'TB SinglePoint Task'],
        )
        if method_refs is not None:
            method_workflow = DFTPlusTBMethod()
            for method in method_refs:
                if isinstance(method, DFT):
                    method_workflow.dft_method_ref = method
                elif isinstance(method, TB):
                    method_workflow.tb_method_ref = method
            self.method = method_workflow

        # Resolve `tasks[*].inputs` and `tasks[*].outputs`
        self.link_task_inputs_outputs(tasks=self.tasks, logger=logger)

Review comment on the `if method_refs is not None:` check:
Collaborator: Here, method_refs should only have length 2, right, and be arranged as DFT, TB, so maybe the loop is not needed.
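A hedged sketch (not part of the diff) of populating a `DFTPlusTB` workflow by hand, following the recipe in the class docstring: only `inputs`, `outputs`, and `tasks` are set, and `normalize` (invoked during NOMAD processing) is expected to fill in the name, the method references, and the per-task links. The `Simulation` import path and the keyword-style construction of the archives are assumptions made for illustration.

from nomad.datamodel.datamodel import EntryArchive
from nomad.datamodel.metainfo.workflow import Link, TaskReference

from nomad_simulations.schema_packages.general import Simulation  # assumed import path
from nomad_simulations.schema_packages.model_method import DFT, TB
from nomad_simulations.schema_packages.model_system import ModelSystem
from nomad_simulations.schema_packages.outputs import Outputs
from nomad_simulations.schema_packages.workflow import DFTPlusTB, SinglePoint

# Stand-ins for two already-processed SinglePoint entries (DFT and TB)
dft_archive = EntryArchive(
    data=Simulation(model_system=[ModelSystem()], model_method=[DFT()], outputs=[Outputs()]),
    workflow2=SinglePoint(),
)
tb_archive = EntryArchive(
    data=Simulation(model_system=[ModelSystem()], model_method=[TB()], outputs=[Outputs()]),
    workflow2=SinglePoint(),
)

# Hand-populated references; everything else is resolved by `DFTPlusTB.normalize`
workflow = DFTPlusTB(
    inputs=[Link(name='Input Model System', section=dft_archive.data.model_system[0])],
    outputs=[Link(name='Output TB Data', section=tb_archive.data.outputs[-1])],
    tasks=[
        TaskReference(task=dft_archive.workflow2),
        TaskReference(task=tb_archive.workflow2),
    ],
)

During normalization, `link_task_inputs_outputs` would then wire the DFT entry's last `Outputs` section as the input of the TB task, as described in Note 3 of the docstring above.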