diff --git a/examples/files/configs/example_config.yaml b/examples/files/configs/example_config.yaml deleted file mode 100644 index 87d4f91..0000000 --- a/examples/files/configs/example_config.yaml +++ /dev/null @@ -1,182 +0,0 @@ ---- -start_date: "2026-01-01T00:00" -end_date: "2027-01-01T00:00" -cycles: - - init: - tasks: - - extpar: - inputs: - - obs_data: - arg_option: "--input" - outputs: - - extpar_file - - icon_bimonthly: - period: "P2M" - tasks: - - preproc: - inputs: - - grid_file: - arg_option: "-g" - - extpar_file: - date: "2026-01-01T00:00" - arg_option: "-p" - - ERA5: - arg_option: "-e" - outputs: - - icon_input - depends: - - icon: - lag: "-P4M" - - icon: - inputs: - - grid_file: - arg_option: "-g" - - icon_input: - arg_option: "--input" - #- icon_restart: - # lag: '-P2M' - outputs: - - stream_1 - - stream_2 - #- icon_restart - - postproc_1: - inputs: - - stream_1: - arg_option: "--input" - outputs: - - postout_1 - - store_and_clean_1: - inputs: - - postout_1: - arg_option: "--input" - - stream_1: - arg_option: "--stream" - - icon_input: - arg_option: "--icon_input" - outputs: - - stored_data_1 - - yearly: - period: "P1Y" - tasks: - - postproc_2: - inputs: - - stream_2: - lag: - - "P0M" - arg_option: "--input" - outputs: - - postout_2 - - store_and_clean_2: - inputs: - - postout_2: - arg_option: "--input" - - stream_2: - lag: - - "P0M" - outputs: - - stored_data_2 -# Each task and piece of data (input and output of tasks) used to -# define the graph is described in that section -tasks: - - ROOT: - # All tasks inherit the root task properties - host: santis - account: g110 - - extpar: - plugin: extpar - command: $PWD/files/scripts/extpar - command_option: "--verbose" # todo implement support - config: path/to/namelists/dir - uenv: - squashfs: path/to/squashfs - mount_point: runtime/mount/point - nodes: 1 - walltime: "00:02:00" - - preproc: - plugin: AiiDA Shell - command: $PWD/files/scripts/cleanup.sh - nodes: 4 - walltime: "00:02:00" - config: path/to/config/dir - uenv: - squashfs: path/to/squashfs - mount_point: runtime/mount/point - - icon: - plugin: icon - command: $PWD/files/scripts/icon - nodes: 40 - walltime: "23:59:59" - config: path/to/namelists/dir - uenv: - squashfs: path/to/squashfs - mount_point: runtime/mount/point - - postproc_1: - plugin: AiiDA Shell - command: $PWD/files/scripts/main_script_ocn.sh - nodes: 2 - walltime: "00:05:00" - conda_env: path/to/yaml/env/file - uenv: - squashfs: path/to/squashfs - mount_point: runtime/mount/point - - postproc_2: - plugin: AiiDA Shell - command: $PWD/files/scripts/main_script_atm.sh - nodes: 2 - walltime: "00:05:00" - src: path/to/src/dir - conda_env: path/to/yaml/env/file - uenv: - squashfs: path/to/squashfs - mount_point: runtime/mount/point - - store_and_clean_1: - plugin: AiiDA Shell - command: $PWD/files/scripts/post_clean.sh - nodes: 1 - walltime: "00:01:00" - - store_and_clean_2: - plugin: AiiDA Shell - command: $PWD/files/scripts/post_clean.sh - nodes: 1 - walltime: "00:01:00" -data: - - preproc_output: - type: file - src: $PWD/files/data/file.ncdf - - grid_file: - type: file - src: $PWD/files/data/grid - - obs_data: - type: file - src: $PWD/files/data/obs_data - - ERA5: - type: file - src: $PWD/files/data/era5 - - extpar_file: - type: file - src: output - - icon_input: - type: file - src: output - - icon_restart: - type: file - format: ncdf - src: restart - - stream_1: - type: file - src: output_1 - - stream_2: - type: file - src: output_2 - - postout_1: - type: file - src: postout - - postout_2: - type: file - src: postout - - stored_data_1: - type: file - src: stored_data - - stored_data_2: - type: file - src: stored_data diff --git a/examples/files/data/era5 b/examples/files/data/era5 deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/data/file.ncdf b/examples/files/data/file.ncdf deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/data/grid b/examples/files/data/grid deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/data/input b/examples/files/data/input deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/data/obs_data b/examples/files/data/obs_data deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/scripts/argparse b/examples/files/scripts/argparse deleted file mode 100644 index e69de29..0000000 diff --git a/examples/files/scripts/cleanup.py b/examples/files/scripts/cleanup.py deleted file mode 100755 index ff9c30e..0000000 --- a/examples/files/scripts/cleanup.py +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python - -def main(): - # Main script execution continues here - print("Cleaning") - - -if __name__ == '__main__': - main() diff --git a/examples/files/scripts/cleanup.sh b/examples/files/scripts/cleanup.sh deleted file mode 100755 index bc5435d..0000000 --- a/examples/files/scripts/cleanup.sh +++ /dev/null @@ -1 +0,0 @@ -echo "cleanup" > output diff --git a/examples/files/scripts/extpar b/examples/files/scripts/extpar deleted file mode 100755 index 66dc5ac..0000000 --- a/examples/files/scripts/extpar +++ /dev/null @@ -1 +0,0 @@ -echo "extpar" > output diff --git a/examples/files/scripts/icon b/examples/files/scripts/icon deleted file mode 100755 index ec409b0..0000000 --- a/examples/files/scripts/icon +++ /dev/null @@ -1,4 +0,0 @@ -echo "icon" > restart -echo "icon" > output -echo "icon" > output_1 -echo "icon" > output_2 diff --git a/examples/files/scripts/icon.py b/examples/files/scripts/icon.py deleted file mode 100755 index f5ec27b..0000000 --- a/examples/files/scripts/icon.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python - -import argparse -from pathlib import Path - -def main(): - parser = argparse.ArgumentParser(description='A simple script with an optional restart argument.') - parser.add_argument('icon_input', type=str, help='The icon input.') - parser.add_argument('--restart', nargs='?', type=str, help='The icon restart file.') - #parser.add_argument('--restart', nargs='?', const='default', type=str, help='Initiate a restart operation with an optional string argument.') - - - args = parser.parse_args() - - output = Path('output') - output.write_text("") - - if args.restart: - restart = Path(args.restart) - restart.read_text() - text = "Restart operation initiated..." - print(text) - with output.open("a") as f: - f.write(text) - else: - text = "No restart option provided. Continuing without restart." - print(text) - with output.open("a") as f: - f.write(text) - - # Main script execution continues here - text = "Script execution continues..." - print(text) - with output.open("a") as f: - f.write(text) - - restart = Path('restart') - restart.write_text("") - -if __name__ == '__main__': - main() diff --git a/examples/files/scripts/icon.sh b/examples/files/scripts/icon.sh deleted file mode 100644 index dde2cf8..0000000 --- a/examples/files/scripts/icon.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/bin/bash - -# Function to write text to the output file -write_to_output() { - local text="$1" - echo "$text" - echo "$text" >> output -} - -# Check if at least one argument is provided -if [ "$#" -lt 1 ]; then - echo "Usage: $0 icon_input [--restart [restart_file]]" - exit 1 -fi - -# Positional argument -icon_input="$1" - -# Optional restart argument -restart_file="" - -if [ "$2" == "--restart" ]; then - if [ -n "$3" ]; then - restart_file="$3" - fi -fi - -# Create/empty the output file -> output - -# Handling restart if the argument is provided -if [ -n "$restart_file" ]; then - if [ -f "$restart_file" ]; then - cat "$restart_file" > /dev/null - write_to_output "Restart operation initiated..." - else - echo "Restart file $restart_file does not exist." - exit 1 - fi -else - write_to_output "No restart option provided. Continuing without restart." -fi - -# Main script execution continues here -write_to_output "Script execution continues..." - -# Create/empty the restart file -> restart diff --git a/examples/files/scripts/main_script_atm.sh b/examples/files/scripts/main_script_atm.sh deleted file mode 100755 index 2a361f8..0000000 --- a/examples/files/scripts/main_script_atm.sh +++ /dev/null @@ -1 +0,0 @@ -echo "main_script_atm.sh" > postout diff --git a/examples/files/scripts/main_script_ocn.sh b/examples/files/scripts/main_script_ocn.sh deleted file mode 100755 index 1d01b24..0000000 --- a/examples/files/scripts/main_script_ocn.sh +++ /dev/null @@ -1 +0,0 @@ -echo "python main_script_ocn.sh" > postout diff --git a/examples/files/scripts/post_clean.sh b/examples/files/scripts/post_clean.sh deleted file mode 100755 index b91319f..0000000 --- a/examples/files/scripts/post_clean.sh +++ /dev/null @@ -1 +0,0 @@ -echo "store_and_clean" > stored_data diff --git a/src/wcflow/core.py b/src/wcflow/core.py deleted file mode 100644 index 0458a99..0000000 --- a/src/wcflow/core.py +++ /dev/null @@ -1,676 +0,0 @@ -from __future__ import annotations - -from datetime import datetime -from itertools import chain -from os.path import expandvars -from pathlib import Path -from typing import TYPE_CHECKING - -from isoduration import parse_duration -from isoduration.types import Duration - -from wcflow.parsing._utils import TimeUtils - -if TYPE_CHECKING: - from collections.abc import Generator - - -class _DataBase: - def __init__( - self, - name: str, - type: str, # noqa: A002 - src: str, - lag: list[Duration], - date: list[datetime], - arg_option: str | None, - ): - self._name = name - - self._src = src - self._path = Path(expandvars(self._src)) - - self._type = type - if self._type not in ["file", "dir"]: - msg = f"Data type {self._type!r} not supported. Please use 'file' or 'dir'." - raise ValueError(msg) - - if len(lag) > 0 and len(date) > 0: - msg = "Either 'lag' or 'date' can be nonempty. Not both." - raise ValueError(msg) - - # COMMENT I think we should just disallow multiple lags, and enforce the user to write multiple lags - # I am not sure how this work with icon as it does not need positional arguments - # or rather how does it work with plugins - if arg_option is not None and (len(lag) > 1 or len(date) > 1): - msg = ( - "You cannot give an arg_option when multiple lags and dates are given. " - "They must be positional arguments, since passing them to one option is ambiguous." - ) - raise ValueError(msg) - - self._lag = lag - self._date = date - self._arg_option = arg_option - - @property - def name(self) -> str: - """The name of this data instance that is used as identifier.""" - return self._name - - @property - def type(self) -> str: - """The data type.""" - return self._type - - @property - def src(self) -> str: - return self._src - - @property - def path(self) -> Path: - return self._path - - @property - def lag(self) -> list[Duration]: - return self._lag - - @property - def date(self) -> list[datetime]: - return self._date - - @property - def arg_option(self) -> str | None: - return self._arg_option - - -class Data(_DataBase): - def __init__( - self, - name: str, - type: str, # noqa: A002 - src: str, - lag: list[Duration], - date: list[datetime], - arg_option: str | None, - ): - super().__init__(name, type, src, lag, date, arg_option) - self._task: Task | None = None - - def unroll(self, unrolled_task: UnrolledTask) -> Generator[UnrolledData, None, None]: - if len(self._date) == 0 and len(self._lag) == 0: - yield UnrolledData.from_data(self, unrolled_task, unrolled_task.unrolled_date) - - for lag in self._lag: - lagged_date = unrolled_task.unrolled_date + lag - if ( - lagged_date >= unrolled_task.unrolled_cycle.start_date - and lagged_date <= unrolled_task.unrolled_cycle.end_date - ): - yield UnrolledData.from_data(self, unrolled_task, lagged_date) - - for date in self._date: - yield UnrolledData.from_data(self, unrolled_task, date) - - def __repr__(self) -> str: - if self.task is None: - identifier = f"{self.__class__.__name__} '{self.name}'" - else: - identifier = f"{self.__class__.__name__} '{self.name}' attached task '{self.task}'" - return super().__repr__().replace(f"{self.__class__.__name__}", identifier) - - @property - def task(self) -> Task | None: - return self._task - - @task.setter - def task(self, task: Task): - if self._task is not None: - msg = f"Data {self} was already assigned to task {self._task}. Cannot assign task to task {task}." - raise ValueError(msg) - self._task = task - - -class UnrolledData(_DataBase): - """ - Data that are created during the unrolling of a cycle. - This class should be only initiated through unrolling a cycle. - """ - - @classmethod - def from_data(cls, data: Data, unrolled_task: UnrolledTask, unrolled_date: datetime): - return cls(unrolled_task, unrolled_date, data.name, data.type, data.src, data.lag, data.date, data.arg_option) - - def __init__( - self, - unrolled_task: UnrolledTask, - unrolled_date: datetime, - name: str, - type: str, # noqa: A002 - src: str, - lag: list[Duration], - date: list[datetime], - arg_option: str | None, - ): - super().__init__(name, type, src, lag, date, arg_option) - self._unrolled_task = unrolled_task - self._unrolled_date = unrolled_date - - def __repr__(self) -> str: - if self.unrolled_task is None: - identifier = f"{self.__class__.__name__} '{self.name}' with date {self.unrolled_date}" - else: - identifier = f"{self.__class__.__name__} '{self.name}' with date {self.unrolled_date} attached to task {self.unrolled_task}" - return super().__repr__().replace(f"{self.__class__.__name__}", identifier) - - @property - def unrolled_date(self) -> datetime: - return self._unrolled_date - - @property - def unrolled_task(self) -> UnrolledTask: - return self._unrolled_task - - -class _DependencyBase: - def __init__(self, depend_on_task_name: str, lag: list[Duration], date: list[datetime], cycle_name: str | None): - self._depend_on_task_name = depend_on_task_name - if len(lag) > 0 and len(date) > 0: - msg = "Only one key 'lag' or 'date' is allowed. Not both." - raise ValueError(msg) - - self._lag = lag - self._date = date - self._cycle_name = cycle_name - - @property - def depend_on_task_name(self) -> str: - return self._depend_on_task_name - - @property - def lag(self) -> list[Duration]: - return self._lag - - @property - def date(self) -> list[datetime]: - return self._date - - @property - def cycle_name(self) -> str | None: - return self._cycle_name - - -class Dependency(_DependencyBase): - def __init__(self, depend_on_task_name: str, lag: list[Duration], date: list[datetime], cycle_name: str | None): - super().__init__(depend_on_task_name, lag, date, cycle_name) - self._task: Task | None = None - - def unroll(self, unrolled_task: UnrolledTask) -> Generator[UnrolledDependency, None, None]: - if len(self._date) == 0 and len(self._lag) == 0: - yield UnrolledDependency.from_dependency(self, unrolled_task, unrolled_task.unrolled_date) - - for lag in self._lag: - lagged_date = unrolled_task.unrolled_date + lag - if ( - lagged_date >= unrolled_task.unrolled_cycle.start_date - and lagged_date <= unrolled_task.unrolled_cycle.end_date - ): - yield UnrolledDependency.from_dependency(self, unrolled_task, lagged_date) - - for date in self._date: - yield UnrolledDependency.from_dependency(self, unrolled_task, date) - - @property - def task(self) -> Task | None: - return self._task - - @task.setter - def task(self, task: Task): - if self.task is not None: - msg = f"Dependency was already assigned to task {self.task}. Cannot assign to task {task}." - raise ValueError(msg) - self._task = task - - def __repr__(self) -> str: - if self._cycle_name is None: - identifier = f"{self.__class__.__name__} on task '{self.depend_on_task_name}' attached to task {self.task}" - else: - identifier = f"{self.__class__.__name__} on task '{self.depend_on_task_name}' in cycle '{self.cycle_name}' attached to task {self.task}" - return super().__repr__().replace(f"{self.__class__.__name__}", identifier) - - -class UnrolledDependency(_DependencyBase): - """ - This class should be only initiated through unrolling a cycle. - """ - - @classmethod - def from_dependency(cls, depend: Dependency, unrolled_task: UnrolledTask, unrolled_date: datetime): - return cls(unrolled_task, unrolled_date, depend.depend_on_task_name, depend.lag, depend.date, depend.cycle_name) - - def __init__( - self, - unrolled_task: UnrolledTask, - unrolled_date: datetime, - depend_on_task_name: str, - lag: list[Duration], - date: list[datetime], - cycle_name: str | None, - ): - super().__init__(depend_on_task_name, lag, date, cycle_name) - self._unrolled_task = unrolled_task - self._unrolled_date = unrolled_date - - def __repr__(self) -> str: - if self._cycle_name is None: - identifier = ( - f"{self.__class__.__name__} on task '{self.depend_on_task_name}' with date {self.unrolled_date}" - ) - else: - identifier = f"{self.__class__.__name__} on task '{self.depend_on_task_name}' in cycle '{self.cycle_name}' with date {self.unrolled_date}" - return super().__repr__().replace(f"{self.__class__.__name__}", identifier) - - @property - def depend_on_task(self) -> UnrolledTask: - """ - throws error if not found - """ - # for now we only support looking in the same cycle - workflow = self._unrolled_task.unrolled_cycle.workflow - if self._cycle_name is None: - tasks_to_search = [ - cycle.unrolled_tasks for cycle in workflow.unrolled_cycles if cycle.unrolled_date == self._unrolled_date - ] - potential_tasks = [ - task for task in chain.from_iterable(tasks_to_search) if task.name == self._depend_on_task_name - ] - if len(potential_tasks) > 1: - msg = ( - f"Found multiple instances of the task '{self._depend_on_task_name}' with date {self._unrolled_date}" - " for dependency of the task {self._unrolled_task}. Please specify a cycle name." - ) - raise ValueError(msg) - if len(potential_tasks) == 0: - msg = ( - f"Found no instance of the task '{self._depend_on_task_name}' with date {self._unrolled_date}" - f" for dependency attached to task {self._unrolled_task}." - ) - raise ValueError(msg) - return potential_tasks[0] - - cycle = workflow.unrolled_cycles_map[(self._cycle_name, self._unrolled_date)] - return cycle.unrolled_tasks_map[self._depend_on_task_name] - - @property - def unrolled_task(self) -> UnrolledTask: - return self._unrolled_task - - @property - def unrolled_date(self) -> datetime: - return self._unrolled_date - - -class _TaskBase: - """ - Common class for Task and UnrolledTask to reduce code duplications - """ - - def __init__( - self, - name: str, - command: str, - inputs: list[Data], - outputs: list[Data], - depends: list[Dependency], - command_option: str | None, - ): - self._name = name - self._command = expandvars(command) - self._inputs = inputs - self._outputs = outputs - self._depends = depends - self._command_option = command_option - - @property - def name(self) -> str: - return self._name - - @property - def command(self) -> str: - return self._command - - @property - def inputs(self) -> list[Data]: - return self._inputs - - @property - def outputs(self) -> list[Data]: - return self._outputs - - @property - def command_option(self) -> str | None: - return self._command_option - - @property - def depends(self) -> list[Dependency]: - return self._depends - - -class Task(_TaskBase): - """A task that is created during the unrolling of a cycle.""" - - def __init__( - self, - name: str, - command: str, - inputs: list[Data], - outputs: list[Data], - depends: list[Dependency], - command_option: str | None, - ): - super().__init__(name, command, inputs, outputs, depends, command_option) - for input_ in inputs: - input_.task = self - for output in outputs: - output.task = self - for depend in depends: - depend.task = self - self._cycle: Cycle | None = None - - def __repr__(self) -> str: - identifier = f"Task '{self.name}'" - if self.cycle is not None: - identifier += f" in cycle {self.cycle.name}" - return super().__repr__().replace("Task", identifier) - - def unroll(self, unrolled_cycle: UnrolledCycle) -> Generator[tuple[str, UnrolledTask], None, None]: - # an unrolled task is just one task, since the date is determined - # by the cycle, but we keep the pattern for consistency - unrolled_task = UnrolledTask.from_task(self, unrolled_cycle) - yield unrolled_task.name, unrolled_task - - @property - def cycle(self) -> Cycle | None: - return self._cycle - - @cycle.setter - def cycle(self, cycle: Cycle): - if self._cycle is not None: - msg = f"Task {self} was already assigned to cycle {self._cycle}. Cannot assign task to cycle {cycle}." - raise ValueError(msg) - self._cycle = cycle - - -class UnrolledTask(_TaskBase): - """ - This class should be only initiated through unrolling a cycle. - """ - - @classmethod - def from_task(cls, task: Task, unrolled_cycle: UnrolledCycle): - return cls( - unrolled_cycle, task.name, task.command, task.inputs, task.outputs, task.depends, task.command_option - ) - - def __init__( - self, - unrolled_cycle: UnrolledCycle, - name: str, - command: str, - inputs: list[Data], - outputs: list[Data], - depends: list[Dependency], - command_option: str | None, - ): - super().__init__(name, command, inputs, outputs, depends, command_option) - self._unrolled_cycle = unrolled_cycle - self._unrolled_inputs = list(self.unroll_inputs()) - self._unrolled_outputs = list(self.unroll_outputs()) - self._unrolled_depends = list(self.unroll_depends()) - - def __repr__(self) -> str: - if self.unrolled_cycle is None: - identifier = f"Task '{self.name}' with date {self.unrolled_date}" - else: - identifier = f"Task '{self.name}' in cycle {self.unrolled_cycle.name} with date {self.unrolled_date}" - return super().__repr__().replace("Task", identifier) - - def unroll_inputs(self) -> Generator[UnrolledData, None, None]: - """ - Outputs the inputs together with a unique identifier within the task - """ - for input_ in self._inputs: - yield from input_.unroll(self) - - def unroll_outputs(self) -> Generator[UnrolledData, None, None]: - for output in self._outputs: - yield from output.unroll(self) - - def unroll_depends(self) -> Generator[UnrolledDependency, None, None]: - for depend in self._depends: - yield from depend.unroll(self) - - @property - def unrolled_inputs(self) -> list[UnrolledData]: - return self._unrolled_inputs - - @property - def unrolled_outputs(self) -> list[UnrolledData]: - return self._unrolled_outputs - - @property - def unrolled_depends(self) -> list[UnrolledDependency]: - return self._unrolled_depends - - @property - def unrolled_date(self) -> datetime: - return self._unrolled_cycle.unrolled_date - - @property - def unrolled_cycle(self) -> UnrolledCycle: - return self._unrolled_cycle - - -class _CycleBase: - def __init__( - self, - name: str, - tasks: list[Task], - start_date: str | datetime, - end_date: str | datetime, - period: str | Duration | None = None, - ): - self._name = name - self._tasks = tasks - self._start_date = start_date if isinstance(start_date, datetime) else datetime.fromisoformat(start_date) - self._end_date = end_date if isinstance(end_date, datetime) else datetime.fromisoformat(end_date) - - if self._start_date > self._end_date: - msg = "For cycle {self} the start_date {start_date} lies after given end_date {end_date}." - raise ValueError(msg) - - self._period = period if period is None or isinstance(period, Duration) else parse_duration(period) - if self._period is not None and TimeUtils.duration_is_less_equal_zero(self._period): - msg = f"For cycle {self} the period {period} is negative or zero." - raise ValueError(msg) - - task_names = set() - for task in self._tasks: - if task.name in task_names: - msg = f"List of tasks does contain tasks with duplicate names. The task name '{task.name}' has been found twice." - raise ValueError(msg) - task_names.add(task.name) - - @property - def name(self) -> str: - return self._name - - @property - def start_date(self) -> datetime: - return self._start_date - - @property - def end_date(self) -> datetime: - return self._end_date - - @property - def period(self) -> Duration | None: - return self._period - - @property - def tasks(self) -> list[Task]: - return self._tasks - - -class Cycle(_CycleBase): - def __init__( - self, - name: str, - tasks: list[Task], - start_date: str | datetime, - end_date: str | datetime, - period: str | Duration | None, - ): - super().__init__(name, tasks, start_date, end_date, period) - for task in self._tasks: - task.cycle = self - - self._workflow: Workflow | None = None - - def __repr__(self) -> str: - if self.workflow is None: - identifier = f"Cycle '{self.name}'" - else: - identifier = f"Cycle '{self.name}' in workflow {self.workflow.name}" - return super().__repr__().replace("Cycle", identifier) - - def unroll(self) -> Generator[tuple[str, datetime, UnrolledCycle], None, None]: - if self._workflow is None: - msg = f"Cannot unroll cycle {self} because it was not attached to a workflow before." - raise ValueError(msg) - current_date = self._start_date - while current_date <= self._end_date: - unrolled_cycle = UnrolledCycle.from_cycle(self, current_date, self._workflow) - yield unrolled_cycle.name, unrolled_cycle.unrolled_date, unrolled_cycle - if self._period is None: - break - else: - current_date += self._period - - @property - def workflow(self) -> Workflow | None: - return self._workflow - - @workflow.setter - def workflow(self, workflow: Workflow): - if self._workflow is not None: - msg = f"Cycle {self} was already assigned to workflow {self._workflow}. Cannot assign cycle to workflow {workflow}." - raise ValueError(msg) - self._workflow = workflow - - -class UnrolledCycle(_CycleBase): - """ - This class should be only initiated through unrolling a cycle. - """ - - @classmethod - def from_cycle(cls, cycle: Cycle, unrolled_date: datetime, workflow: Workflow): - return cls(unrolled_date, cycle.name, cycle.tasks, cycle.start_date, cycle.end_date, cycle.period, workflow) - - def __init__( - self, - unrolled_date: datetime, - name: str, - tasks: list[Task], - start_date: str | datetime, - end_date: str | datetime, - period: str | Duration | None, - workflow: Workflow, - ): - super().__init__(name, tasks, start_date, end_date, period) - - self._unrolled_date = unrolled_date - - self._unrolled_tasks_map = dict(self.unroll_tasks()) - self._workflow = workflow - - def __repr__(self) -> str: - if self.workflow is None: - identifier = f"UnrolledCycle '{self.name}' with date {self.unrolled_date}" - else: - identifier = f"UnrolledCycle '{self.name}' in workflow {self.workflow.name} with date {self.unrolled_date}" - return super().__repr__().replace("UnrolledCycle", identifier) - - def unroll_tasks(self) -> Generator[tuple[str, UnrolledTask], None, None]: - for task in self._tasks: - yield from task.unroll(self) - - @property - def unrolled_tasks(self) -> list[UnrolledTask]: - return list(self._unrolled_tasks_map.values()) - - @property - def unrolled_tasks_map(self) -> dict[str, UnrolledTask]: - return self._unrolled_tasks_map - - @property - def unrolled_date(self) -> datetime: - return self._unrolled_date - - @property - def workflow(self) -> Workflow: - return self._workflow - - -class Workflow: - def __init__(self, name: str, cycles: list[Cycle]): - self._name = name - self._cycles = cycles - for cycle in self._cycles: - cycle.workflow = self - self._validate_cycles() - self._unrolled_cycles_map = {(name, date): cycle for name, date, cycle in self.unroll_cycles()} - - unrolled_outputs = [] - for unrolled_cycle in self.unrolled_cycles: - for unrolled_task in unrolled_cycle.unrolled_tasks: - unrolled_outputs.extend(unrolled_task.unrolled_outputs) - self._unrolled_outputs = unrolled_outputs - - def _validate_cycles(self): - """Checks if the defined workflow is correctly referencing key names.""" - cycle_names = set() - for cycle in self._cycles: - if cycle.name in cycle_names: - msg = f"List of cycles does contain cycles with duplicate names. The cycle name '{cycle.name}' has been found twice." - raise ValueError(msg) - cycle_names.add(cycle.name) - - def unroll_cycles(self) -> Generator[tuple[str, datetime, UnrolledCycle], None, None]: - for cycle in self._cycles: - yield from cycle.unroll() - - @property - def name(self) -> str: - return self._name - - @property - def cycles(self) -> list[Cycle]: - return self._cycles - - def is_available_on_init(self, data: UnrolledData) -> bool: - """Determines if the data is available on init of the workflow.""" - - def equal_check(output: UnrolledData) -> bool: - return output.name == data.name and output.unrolled_date == data.unrolled_date - - return len(list(filter(equal_check, self._unrolled_outputs))) == 0 - - @property - def unrolled_cycles(self) -> list[UnrolledCycle]: - return list(self._unrolled_cycles_map.values()) - - @property - def unrolled_cycles_map(self) -> dict[tuple[str, datetime], UnrolledCycle]: - return self._unrolled_cycles_map diff --git a/src/wcflow/workgraph.py b/src/wcflow/workgraph.py deleted file mode 100644 index be0bd06..0000000 --- a/src/wcflow/workgraph.py +++ /dev/null @@ -1,238 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any - -import aiida.common -import aiida.orm -import aiida_workgraph.engine.utils # type: ignore[import-untyped] -from aiida_workgraph import WorkGraph # type: ignore[import-untyped] - -if TYPE_CHECKING: - from aiida_workgraph.socket import TaskSocket # type: ignore[import-untyped] - - from wcflow import core - - -# This is hack to aiida-workgraph, merging this into aiida-workgraph properly would require -# some major refactor see issue https://github.com/aiidateam/aiida-workgraph/issues/168 -# It might be better to give up on the graph like construction and just create the task -# directly with inputs, arguments and outputs -def _prepare_for_shell_task(task: dict, kwargs: dict) -> dict: - """Prepare the inputs for ShellTask""" - from aiida.common import lang - from aiida.orm import AbstractCode - from aiida_shell.launch import convert_nodes_single_file_data, prepare_code - - command = kwargs.pop("command", None) - resolve_command = kwargs.pop("resolve_command", False) - metadata = kwargs.pop("metadata", {}) - # setup code - if isinstance(command, str): - computer = (metadata or {}).get("options", {}).pop("computer", None) - code = prepare_code(command, computer, resolve_command) - else: - lang.type_check(command, AbstractCode) - code = command - # update the tasks with links - nodes = convert_nodes_single_file_data(kwargs.pop("nodes", {})) - # find all keys in kwargs start with "nodes." - for key in list(kwargs.keys()): - if key.startswith("nodes."): - nodes[key[6:]] = kwargs.pop(key) - metadata.update({"call_link_label": task["name"]}) - - default_outputs = {"remote_folder", "remote_stash", "retrieved", "_outputs", "_wait", "stdout", "stderr"} - task_outputs = {task["outputs"][i]["name"] for i in range(len(task["outputs"]))} - task_outputs = task_outputs.union(set(kwargs.pop("outputs", []))) - missing_outputs = task_outputs.difference(default_outputs) - return { - "code": code, - "nodes": nodes, - "filenames": kwargs.pop("filenames", {}), - "arguments": kwargs.pop("arguments", []), - "outputs": list(missing_outputs), - "parser": kwargs.pop("parser", None), - "metadata": metadata or {}, - } - - -aiida_workgraph.engine.utils.prepare_for_shell_task = _prepare_for_shell_task - - -class AiidaWorkGraph: - def __init__(self, core_workflow: core.Workflow): - # Needed for date indexing - self._core_workflow = core_workflow - - self._validate_workflow() - - self._workgraph = WorkGraph(core_workflow.name) - - # stores the input data available on initialization - self._aiida_data_nodes: dict[str, aiida_workgraph.orm.Data] = {} - # stores the outputs sockets of tasks - self._aiida_socket_nodes: dict[str, TaskSocket] = {} - self._aiida_task_nodes: dict[str, aiida_workgraph.Task] = {} - - self._add_aiida_initial_data_nodes() - self._add_aiida_task_nodes() - self._add_aiida_links() - - def _validate_workflow(self): - """Checks if the defined workflow is correctly referencing key names.""" - for cycle in self._core_workflow.unrolled_cycles: - try: - aiida.common.validate_link_label(cycle.name) - except ValueError as exception: - msg = f"Raised error when validating cycle name '{cycle.name}': {exception.args[0]}" - raise ValueError(msg) from exception - for task in cycle.unrolled_tasks: - try: - aiida.common.validate_link_label(task.name) - except ValueError as exception: - msg = f"Raised error when validating task name '{cycle.name}': {exception.args[0]}" - raise ValueError(msg) from exception - for input_ in task.unrolled_inputs: - try: - aiida.common.validate_link_label(task.name) - except ValueError as exception: - msg = f"Raised error when validating input name '{input_.name}': {exception.args[0]}" - raise ValueError(msg) from exception - for output in task.unrolled_outputs: - try: - aiida.common.validate_link_label(task.name) - except ValueError as exception: - msg = f"Raised error when validating output name '{output.name}': {exception.args[0]}" - raise ValueError(msg) from exception - # - Warning if output nodes that are overwritten by other tasks before usage, if this actually happens? - - def _add_aiida_initial_data_nodes(self): - """ - Nodes that correspond to data that are available on initialization of the workflow - """ - for cycle in self._core_workflow.unrolled_cycles: - for task in cycle.unrolled_tasks: - for input_ in task.unrolled_inputs: - if self._core_workflow.is_available_on_init(input_): - self._add_aiida_input_data_node(input_) - - @staticmethod - def parse_to_aiida_label(label: str) -> str: - return label.replace("-", "_").replace(" ", "_").replace(":", "_") - - @staticmethod - def get_aiida_label_from_unrolled_data(data: core.UnrolledData) -> str: - """ """ - return AiidaWorkGraph.parse_to_aiida_label(f"{data.name}_{data.unrolled_date}") - - @staticmethod - def get_aiida_label_from_unrolled_task(task: core.UnrolledTask) -> str: - """ """ - return AiidaWorkGraph.parse_to_aiida_label( - f"{task.unrolled_cycle.name}_" f"{task.unrolled_cycle.unrolled_date}_" f"{task.name}" - ) - - def _add_aiida_input_data_node(self, input_: core.UnrolledData): - """ - Create an :class:`aiida.orm.Data` instance from this wc data instance. - - :param input: ... - """ - label = AiidaWorkGraph.get_aiida_label_from_unrolled_data(input_) - if input_.type == "file": - self._aiida_data_nodes[label] = aiida.orm.SinglefileData(label=label, file=input_.path.resolve()) - elif input_.type == "dir": - self._aiida_data_nodes[label] = aiida.orm.FolderData(label=label, tree=input_.path.resolve()) - else: - msg = f"Data type {input_.type!r} not supported. Please use 'file' or 'dir'." - raise ValueError(msg) - - def _add_aiida_task_nodes(self): - for cycle in self._core_workflow.unrolled_cycles: - for task in cycle.unrolled_tasks: - self._add_aiida_task_node(task) - # after creation we can link the wait_on tasks - for cycle in self._core_workflow.unrolled_cycles: - for task in cycle.unrolled_tasks: - self._link_wait_on_to_task(task) - - def _add_aiida_task_node(self, task: core.UnrolledTask): - label = AiidaWorkGraph.get_aiida_label_from_unrolled_task(task) - workgraph_task = self._workgraph.tasks.new( - "ShellJob", - name=label, - command=task.command, - ) - workgraph_task.set({"arguments": []}) - workgraph_task.set({"nodes": {}}) - self._aiida_task_nodes[label] = workgraph_task - - def _link_wait_on_to_task(self, task: core.UnrolledTask): - label = AiidaWorkGraph.get_aiida_label_from_unrolled_task(task) - workgraph_task = self._aiida_task_nodes[label] - wait_on_tasks = [] - for depend in task.unrolled_depends: - wait_on_task_label = AiidaWorkGraph.get_aiida_label_from_unrolled_task(depend.depend_on_task) - wait_on_tasks.append(self._aiida_task_nodes[wait_on_task_label]) - workgraph_task.wait = wait_on_tasks - - def _add_aiida_links(self): - for cycle in self._core_workflow.unrolled_cycles: - self._add_aiida_links_from_cycle(cycle) - - def _add_aiida_links_from_cycle(self, cycle: core.UnrolledCycle): - for task in cycle.unrolled_tasks: - for input_ in task.unrolled_inputs: - self._link_input_to_task(input_) - for output in task.unrolled_outputs: - self._link_output_to_task(output) - - def _link_input_to_task(self, input_: core.UnrolledData): - task_label = AiidaWorkGraph.get_aiida_label_from_unrolled_task(input_.unrolled_task) - input_label = AiidaWorkGraph.get_aiida_label_from_unrolled_data(input_) - workgraph_task = self._aiida_task_nodes[task_label] - workgraph_task.inputs.new("Any", f"nodes.{input_label}") - workgraph_task.kwargs.append(f"nodes.{input_label}") - - # resolve data - if (data_node := self._aiida_data_nodes.get(input_label)) is not None: - if (nodes := workgraph_task.inputs.get("nodes")) is None: - msg = f"Workgraph task {workgraph_task.name!r} did not initialize input nodes in the workgraph before linking. This is a bug in the code, please contact the developers by making an issue." - raise ValueError(msg) - nodes.value.update({f"{input_label}": data_node}) - elif (output_socket := self._aiida_socket_nodes.get(input_label)) is not None: - self._workgraph.links.new(output_socket, workgraph_task.inputs[f"nodes.{input_label}"]) - else: - msg = f"Input data node {input_label!r} was neither found in socket nodes nor in data nodes. The task {task_label!r} must have dependencies on inputs before they are created." - raise ValueError(msg) - - # resolve arg_option - if (workgraph_task_arguments := workgraph_task.inputs.get("arguments")) is None: - msg = f"Workgraph task {workgraph_task.name!r} did not initialize arguments nodes in the workgraph before linking. This is a bug in the code, please contact devevlopers." - raise ValueError(msg) - if input_.arg_option is not None: - workgraph_task_arguments.value.append(f"{input_.arg_option}") - workgraph_task_arguments.value.append(f"{{{input_label}}}") - - def _link_output_to_task(self, output: core.UnrolledData): - workgraph_task = self._aiida_task_nodes[AiidaWorkGraph.get_aiida_label_from_unrolled_task(output.unrolled_task)] - output_label = AiidaWorkGraph.get_aiida_label_from_unrolled_data(output) - output_socket = workgraph_task.outputs.new("Any", output.src) - self._aiida_socket_nodes[output_label] = output_socket - - def run( - self, - inputs: None | dict[str, Any] = None, - metadata: None | dict[str, Any] = None, - ) -> dict[str, Any]: - return self._workgraph.run(inputs=inputs, metadata=metadata) - - def submit( - self, - *, - inputs: None | dict[str, Any] = None, - wait: bool = False, - timeout: int = 60, - metadata: None | dict[str, Any] = None, - ) -> dict[str, Any]: - return self._workgraph.submit(inputs=inputs, wait=wait, timeout=timeout, metadata=metadata)