Skip to content

Commit

Permalink
Introduce keywords that distinguish between available and generated data in the yml file (#29)
Browse files Browse the repository at this point in the history

We introduce the keywords `available` and `generated` in the yml file to
specify whether the data already exists on initialization or is generated by the
workflow. Previously, we had to infer this from how the data is used in the
workflow. We decided to let the user specify this explicitly for better error
handling. Implements the suggestion from issue #28.
  • Loading branch information
leclairm authored Oct 29, 2024
1 parent 5e82e7b commit 5f942f8
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 68 deletions.
35 changes: 24 additions & 11 deletions src/wcflow/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def __init__(
lag: list[Duration],
date: list[datetime],
arg_option: str | None,
*,
available: bool,
):
self._name = name

Expand Down Expand Up @@ -52,6 +54,7 @@ def __init__(
self._lag = lag
self._date = date
self._arg_option = arg_option
self._available = available

@property
def name(self) -> str:
Expand Down Expand Up @@ -83,6 +86,10 @@ def date(self) -> list[datetime]:
def arg_option(self) -> str | None:
return self._arg_option

@property
def available(self) -> bool:
return self._available


class Data(_DataBase):
def __init__(
Expand All @@ -93,8 +100,10 @@ def __init__(
lag: list[Duration],
date: list[datetime],
arg_option: str | None,
*,
available: bool,
):
super().__init__(name, type, src, lag, date, arg_option)
super().__init__(name, type, src, lag, date, arg_option, available=available)
self._task: Task | None = None

def unroll(self, unrolled_task: UnrolledTask) -> Generator[UnrolledData, None, None]:
Expand Down Expand Up @@ -139,7 +148,17 @@ class UnrolledData(_DataBase):

@classmethod
def from_data(cls, data: Data, unrolled_task: UnrolledTask, unrolled_date: datetime):
return cls(unrolled_task, unrolled_date, data.name, data.type, data.src, data.lag, data.date, data.arg_option)
return cls(
unrolled_task,
unrolled_date,
data.name,
data.type,
data.src,
data.lag,
data.date,
data.arg_option,
available=data.available,
)

def __init__(
self,
Expand All @@ -151,8 +170,10 @@ def __init__(
lag: list[Duration],
date: list[datetime],
arg_option: str | None,
*,
available: bool,
):
super().__init__(name, type, src, lag, date, arg_option)
super().__init__(name, type, src, lag, date, arg_option, available=available)
self._unrolled_task = unrolled_task
self._unrolled_date = unrolled_date

Expand Down Expand Up @@ -659,14 +680,6 @@ def name(self) -> str:
def cycles(self) -> list[Cycle]:
return self._cycles

def is_available_on_init(self, data: UnrolledData) -> bool:
"""Determines if the data is available on init of the workflow."""

def equal_check(output: UnrolledData) -> bool:
return output.name == data.name and output.unrolled_date == data.unrolled_date

return len(list(filter(equal_check, self._unrolled_outputs))) == 0

@property
def unrolled_cycles(self) -> list[UnrolledCycle]:
return list(self._unrolled_cycles_map.values())
Expand Down
31 changes: 26 additions & 5 deletions src/wcflow/parsing/_yaml_data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def convert_to_struct_time(cls, value: str | None) -> time.struct_time | None:
return None if value is None else time.strptime(value, "%H:%M:%S")


class ConfigData(_NamedBaseModel):
class _DataBaseModel(_NamedBaseModel):
"""
To create an instance of a data defined in a workflow file.
"""
Expand All @@ -132,6 +132,23 @@ def is_file_or_dir(cls, value: str) -> str:
raise ValueError(msg)
return value

@property
def available(self) -> bool:
return isinstance(self, ConfigAvailableData)


class ConfigAvailableData(_DataBaseModel):
pass


class ConfigGeneratedData(_DataBaseModel):
pass


class ConfigData(BaseModel):
available: list[ConfigAvailableData]
generated: list[ConfigGeneratedData]


class ConfigCycleTaskDepend(_NamedBaseModel, _LagDateBaseModel):
"""
Expand Down Expand Up @@ -257,7 +274,7 @@ class ConfigWorkflow(BaseModel):
end_date: datetime
cycles: list[ConfigCycle]
tasks: list[ConfigTask]
data: list[ConfigData]
data: ConfigData
data_dict: dict = {}
task_dict: dict = {}

Expand All @@ -274,7 +291,9 @@ def check_start_date_before_end_date(self) -> ConfigWorkflow:
return self

def to_core_workflow(self):
self.data_dict = {data.name: data for data in self.data}
self.data_dict = {data.name: data for data in self.data.available} | {
data.name: data for data in self.data.generated
}
self.task_dict = {task.name: task for task in self.tasks}

core_cycles = [self._to_core_cycle(cycle) for cycle in self.cycles]
Expand All @@ -295,14 +314,16 @@ def _to_core_task(self, cycle_task: ConfigCycleTask) -> core.Task:
if (data := self.data_dict.get(input_.name)) is None:
msg = f"Task {cycle_task.name!r} has input {input_.name!r} that is not specied in the data section."
raise ValueError(msg)
core_data = core.Data(input_.name, data.type, data.src, input_.lag, input_.date, input_.arg_option)
core_data = core.Data(
input_.name, data.type, data.src, input_.lag, input_.date, input_.arg_option, available=data.available
)
inputs.append(core_data)

for output in cycle_task.outputs:
if (data := self.data_dict.get(output.name)) is None:
msg = f"Task {cycle_task.name!r} has output {output.name!r} that is not specied in the data section."
raise ValueError(msg)
core_data = core.Data(output.name, data.type, data.src, [], [], None)
core_data = core.Data(output.name, data.type, data.src, [], [], None, available=False)
outputs.append(core_data)

for depend in cycle_task.depends:
Expand Down
79 changes: 39 additions & 40 deletions tests/files/configs/test_config_large.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,43 +133,42 @@ tasks:
nodes: 1
walltime: 00:01:00
data:
- preproc_output:
type: file
src: $PWD/examples/files/data/file.ncdf
- grid_file:
type: file
src: $PWD/examples/files/data/grid
- obs_data:
type: file
src: $PWD/examples/files/data/obs_data
- ERA5:
type: file
src: $PWD/examples/files/data/era5
- extpar_file:
type: file
src: output
- icon_input:
type: file
src: output
- icon_restart:
type: file
format: ncdf
src: restart
- stream_1:
type: file
src: output_1
- stream_2:
type: file
src: output_2
- postout_1:
type: file
src: postout
- postout_2:
type: file
src: postout
- stored_data_1:
type: file
src: stored_data
- stored_data_2:
type: file
src: stored_data
available:
- grid_file:
type: file
src: $PWD/examples/files/data/grid
- obs_data:
type: file
src: $PWD/examples/files/data/obs_data
- ERA5:
type: file
src: $PWD/examples/files/data/era5
generated:
- extpar_file:
type: file
src: output
- icon_input:
type: file
src: output
- icon_restart:
type: file
format: ncdf
src: restart
- stream_1:
type: file
src: output_1
- stream_2:
type: file
src: output_2
- postout_1:
type: file
src: postout
- postout_2:
type: file
src: postout
- stored_data_1:
type: file
src: stored_data
- stored_data_2:
type: file
src: stored_data
23 changes: 11 additions & 12 deletions tests/files/configs/test_config_small.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ tasks:
- icon:
plugin: shell
command: $PWD/tests/files/scripts/icon.py
- postproc:
plugin: shell
command: $PWD/tests/files/scripts/postproc.py
- cleanup:
plugin: shell
command: $PWD/tests/files/scripts/cleanup.py
data:
- icon_input:
type: file
src: $PWD/tests/files/data/input
- icon_output:
type: file
src: output
- icon_restart:
type: file
src: restart
available:
- icon_input:
type: file
src: $PWD/tests/files/data/input
generated:
- icon_output:
type: file
src: output
- icon_restart:
type: file
src: restart

0 comments on commit 5f942f8

Please sign in to comment.