data store: don't update outputs incrementally
* We don't have a protocol for updating arbitrary lists incrementally.
* See cylc#6307
* This is a temporary patch to disable incremental output updates until
  we have a suitable protocol for doing so (see the sketch below).
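
To illustrate the underlying problem, here is a hypothetical sketch in plain
Python, not cylc's actual wire protocol: clients apply deltas by merging them
into a local copy, so a delta carrying only the changed entries can add to a
collection, but can never express a removal or replacement.

    # Hypothetical illustration of why merge-applied deltas can't shrink a list.
    store = {'outputs': {'submitted': True}}

    # An incremental delta carrying only the new entry merges in fine:
    store['outputs'].update({'started': True})
    assert store['outputs'] == {'submitted': True, 'started': True}

    # But a merge cannot say "drop 'submitted'", so until a protocol for
    # incremental list updates exists, the full output list is resent and
    # the client's copy is replaced wholesale:
    full_update = {'started': True}  # authoritative, complete list
    store['outputs'] = full_update
    assert store['outputs'] == {'started': True}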
oliver-sanders committed Oct 29, 2024
1 parent cbfddb4 commit b62885b
Showing 2 changed files with 98 additions and 19 deletions.
38 changes: 21 additions & 17 deletions cylc/flow/data_store_mgr.py
@@ -2420,23 +2420,27 @@ def delta_task_output(
                objects from the workflow task pool.
        """
        tproxy: Optional[PbTaskProxy]
        tp_id, tproxy = self.store_node_fetcher(itask.tokens)
        if not tproxy:
            return
        outputs = itask.state.outputs
        label = outputs.get_trigger(message)
        # update task instance
        update_time = time()
        tp_delta = self.updated[TASK_PROXIES].setdefault(
            tp_id, PbTaskProxy(id=tp_id))
        tp_delta.stamp = f'{tp_id}@{update_time}'
        output = tp_delta.outputs[label]
        output.label = label
        output.message = message
        output.satisfied = outputs.is_message_complete(message)
        output.time = update_time
        self.updates_pending = True
        # TODO: Restore incremental update when we have a protocol to do so
        # https://github.com/cylc/cylc-flow/issues/6307
        return self.delta_task_outputs(itask)

        # tproxy: Optional[PbTaskProxy]
        # tp_id, tproxy = self.store_node_fetcher(itask.tokens)
        # if not tproxy:
        #     return
        # outputs = itask.state.outputs
        # label = outputs.get_trigger(message)
        # # update task instance
        # update_time = time()
        # tp_delta = self.updated[TASK_PROXIES].setdefault(
        #     tp_id, PbTaskProxy(id=tp_id))
        # tp_delta.stamp = f'{tp_id}@{update_time}'
        # output = tp_delta.outputs[label]
        # output.label = label
        # output.message = message
        # output.satisfied = outputs.is_message_complete(message)
        # output.time = update_time
        # self.updates_pending = True

    def delta_task_outputs(self, itask: TaskProxy) -> None:
        """Create delta for change in all task proxy outputs.
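
For context, a minimal sketch of delta_task_outputs, the full-list fallback
the patch now delegates to. This is inferred from the commented-out per-output
logic above, not taken verbatim from the commit; in particular, the iteration
interface over itask.state.outputs is an assumption.

    def delta_task_outputs(self, itask: TaskProxy) -> None:
        """Regenerate the delta for *every* output of a task proxy."""
        tp_id, tproxy = self.store_node_fetcher(itask.tokens)
        if not tproxy:
            return
        update_time = time()
        tp_delta = self.updated[TASK_PROXIES].setdefault(
            tp_id, PbTaskProxy(id=tp_id))
        tp_delta.stamp = f'{tp_id}@{update_time}'
        outputs = itask.state.outputs
        for message in outputs:  # assumed: iteration yields output messages
            label = outputs.get_trigger(message)
            output = tp_delta.outputs[label]
            output.label = label
            output.message = message
            output.satisfied = outputs.is_message_complete(message)
            output.time = update_time
        self.updates_pending = True

Because every output is resent, each entry the client holds is brought up to
date by the merge, which sidesteps the incremental-update problem at the cost
of larger deltas.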
79 changes: 77 additions & 2 deletions tests/integration/test_data_store_mgr.py
@@ -29,6 +29,12 @@
)
from cylc.flow.id import Tokens
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_outputs import (
    TASK_OUTPUT_SUBMITTED,
    TASK_OUTPUT_STARTED,
    TASK_OUTPUT_SUCCEEDED,
)
from cylc.flow.task_state import (
    TASK_STATUS_FAILED,
    TASK_STATUS_SUCCEEDED,
@@ -37,8 +43,13 @@
from cylc.flow.wallclock import get_current_time_string


# NOTE: These tests mutate the data store, so running them in isolation may
# see failures when they actually pass if you run the whole file
if TYPE_CHECKING:
    from cylc.flow.scheduler import Scheduler


# NOTE: Some of these tests mutate the data store, so running them in
# isolation may produce failures in tests that pass when the whole
# file is run


def job_config(schd):
@@ -392,3 +403,67 @@ async def test_flow_numbers(flow, scheduler, start):
    ds_task = schd.data_store_mgr.get_data_elements(TASK_PROXIES).added[1]
    assert ds_task.name == 'b'
    assert ds_task.flow_nums == '[2]'


async def test_delta_task_outputs(one: 'Scheduler', start):
    """Ensure task outputs are inserted into the store.

    Note: Task outputs should *not* be updated incrementally until we have
    a protocol for doing so, see https://github.com/cylc/cylc-flow/pull/6403
    """

    def get_data_outputs():
        """Return satisfied outputs from the *data* store."""
        nonlocal one, itask
        return {
            output.label
            for output in one.data_store_mgr.data[one.id][
                TASK_PROXIES][itask.tokens.id].outputs.values()
            if output.satisfied
        }

    def get_delta_outputs():
        """Return satisfied outputs from the *delta* store.

        Or return None if there's nothing there.
        """
        nonlocal one, itask
        try:
            return {
                output.label
                for output in one.data_store_mgr.updated[
                    TASK_PROXIES][itask.tokens.id].outputs.values()
                if output.satisfied
            }
        except KeyError:
            return None

    def _patch_remove(*args, **kwargs):
        """Prevent the task/workflow from completing."""
        pass

    async with start(one):
        one.pool.remove = _patch_remove

        # create a job submission
        itask = one.pool.get_tasks()[0]
        assert itask
        itask.submit_num += 1
        one.data_store_mgr.insert_job(
            itask.tdef.name, itask.point, itask.state.status,
            {'submit_num': 1})
        await one.update_data_structure()

        # satisfy the submitted & started outputs
        # (note started implies submitted)
        one.task_events_mgr.process_message(
            itask, 'INFO', TaskEventsManager.EVENT_STARTED)

        # the delta should be populated with the newly satisfied outputs
        assert get_data_outputs() == set()
        assert get_delta_outputs() == {
            TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}

        # the delta should be applied to the store
        await one.update_data_structure()
        assert get_data_outputs() == {
            TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}
        assert get_delta_outputs() is None

        # satisfy the succeeded output
        one.task_events_mgr.process_message(
            itask, 'INFO', TaskEventsManager.EVENT_SUCCEEDED)

        # the delta should be populated with ALL satisfied outputs
        # (not just the newly satisfied output)
        assert get_data_outputs() == {
            TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED}
        assert get_delta_outputs() == {
            TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED}

        # the delta should be applied to the store
        await one.update_data_structure()
        assert get_data_outputs() == {
            TASK_OUTPUT_SUBMITTED, TASK_OUTPUT_STARTED, TASK_OUTPUT_SUCCEEDED}
        assert get_delta_outputs() is None
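
To run just this test, a standard pytest invocation should work (assuming a
development install of cylc-flow with its test dependencies):

    pytest tests/integration/test_data_store_mgr.py::test_delta_task_outputs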
