diff --git a/src/wcflow/core.py b/src/wcflow/core.py index 0458a99..2e27e2e 100644 --- a/src/wcflow/core.py +++ b/src/wcflow/core.py @@ -24,6 +24,8 @@ def __init__( lag: list[Duration], date: list[datetime], arg_option: str | None, + *, + available: bool, ): self._name = name @@ -52,6 +54,7 @@ def __init__( self._lag = lag self._date = date self._arg_option = arg_option + self._available = available @property def name(self) -> str: @@ -83,6 +86,10 @@ def date(self) -> list[datetime]: def arg_option(self) -> str | None: return self._arg_option + @property + def available(self) -> bool: + return self._available + class Data(_DataBase): def __init__( @@ -93,8 +100,10 @@ def __init__( lag: list[Duration], date: list[datetime], arg_option: str | None, + *, + available: bool, ): - super().__init__(name, type, src, lag, date, arg_option) + super().__init__(name, type, src, lag, date, arg_option, available=available) self._task: Task | None = None def unroll(self, unrolled_task: UnrolledTask) -> Generator[UnrolledData, None, None]: @@ -139,7 +148,17 @@ class UnrolledData(_DataBase): @classmethod def from_data(cls, data: Data, unrolled_task: UnrolledTask, unrolled_date: datetime): - return cls(unrolled_task, unrolled_date, data.name, data.type, data.src, data.lag, data.date, data.arg_option) + return cls( + unrolled_task, + unrolled_date, + data.name, + data.type, + data.src, + data.lag, + data.date, + data.arg_option, + available=data.available, + ) def __init__( self, @@ -151,8 +170,10 @@ def __init__( lag: list[Duration], date: list[datetime], arg_option: str | None, + *, + available: bool, ): - super().__init__(name, type, src, lag, date, arg_option) + super().__init__(name, type, src, lag, date, arg_option, available=available) self._unrolled_task = unrolled_task self._unrolled_date = unrolled_date @@ -659,14 +680,6 @@ def name(self) -> str: def cycles(self) -> list[Cycle]: return self._cycles - def is_available_on_init(self, data: UnrolledData) -> bool: - """Determines if the data is available on init of the workflow.""" - - def equal_check(output: UnrolledData) -> bool: - return output.name == data.name and output.unrolled_date == data.unrolled_date - - return len(list(filter(equal_check, self._unrolled_outputs))) == 0 - @property def unrolled_cycles(self) -> list[UnrolledCycle]: return list(self._unrolled_cycles_map.values()) diff --git a/src/wcflow/parsing/_yaml_data_models.py b/src/wcflow/parsing/_yaml_data_models.py index 3a2cf9f..551cf31 100644 --- a/src/wcflow/parsing/_yaml_data_models.py +++ b/src/wcflow/parsing/_yaml_data_models.py @@ -114,7 +114,7 @@ def convert_to_struct_time(cls, value: str | None) -> time.struct_time | None: return None if value is None else time.strptime(value, "%H:%M:%S") -class ConfigData(_NamedBaseModel): +class _DataBaseModel(_NamedBaseModel): """ To create an instance of a data defined in a workflow file. """ @@ -132,6 +132,23 @@ def is_file_or_dir(cls, value: str) -> str: raise ValueError(msg) return value + @property + def available(self) -> bool: + return isinstance(self, ConfigAvailableData) + + +class ConfigAvailableData(_DataBaseModel): + pass + + +class ConfigGeneratedData(_DataBaseModel): + pass + + +class ConfigData(BaseModel): + available: list[ConfigAvailableData] + generated: list[ConfigGeneratedData] + class ConfigCycleTaskDepend(_NamedBaseModel, _LagDateBaseModel): """ @@ -257,7 +274,7 @@ class ConfigWorkflow(BaseModel): end_date: datetime cycles: list[ConfigCycle] tasks: list[ConfigTask] - data: list[ConfigData] + data: ConfigData data_dict: dict = {} task_dict: dict = {} @@ -274,7 +291,9 @@ def check_start_date_before_end_date(self) -> ConfigWorkflow: return self def to_core_workflow(self): - self.data_dict = {data.name: data for data in self.data} + self.data_dict = {data.name: data for data in self.data.available} | { + data.name: data for data in self.data.generated + } self.task_dict = {task.name: task for task in self.tasks} core_cycles = [self._to_core_cycle(cycle) for cycle in self.cycles] @@ -295,14 +314,16 @@ def _to_core_task(self, cycle_task: ConfigCycleTask) -> core.Task: if (data := self.data_dict.get(input_.name)) is None: msg = f"Task {cycle_task.name!r} has input {input_.name!r} that is not specied in the data section." raise ValueError(msg) - core_data = core.Data(input_.name, data.type, data.src, input_.lag, input_.date, input_.arg_option) + core_data = core.Data( + input_.name, data.type, data.src, input_.lag, input_.date, input_.arg_option, available=data.available + ) inputs.append(core_data) for output in cycle_task.outputs: if (data := self.data_dict.get(output.name)) is None: msg = f"Task {cycle_task.name!r} has output {output.name!r} that is not specied in the data section." raise ValueError(msg) - core_data = core.Data(output.name, data.type, data.src, [], [], None) + core_data = core.Data(output.name, data.type, data.src, [], [], None, available=False) outputs.append(core_data) for depend in cycle_task.depends: diff --git a/tests/files/configs/test_config_large.yml b/tests/files/configs/test_config_large.yml index c05a70a..6322339 100644 --- a/tests/files/configs/test_config_large.yml +++ b/tests/files/configs/test_config_large.yml @@ -133,43 +133,42 @@ tasks: nodes: 1 walltime: 00:01:00 data: - - preproc_output: - type: file - src: $PWD/examples/files/data/file.ncdf - - grid_file: - type: file - src: $PWD/examples/files/data/grid - - obs_data: - type: file - src: $PWD/examples/files/data/obs_data - - ERA5: - type: file - src: $PWD/examples/files/data/era5 - - extpar_file: - type: file - src: output - - icon_input: - type: file - src: output - - icon_restart: - type: file - format: ncdf - src: restart - - stream_1: - type: file - src: output_1 - - stream_2: - type: file - src: output_2 - - postout_1: - type: file - src: postout - - postout_2: - type: file - src: postout - - stored_data_1: - type: file - src: stored_data - - stored_data_2: - type: file - src: stored_data + available: + - grid_file: + type: file + src: $PWD/examples/files/data/grid + - obs_data: + type: file + src: $PWD/examples/files/data/obs_data + - ERA5: + type: file + src: $PWD/examples/files/data/era5 + generated: + - extpar_file: + type: file + src: output + - icon_input: + type: file + src: output + - icon_restart: + type: file + format: ncdf + src: restart + - stream_1: + type: file + src: output_1 + - stream_2: + type: file + src: output_2 + - postout_1: + type: file + src: postout + - postout_2: + type: file + src: postout + - stored_data_1: + type: file + src: stored_data + - stored_data_2: + type: file + src: stored_data diff --git a/tests/files/configs/test_config_small.yml b/tests/files/configs/test_config_small.yml index cb7e2ef..fe06be9 100644 --- a/tests/files/configs/test_config_small.yml +++ b/tests/files/configs/test_config_small.yml @@ -23,19 +23,18 @@ tasks: - icon: plugin: shell command: $PWD/tests/files/scripts/icon.py - - postproc: - plugin: shell - command: $PWD/tests/files/scripts/postproc.py - cleanup: plugin: shell command: $PWD/tests/files/scripts/cleanup.py data: - - icon_input: - type: file - src: $PWD/tests/files/data/input - - icon_output: - type: file - src: output - - icon_restart: - type: file - src: restart + available: + - icon_input: + type: file + src: $PWD/tests/files/data/input + generated: + - icon_output: + type: file + src: output + - icon_restart: + type: file + src: restart