Skip to content

Commit

Permalink
data store: don't update outputs incrementally
Browse files Browse the repository at this point in the history
* We don't have a protocol for updating arbitrary lists incrementally.
* See #6307
* This is a temporary patch to disable incremental output updates until
  we have a suitable protocol for doing so.
  • Loading branch information
oliver-sanders authored and dwsutherland committed Oct 30, 2024
1 parent cbfddb4 commit 5bfb487
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 20 deletions.
38 changes: 21 additions & 17 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2420,23 +2420,27 @@ def delta_task_output(
objects from the workflow task pool.
"""
tproxy: Optional[PbTaskProxy]
tp_id, tproxy = self.store_node_fetcher(itask.tokens)
if not tproxy:
return
outputs = itask.state.outputs
label = outputs.get_trigger(message)
# update task instance
update_time = time()
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
output = tp_delta.outputs[label]
output.label = label
output.message = message
output.satisfied = outputs.is_message_complete(message)
output.time = update_time
self.updates_pending = True
# TODO: Restore incremental update when we have a protocol to do so
# https://github.com/cylc/cylc-flow/issues/6307
return self.delta_task_outputs(itask)

# tproxy: Optional[PbTaskProxy]
# tp_id, tproxy = self.store_node_fetcher(itask.tokens)
# if not tproxy:
# return
# outputs = itask.state.outputs
# label = outputs.get_trigger(message)
# # update task instance
# update_time = time()
# tp_delta = self.updated[TASK_PROXIES].setdefault(
# tp_id, PbTaskProxy(id=tp_id))
# tp_delta.stamp = f'{tp_id}@{update_time}'
# output = tp_delta.outputs[label]
# output.label = label
# output.message = message
# output.satisfied = outputs.is_message_complete(message)
# output.time = update_time
# self.updates_pending = True

def delta_task_outputs(self, itask: TaskProxy) -> None:
"""Create delta for change in all task proxy outputs.
Expand Down
116 changes: 113 additions & 3 deletions tests/integration/test_data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from typing import Iterable, List, cast
from typing import Iterable, List, TYPE_CHECKING, cast

import pytest

Expand All @@ -29,6 +29,12 @@
)
from cylc.flow.id import Tokens
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
TASK_OUTPUT_SUBMITTED,
TASK_OUTPUT_STARTED,
TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import (
TASK_STATUS_FAILED,
TASK_STATUS_SUCCEEDED,
Expand All @@ -37,8 +43,13 @@
from cylc.flow.wallclock import get_current_time_string


# NOTE: These tests mutate the data store, so running them in isolation may
# see failures when they actually pass if you run the whole file
if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


# NOTE: Some of these tests mutate the data store, so running them in
# isolation may see failures when they actually pass if you run the
# whole file


def job_config(schd):
Expand Down Expand Up @@ -392,3 +403,102 @@ async def test_flow_numbers(flow, scheduler, start):
ds_task = schd.data_store_mgr.get_data_elements(TASK_PROXIES).added[1]
assert ds_task.name == 'b'
assert ds_task.flow_nums == '[2]'


async def test_delta_task_outputs(one: 'Scheduler', start):
    """Ensure task outputs are inserted into the store.

    Note: Task outputs should *not* be updated incrementally until we have
    a protocol for doing so, see https://github.com/cylc/cylc-flow/pull/6403
    """

    def _satisfied_in_data():
        """Return satisfied output labels from the *data* store."""
        # read straight from the applied data store for this task proxy
        proxy = one.data_store_mgr.data[one.id][TASK_PROXIES][
            task.tokens.id
        ]
        return {
            out.label for out in proxy.outputs.values() if out.satisfied
        }

    def _satisfied_in_delta():
        """Return satisfied output labels from the *delta* store.

        Or return None if there's nothing there.
        """
        try:
            delta = one.data_store_mgr.updated[TASK_PROXIES][task.tokens.id]
        except KeyError:
            # no pending delta for this task proxy
            return None
        return {
            out.label for out in delta.outputs.values() if out.satisfied
        }

    async with start(one):
        # prevent the task/workflow from completing
        one.pool.remove = lambda *args, **kwargs: None

        # create a job submission
        task = one.pool.get_tasks()[0]
        assert task
        task.submit_num += 1
        one.data_store_mgr.insert_job(
            task.tdef.name, task.point, task.state.status, {'submit_num': 1}
        )
        await one.update_data_structure()

        # satisfy the submitted & started outputs
        # (note started implies submitted)
        one.task_events_mgr.process_message(
            task, 'INFO', TaskEventsManager.EVENT_STARTED
        )

        # the delta should be populated with the newly satisfied outputs
        started_set = {
            TASK_OUTPUT_SUBMITTED,
            TASK_OUTPUT_STARTED,
        }
        assert _satisfied_in_data() == set()
        assert _satisfied_in_delta() == started_set

        # the delta should be applied to the store
        await one.update_data_structure()
        assert _satisfied_in_data() == started_set
        assert _satisfied_in_delta() is None

        # satisfy the succeeded output
        one.task_events_mgr.process_message(
            task, 'INFO', TaskEventsManager.EVENT_SUCCEEDED
        )

        # the delta should be populated with ALL satisfied outputs
        # (not just the newly satisfied output)
        succeeded_set = started_set | {TASK_OUTPUT_SUCCEEDED}
        assert _satisfied_in_data() == started_set
        assert _satisfied_in_delta() == succeeded_set

        # the delta should be applied to the store
        await one.update_data_structure()
        assert _satisfied_in_data() == succeeded_set
        assert _satisfied_in_delta() is None

0 comments on commit 5bfb487

Please sign in to comment.