Delta review fixes, stripNull fix & cascade
dwsutherland committed May 27, 2020
1 parent 7c6a4ff commit a3218bd
Showing 5 changed files with 354 additions and 238 deletions.
50 changes: 31 additions & 19 deletions cylc/flow/data_store_mgr.py
@@ -55,7 +55,7 @@
from time import time
import zlib

from cylc.flow import __version__ as CYLC_VERSION, ID_DELIM
from cylc.flow import __version__ as CYLC_VERSION, LOG, ID_DELIM
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask,
@@ -156,14 +156,16 @@ def apply_delta(key, delta, data):
# Merge in updated fields
if getattr(delta, 'updated', False):
if key == WORKFLOW:
# Clear fields that require overwrite with delta
field_set = {f.name for f, _ in delta.updated.ListFields()}
for field in CLEAR_FIELD_MAP[key]:
data[key].ClearField(field)
# Clear fields the require overwrite with delta
if field in field_set:
data[key].ClearField(field)
data[key].MergeFrom(delta.updated)
else:
for element in delta.updated:
try:
# Clear fields the require overwrite with delta
# Clear fields that require overwrite with delta
if CLEAR_FIELD_MAP[key]:
for field, _ in element.ListFields():
if field.name in CLEAR_FIELD_MAP[key]:
@@ -172,6 +174,10 @@ def apply_delta(key, delta, data):
except KeyError:
# Ensure data-sync doesn't fail with
# network issues, sync reconcile/validate will catch.
LOG.debug(
'Missing Data-Store element '
'on update application: %s' % str(exc)
)
continue
# Prune data elements
if hasattr(delta, 'pruned'):
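
A note on the ClearField calls in the hunk above: protobuf's MergeFrom() appends repeated fields and merges map entries rather than replacing them, so any field named in CLEAR_FIELD_MAP has to be wiped before the delta is merged in. A minimal, generic illustration using a stock protobuf message rather than cylc's own (the names below are not from this commit):

    from google.protobuf.struct_pb2 import ListValue

    old = ListValue()
    old.values.add().string_value = 'stale-entry'

    delta = ListValue()
    delta.values.add().string_value = 'fresh-entry'

    old.MergeFrom(delta)
    # Repeated field is appended to: ['stale-entry', 'fresh-entry']

    old.ClearField('values')
    old.MergeFrom(delta)
    # Cleared first, then merged: ['fresh-entry']
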
@@ -201,7 +207,7 @@ def create_delta_store(delta=None, workflow_id=None):
dict
"""
if delta is None:
if not isinstance(delta, AllDeltas):
delta = AllDeltas()
delta_store = {
DELTA_ADDED: deepcopy(DATA_TEMPLATE),
@@ -220,18 +226,17 @@ def create_delta_store(delta=None, workflow_id=None):
for field, value in delta.ListFields():
for sub_field, sub_value in value.ListFields():
if sub_field.name in delta_store:
if sub_field.name in delta_store:
if (
field.name == WORKFLOW
or sub_field.name == DELTA_PRUNED
):
field_data = sub_value
else:
field_data = {
s.id: s
for s in sub_value
}
delta_store[sub_field.name][field.name] = field_data
if (
field.name == WORKFLOW
or sub_field.name == DELTA_PRUNED
):
field_data = sub_value
else:
field_data = {
s.id: s
for s in sub_value
}
delta_store[sub_field.name][field.name] = field_data
return delta_store
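
For every topic other than the whole-workflow message and the pruned list, the elements are re-indexed by their id so the receiving side can look them up directly. A tiny stand-alone sketch of that step, with plain objects standing in for the protobuf messages and made-up ids:

    from types import SimpleNamespace

    sub_value = [
        SimpleNamespace(id='user|flow|20200101T00|foo'),
        SimpleNamespace(id='user|flow|20200101T00|bar'),
    ]
    field_data = {s.id: s for s in sub_value}
    # field_data maps each element id to its message:
    # {'user|flow|20200101T00|foo': ..., 'user|flow|20200101T00|bar': ...}
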


@@ -290,6 +295,7 @@ class DataStoreMgr:
'data',
'deltas',
'delta_queues',
'delta_cycle_states',
'descendants',
'edge_points',
'max_point',
@@ -313,6 +319,7 @@ def __init__(self, schd):
self.min_point = None
self.edge_points = {}
self.cycle_states = {}
self.delta_cycle_states = {}
# Managed data types
self.data = {
self.workflow_id: deepcopy(DATA_TEMPLATE)
@@ -384,6 +391,7 @@ def generate_definition_elements(self):
workflow = self.added[WORKFLOW]
workflow.id = self.workflow_id
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'

graph = workflow.edges
graph.leaves[:] = config.leaves
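
The new stamp field combines the element id with its last-updated time, which gives consumers a cheap string comparison for change detection. An illustrative sketch (the helper name and cache layout here are assumptions, not part of the commit):

    from time import time

    def make_stamp(element_id):
        # mirrors the f'{id}@{last_updated}' pattern above
        return f'{element_id}@{time()}'

    cached_stamps = {'user|flow': 'user|flow@1590537600.0'}
    fresh = make_stamp('user|flow')
    if cached_stamps.get('user|flow') != fresh:
        pass  # element is newer than the cached copy; refresh it
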
@@ -847,6 +855,7 @@ def update_task_proxies(self, updated_tasks=None):
task_proxies = self.data[self.workflow_id][TASK_PROXIES]
update_time = time()
task_defs = {}
self.delta_cycle_states = {}

# update task instance
for itask in updated_tasks:
@@ -858,6 +867,8 @@ def update_task_proxies(self, updated_tasks=None):
continue
self.cycle_states.setdefault(point_string, {})[name] = (
itask.state.status, itask.state.is_held)
self.delta_cycle_states.setdefault(point_string, {})[name] = (
itask.state.status, itask.state.is_held)
# Gather task definitions for elapsed time recalculation.
if name not in task_defs:
task_defs[name] = itask.tdef
@@ -916,14 +927,14 @@ def update_family_proxies(self, cycle_points=None):
"""
family_proxies = self.data[self.workflow_id][FAMILY_PROXIES]
if cycle_points is None:
cycle_points = self.cycle_states.keys()
cycle_points = self.delta_cycle_states.keys()
if not cycle_points:
return

for point_string in cycle_points:
# For each cycle point, construct a family state tree
# based on the first-parent single-inheritance tree
c_task_states = self.cycle_states.get(point_string, None)
c_task_states = self.delta_cycle_states.get(point_string, None)
if c_task_states is None:
continue
c_fam_task_states = {}
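
The new delta_cycle_states attribute limits the family-state roll-up to just the cycle points touched by the current batch of task updates, instead of every cycle point accumulated in cycle_states. A stand-alone sketch of that pattern (names mirror the attributes above, but the logic is simplified and illustrative):

    cycle_states = {}        # accumulated across the whole run
    delta_cycle_states = {}  # only the current update pass

    def record_task_state(point_string, name, status, is_held):
        for states in (cycle_states, delta_cycle_states):
            states.setdefault(point_string, {})[name] = (status, is_held)

    delta_cycle_states.clear()  # reset at the start of each update pass
    record_task_state('20200101T0000Z', 'foo', 'running', False)

    # Family proxies are then recomputed only for the touched cycle points:
    for point_string in delta_cycle_states:
        pass  # aggregate task states into family states for this point only
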
@@ -969,6 +980,7 @@ def update_workflow(self):
workflow = self.updated[WORKFLOW]
workflow.id = self.workflow_id
workflow.last_updated = time()
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'

data = self.data[self.workflow_id]
