diff --git a/cylc/flow/data_messages.proto b/cylc/flow/data_messages.proto index 2b51e5da44f..7a149fd26a3 100644 --- a/cylc/flow/data_messages.proto +++ b/cylc/flow/data_messages.proto @@ -238,57 +238,70 @@ message PbEntireWorkflow { message EDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbEdge deltas = 4; - bool reloaded = 5; + repeated PbEdge added = 3; + repeated PbEdge updated = 4; + repeated string pruned = 5; + bool reloaded = 6; } message FDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbFamily deltas = 4; - bool reloaded = 5; + repeated PbFamily added = 3; + repeated PbFamily updated = 4; + repeated string pruned = 5; + bool reloaded = 6; } message FPDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbFamilyProxy deltas = 4; - bool reloaded = 5; + repeated PbFamilyProxy added = 3; + repeated PbFamilyProxy updated = 4; + repeated string pruned = 5; + bool reloaded = 6; } message JDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbJob deltas = 4; - bool reloaded = 5; + repeated PbJob added = 3; + repeated PbJob updated = 4; + repeated string pruned = 5; + bool reloaded = 6; } message TDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbTask deltas = 4; - bool reloaded = 5; + repeated PbTask added = 3; + repeated PbTask updated = 4; + repeated string pruned = 5; + bool reloaded = 6; } message TPDeltas { double time = 1; int64 checksum = 2; - repeated string pruned = 3; - repeated PbTaskProxy deltas = 4; - bool reloaded = 5; + repeated PbTaskProxy added = 3; + repeated PbTaskProxy updated = 4; + repeated string pruned = 5; + bool reloaded = 6; +} + +message WDeltas { + double time = 1; + PbWorkflow added = 2; + PbWorkflow updated = 3; + bool reloaded = 4; } message AllDeltas { - PbWorkflow workflow = 1; - EDeltas edges = 2; - FDeltas families = 3; - FPDeltas family_proxies = 4; - JDeltas jobs = 5; - TDeltas tasks = 6; - TPDeltas task_proxies = 7; + FDeltas families = 1; + FPDeltas family_proxies = 2; + JDeltas jobs = 3; + TDeltas tasks = 4; + TPDeltas task_proxies = 5; + EDeltas edges = 6; + WDeltas workflow = 7; } diff --git a/cylc/flow/data_messages_pb2.py b/cylc/flow/data_messages_pb2.py index f80ccecca48..226490cfd3c 100644 --- a/cylc/flow/data_messages_pb2.py +++ b/cylc/flow/data_messages_pb2.py @@ -18,7 +18,7 @@ package='', syntax='proto3', serialized_options=None, - serialized_pb=b'\n\x13\x64\x61ta_messages.proto\"O\n\x06PbMeta\x12\r\n\x05title\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0b\n\x03URL\x18\x03 \x01(\t\x12\x14\n\x0cuser_defined\x18\x04 \x03(\t\"[\n\nPbTimeZone\x12\r\n\x05hours\x18\x01 \x01(\x05\x12\x0f\n\x07minutes\x18\x02 \x01(\x05\x12\x14\n\x0cstring_basic\x18\x03 \x01(\t\x12\x17\n\x0fstring_extended\x18\x04 \x01(\t\"\xeb\x05\n\nPbWorkflow\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x0e\n\x06status\x18\x04 \x01(\t\x12\x0c\n\x04host\x18\x05 \x01(\t\x12\x0c\n\x04port\x18\x06 \x01(\x05\x12\r\n\x05owner\x18\x07 \x01(\t\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x17\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdges\x12\x13\n\x0b\x61pi_version\x18\x0b \x01(\x05\x12\x14\n\x0c\x63ylc_version\x18\x0c \x01(\t\x12\x14\n\x0clast_updated\x18\r \x01(\x01\x12\x15\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMeta\x12#\n\x1bnewest_runahead_cycle_point\x18\x0f \x01(\t\x12\x1a\n\x12newest_cycle_point\x18\x10 \x01(\t\x12\x1a\n\x12oldest_cycle_point\x18\x11 \x01(\t\x12\x10\n\x08reloaded\x18\x12 \x01(\x08\x12\x10\n\x08run_mode\x18\x13 \x01(\t\x12\x14\n\x0c\x63ycling_mode\x18\x14 \x01(\t\x12\x32\n\x0cstate_totals\x18\x15 \x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x18\n\x10workflow_log_dir\x18\x16 \x01(\t\x12#\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZone\x12\x12\n\ntree_depth\x18\x18 \x01(\x05\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x15\n\rns_defn_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x12\n\nstatus_msg\x18\x1e \x01(\t\x12\x15\n\ris_held_total\x18\x1f \x01(\x05\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\x85\x05\n\x05PbJob\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x12\n\nsubmit_num\x18\x03 \x01(\x05\x12\r\n\x05state\x18\x04 \x01(\t\x12\x12\n\ntask_proxy\x18\x05 \x01(\t\x12\x16\n\x0esubmitted_time\x18\x06 \x01(\t\x12\x14\n\x0cstarted_time\x18\x07 \x01(\t\x12\x15\n\rfinished_time\x18\x08 \x01(\t\x12\x18\n\x10\x62\x61tch_sys_job_id\x18\t \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_name\x18\n \x01(\t\x12\x12\n\nenv_script\x18\x0b \x01(\t\x12\x12\n\nerr_script\x18\x0c \x01(\t\x12\x13\n\x0b\x65xit_script\x18\r \x01(\t\x12\x1c\n\x14\x65xecution_time_limit\x18\x0e \x01(\x02\x12\x0c\n\x04host\x18\x0f \x01(\t\x12\x13\n\x0binit_script\x18\x10 \x01(\t\x12\x13\n\x0bjob_log_dir\x18\x11 \x01(\t\x12\r\n\x05owner\x18\x12 \x01(\t\x12\x13\n\x0bpost_script\x18\x13 \x01(\t\x12\x12\n\npre_script\x18\x14 \x01(\t\x12\x0e\n\x06script\x18\x15 \x01(\t\x12\r\n\x05shell\x18\x16 \x01(\t\x12\x14\n\x0cwork_sub_dir\x18\x17 \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_conf\x18\x18 \x03(\t\x12\x13\n\x0b\x65nvironment\x18\x19 \x03(\t\x12\x12\n\ndirectives\x18\x1a \x03(\t\x12\x16\n\x0eparam_env_tmpl\x18\x1b \x03(\t\x12\x11\n\tparam_var\x18\x1c \x03(\t\x12\x12\n\nextra_logs\x18\x1d \x03(\t\x12\x0c\n\x04name\x18\x1e \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x1f \x01(\t\x12\x10\n\x08messages\x18 \x03(\t\"\x96\x01\n\x06PbTask\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\x19\n\x11mean_elapsed_time\x18\x05 \x01(\x02\x12\r\n\x05\x64\x65pth\x18\x06 \x01(\x05\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\"r\n\nPbPollTask\x12\x13\n\x0blocal_proxy\x18\x01 \x01(\t\x12\x10\n\x08workflow\x18\x02 \x01(\t\x12\x14\n\x0cremote_proxy\x18\x03 \x01(\t\x12\x11\n\treq_state\x18\x04 \x01(\t\x12\x14\n\x0cgraph_string\x18\x05 \x01(\t\"l\n\x0bPbCondition\x12\x12\n\ntask_proxy\x18\x01 \x01(\t\x12\x12\n\nexpr_alias\x18\x02 \x01(\t\x12\x11\n\treq_state\x18\x03 \x01(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\x12\x0f\n\x07message\x18\x05 \x01(\t\"o\n\x0ePbPrerequisite\x12\x12\n\nexpression\x18\x01 \x01(\t\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\"\xfd\x02\n\x0bPbTaskProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04task\x18\x03 \x01(\t\x12\r\n\x05state\x18\x04 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x05 \x01(\t\x12\x0f\n\x07spawned\x18\x06 \x01(\x08\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x13\n\x0bjob_submits\x18\x08 \x01(\x05\x12\x16\n\x0elatest_message\x18\t \x01(\t\x12\x0f\n\x07outputs\x18\n \x03(\t\x12\x12\n\nbroadcasts\x18\x0b \x03(\t\x12\x11\n\tnamespace\x18\x0c \x03(\t\x12&\n\rprerequisites\x18\r \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\x0e \x03(\t\x12\x0f\n\x07parents\x18\x0f \x03(\t\x12\x14\n\x0c\x66irst_parent\x18\x10 \x01(\t\x12\x0c\n\x04name\x18\x11 \x01(\t\x12\x0f\n\x07is_held\x18\x12 \x01(\x08\x12\r\n\x05\x65\x64ges\x18\x13 \x03(\t\x12\x11\n\tancestors\x18\x14 \x03(\t\"\xa8\x01\n\x08PbFamily\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\r\n\x05\x64\x65pth\x18\x05 \x01(\x05\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\"\xf3\x01\n\rPbFamilyProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x03 \x01(\t\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0e\n\x06\x66\x61mily\x18\x05 \x01(\t\x12\r\n\x05state\x18\x06 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x14\n\x0c\x66irst_parent\x18\x08 \x01(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x0f\n\x07is_held\x18\x0c \x01(\x08\x12\x11\n\tancestors\x18\r \x03(\t\"b\n\x06PbEdge\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0e\n\x06source\x18\x03 \x01(\t\x12\x0e\n\x06target\x18\x04 \x01(\t\x12\x0f\n\x07suicide\x18\x05 \x01(\x08\x12\x0c\n\x04\x63ond\x18\x06 \x01(\x08\"o\n\x07PbEdges\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\t\"\xe0\x01\n\x10PbEntireWorkflow\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdge\"d\n\x07\x45\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"f\n\x07\x46\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x19\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"l\n\x08\x46PDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1e\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"c\n\x07JDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x16\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"d\n\x07TDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"j\n\x08TPDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1c\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"\xd4\x01\n\tAllDeltas\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x17\n\x05\x65\x64ges\x18\x02 \x01(\x0b\x32\x08.EDeltas\x12\x1a\n\x08\x66\x61milies\x18\x03 \x01(\x0b\x32\x08.FDeltas\x12!\n\x0e\x66\x61mily_proxies\x18\x04 \x01(\x0b\x32\t.FPDeltas\x12\x16\n\x04jobs\x18\x05 \x01(\x0b\x32\x08.JDeltas\x12\x17\n\x05tasks\x18\x06 \x01(\x0b\x32\x08.TDeltas\x12\x1f\n\x0ctask_proxies\x18\x07 \x01(\x0b\x32\t.TPDeltasb\x06proto3' + serialized_pb=b'\n\x13\x64\x61ta_messages.proto\"O\n\x06PbMeta\x12\r\n\x05title\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0b\n\x03URL\x18\x03 \x01(\t\x12\x14\n\x0cuser_defined\x18\x04 \x03(\t\"[\n\nPbTimeZone\x12\r\n\x05hours\x18\x01 \x01(\x05\x12\x0f\n\x07minutes\x18\x02 \x01(\x05\x12\x14\n\x0cstring_basic\x18\x03 \x01(\t\x12\x17\n\x0fstring_extended\x18\x04 \x01(\t\"\xeb\x05\n\nPbWorkflow\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x0e\n\x06status\x18\x04 \x01(\t\x12\x0c\n\x04host\x18\x05 \x01(\t\x12\x0c\n\x04port\x18\x06 \x01(\x05\x12\r\n\x05owner\x18\x07 \x01(\t\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x17\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdges\x12\x13\n\x0b\x61pi_version\x18\x0b \x01(\x05\x12\x14\n\x0c\x63ylc_version\x18\x0c \x01(\t\x12\x14\n\x0clast_updated\x18\r \x01(\x01\x12\x15\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMeta\x12#\n\x1bnewest_runahead_cycle_point\x18\x0f \x01(\t\x12\x1a\n\x12newest_cycle_point\x18\x10 \x01(\t\x12\x1a\n\x12oldest_cycle_point\x18\x11 \x01(\t\x12\x10\n\x08reloaded\x18\x12 \x01(\x08\x12\x10\n\x08run_mode\x18\x13 \x01(\t\x12\x14\n\x0c\x63ycling_mode\x18\x14 \x01(\t\x12\x32\n\x0cstate_totals\x18\x15 \x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x18\n\x10workflow_log_dir\x18\x16 \x01(\t\x12#\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZone\x12\x12\n\ntree_depth\x18\x18 \x01(\x05\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x15\n\rns_defn_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x12\n\nstatus_msg\x18\x1e \x01(\t\x12\x15\n\ris_held_total\x18\x1f \x01(\x05\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\x85\x05\n\x05PbJob\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x12\n\nsubmit_num\x18\x03 \x01(\x05\x12\r\n\x05state\x18\x04 \x01(\t\x12\x12\n\ntask_proxy\x18\x05 \x01(\t\x12\x16\n\x0esubmitted_time\x18\x06 \x01(\t\x12\x14\n\x0cstarted_time\x18\x07 \x01(\t\x12\x15\n\rfinished_time\x18\x08 \x01(\t\x12\x18\n\x10\x62\x61tch_sys_job_id\x18\t \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_name\x18\n \x01(\t\x12\x12\n\nenv_script\x18\x0b \x01(\t\x12\x12\n\nerr_script\x18\x0c \x01(\t\x12\x13\n\x0b\x65xit_script\x18\r \x01(\t\x12\x1c\n\x14\x65xecution_time_limit\x18\x0e \x01(\x02\x12\x0c\n\x04host\x18\x0f \x01(\t\x12\x13\n\x0binit_script\x18\x10 \x01(\t\x12\x13\n\x0bjob_log_dir\x18\x11 \x01(\t\x12\r\n\x05owner\x18\x12 \x01(\t\x12\x13\n\x0bpost_script\x18\x13 \x01(\t\x12\x12\n\npre_script\x18\x14 \x01(\t\x12\x0e\n\x06script\x18\x15 \x01(\t\x12\r\n\x05shell\x18\x16 \x01(\t\x12\x14\n\x0cwork_sub_dir\x18\x17 \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_conf\x18\x18 \x03(\t\x12\x13\n\x0b\x65nvironment\x18\x19 \x03(\t\x12\x12\n\ndirectives\x18\x1a \x03(\t\x12\x16\n\x0eparam_env_tmpl\x18\x1b \x03(\t\x12\x11\n\tparam_var\x18\x1c \x03(\t\x12\x12\n\nextra_logs\x18\x1d \x03(\t\x12\x0c\n\x04name\x18\x1e \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x1f \x01(\t\x12\x10\n\x08messages\x18 \x03(\t\"\x96\x01\n\x06PbTask\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\x19\n\x11mean_elapsed_time\x18\x05 \x01(\x02\x12\r\n\x05\x64\x65pth\x18\x06 \x01(\x05\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\"r\n\nPbPollTask\x12\x13\n\x0blocal_proxy\x18\x01 \x01(\t\x12\x10\n\x08workflow\x18\x02 \x01(\t\x12\x14\n\x0cremote_proxy\x18\x03 \x01(\t\x12\x11\n\treq_state\x18\x04 \x01(\t\x12\x14\n\x0cgraph_string\x18\x05 \x01(\t\"l\n\x0bPbCondition\x12\x12\n\ntask_proxy\x18\x01 \x01(\t\x12\x12\n\nexpr_alias\x18\x02 \x01(\t\x12\x11\n\treq_state\x18\x03 \x01(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\x12\x0f\n\x07message\x18\x05 \x01(\t\"o\n\x0ePbPrerequisite\x12\x12\n\nexpression\x18\x01 \x01(\t\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\"\xfd\x02\n\x0bPbTaskProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04task\x18\x03 \x01(\t\x12\r\n\x05state\x18\x04 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x05 \x01(\t\x12\x0f\n\x07spawned\x18\x06 \x01(\x08\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x13\n\x0bjob_submits\x18\x08 \x01(\x05\x12\x16\n\x0elatest_message\x18\t \x01(\t\x12\x0f\n\x07outputs\x18\n \x03(\t\x12\x12\n\nbroadcasts\x18\x0b \x03(\t\x12\x11\n\tnamespace\x18\x0c \x03(\t\x12&\n\rprerequisites\x18\r \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\x0e \x03(\t\x12\x0f\n\x07parents\x18\x0f \x03(\t\x12\x14\n\x0c\x66irst_parent\x18\x10 \x01(\t\x12\x0c\n\x04name\x18\x11 \x01(\t\x12\x0f\n\x07is_held\x18\x12 \x01(\x08\x12\r\n\x05\x65\x64ges\x18\x13 \x03(\t\x12\x11\n\tancestors\x18\x14 \x03(\t\"\xa8\x01\n\x08PbFamily\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\r\n\x05\x64\x65pth\x18\x05 \x01(\x05\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\"\xf3\x01\n\rPbFamilyProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x03 \x01(\t\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0e\n\x06\x66\x61mily\x18\x05 \x01(\t\x12\r\n\x05state\x18\x06 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x14\n\x0c\x66irst_parent\x18\x08 \x01(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x0f\n\x07is_held\x18\x0c \x01(\x08\x12\x11\n\tancestors\x18\r \x03(\t\"b\n\x06PbEdge\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0e\n\x06source\x18\x03 \x01(\t\x12\x0e\n\x06target\x18\x04 \x01(\t\x12\x0f\n\x07suicide\x18\x05 \x01(\x08\x12\x0c\n\x04\x63ond\x18\x06 \x01(\x08\"o\n\x07PbEdges\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\t\"\xe0\x01\n\x10PbEntireWorkflow\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdge\"}\n\x07\x45\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbEdge\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"\x81\x01\n\x07\x46\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x18\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\t.PbFamily\x12\x1a\n\x07updated\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"\x8c\x01\n\x08\x46PDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x1d\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x1f\n\x07updated\x18\x04 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"{\n\x07JDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x15\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x06.PbJob\x12\x17\n\x07updated\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"}\n\x07TDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x16\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x07.PbTask\x12\x18\n\x07updated\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"\x88\x01\n\x08TPDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x1b\n\x05\x61\x64\x64\x65\x64\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x1d\n\x07updated\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x0e\n\x06pruned\x18\x05 \x03(\t\x12\x10\n\x08reloaded\x18\x06 \x01(\x08\"c\n\x07WDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x1a\n\x05\x61\x64\x64\x65\x64\x18\x02 \x01(\x0b\x32\x0b.PbWorkflow\x12\x1c\n\x07updated\x18\x03 \x01(\x0b\x32\x0b.PbWorkflow\x12\x10\n\x08reloaded\x18\x04 \x01(\x08\"\xd1\x01\n\tAllDeltas\x12\x1a\n\x08\x66\x61milies\x18\x01 \x01(\x0b\x32\x08.FDeltas\x12!\n\x0e\x66\x61mily_proxies\x18\x02 \x01(\x0b\x32\t.FPDeltas\x12\x16\n\x04jobs\x18\x03 \x01(\x0b\x32\x08.JDeltas\x12\x17\n\x05tasks\x18\x04 \x01(\x0b\x32\x08.TDeltas\x12\x1f\n\x0ctask_proxies\x18\x05 \x01(\x0b\x32\t.TPDeltas\x12\x17\n\x05\x65\x64ges\x18\x06 \x01(\x0b\x32\x08.EDeltas\x12\x1a\n\x08workflow\x18\x07 \x01(\x0b\x32\x08.WDeltasb\x06proto3' ) @@ -1490,22 +1490,29 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='EDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='EDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='EDeltas.deltas', index=3, + name='updated', full_name='EDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='EDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='EDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='EDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1523,7 +1530,7 @@ oneofs=[ ], serialized_start=3328, - serialized_end=3428, + serialized_end=3453, ) @@ -1549,22 +1556,29 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='FDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='FDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='FDeltas.deltas', index=3, + name='updated', full_name='FDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='FDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='FDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='FDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1581,8 +1595,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3430, - serialized_end=3532, + serialized_start=3456, + serialized_end=3585, ) @@ -1608,22 +1622,29 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='FPDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='FPDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='FPDeltas.deltas', index=3, + name='updated', full_name='FPDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='FPDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='FPDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='FPDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1640,8 +1661,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3534, - serialized_end=3642, + serialized_start=3588, + serialized_end=3728, ) @@ -1667,22 +1688,29 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='JDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='JDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='JDeltas.deltas', index=3, + name='updated', full_name='JDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='JDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='JDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='JDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1699,8 +1727,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3644, - serialized_end=3743, + serialized_start=3730, + serialized_end=3853, ) @@ -1726,22 +1754,29 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='TDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='TDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='TDeltas.deltas', index=3, + name='updated', full_name='TDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='TDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='TDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='TDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1758,8 +1793,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3745, - serialized_end=3845, + serialized_start=3855, + serialized_end=3980, ) @@ -1785,22 +1820,81 @@ is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='pruned', full_name='TPDeltas.pruned', index=2, - number=3, type=9, cpp_type=9, label=3, + name='added', full_name='TPDeltas.added', index=2, + number=3, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='deltas', full_name='TPDeltas.deltas', index=3, + name='updated', full_name='TPDeltas.updated', index=3, number=4, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='reloaded', full_name='TPDeltas.reloaded', index=4, - number=5, type=8, cpp_type=7, label=1, + name='pruned', full_name='TPDeltas.pruned', index=4, + number=5, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='TPDeltas.reloaded', index=5, + number=6, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=3983, + serialized_end=4119, +) + + +_WDELTAS = _descriptor.Descriptor( + name='WDeltas', + full_name='WDeltas', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='time', full_name='WDeltas.time', index=0, + number=1, type=1, cpp_type=5, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='added', full_name='WDeltas.added', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='updated', full_name='WDeltas.updated', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reloaded', full_name='WDeltas.reloaded', index=3, + number=4, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, @@ -1817,8 +1911,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3847, - serialized_end=3953, + serialized_start=4121, + serialized_end=4220, ) @@ -1830,49 +1924,49 @@ containing_type=None, fields=[ _descriptor.FieldDescriptor( - name='workflow', full_name='AllDeltas.workflow', index=0, + name='families', full_name='AllDeltas.families', index=0, number=1, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='edges', full_name='AllDeltas.edges', index=1, + name='family_proxies', full_name='AllDeltas.family_proxies', index=1, number=2, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='families', full_name='AllDeltas.families', index=2, + name='jobs', full_name='AllDeltas.jobs', index=2, number=3, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='family_proxies', full_name='AllDeltas.family_proxies', index=3, + name='tasks', full_name='AllDeltas.tasks', index=3, number=4, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='jobs', full_name='AllDeltas.jobs', index=4, + name='task_proxies', full_name='AllDeltas.task_proxies', index=4, number=5, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='tasks', full_name='AllDeltas.tasks', index=5, + name='edges', full_name='AllDeltas.edges', index=5, number=6, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='task_proxies', full_name='AllDeltas.task_proxies', index=6, + name='workflow', full_name='AllDeltas.workflow', index=6, number=7, type=11, cpp_type=10, label=1, has_default_value=False, default_value=None, message_type=None, enum_type=None, containing_type=None, @@ -1890,8 +1984,8 @@ extension_ranges=[], oneofs=[ ], - serialized_start=3956, - serialized_end=4168, + serialized_start=4223, + serialized_end=4432, ) _PBWORKFLOW_STATETOTALSENTRY.containing_type = _PBWORKFLOW @@ -1911,19 +2005,27 @@ _PBENTIREWORKFLOW.fields_by_name['families'].message_type = _PBFAMILY _PBENTIREWORKFLOW.fields_by_name['family_proxies'].message_type = _PBFAMILYPROXY _PBENTIREWORKFLOW.fields_by_name['edges'].message_type = _PBEDGE -_EDELTAS.fields_by_name['deltas'].message_type = _PBEDGE -_FDELTAS.fields_by_name['deltas'].message_type = _PBFAMILY -_FPDELTAS.fields_by_name['deltas'].message_type = _PBFAMILYPROXY -_JDELTAS.fields_by_name['deltas'].message_type = _PBJOB -_TDELTAS.fields_by_name['deltas'].message_type = _PBTASK -_TPDELTAS.fields_by_name['deltas'].message_type = _PBTASKPROXY -_ALLDELTAS.fields_by_name['workflow'].message_type = _PBWORKFLOW -_ALLDELTAS.fields_by_name['edges'].message_type = _EDELTAS +_EDELTAS.fields_by_name['added'].message_type = _PBEDGE +_EDELTAS.fields_by_name['updated'].message_type = _PBEDGE +_FDELTAS.fields_by_name['added'].message_type = _PBFAMILY +_FDELTAS.fields_by_name['updated'].message_type = _PBFAMILY +_FPDELTAS.fields_by_name['added'].message_type = _PBFAMILYPROXY +_FPDELTAS.fields_by_name['updated'].message_type = _PBFAMILYPROXY +_JDELTAS.fields_by_name['added'].message_type = _PBJOB +_JDELTAS.fields_by_name['updated'].message_type = _PBJOB +_TDELTAS.fields_by_name['added'].message_type = _PBTASK +_TDELTAS.fields_by_name['updated'].message_type = _PBTASK +_TPDELTAS.fields_by_name['added'].message_type = _PBTASKPROXY +_TPDELTAS.fields_by_name['updated'].message_type = _PBTASKPROXY +_WDELTAS.fields_by_name['added'].message_type = _PBWORKFLOW +_WDELTAS.fields_by_name['updated'].message_type = _PBWORKFLOW _ALLDELTAS.fields_by_name['families'].message_type = _FDELTAS _ALLDELTAS.fields_by_name['family_proxies'].message_type = _FPDELTAS _ALLDELTAS.fields_by_name['jobs'].message_type = _JDELTAS _ALLDELTAS.fields_by_name['tasks'].message_type = _TDELTAS _ALLDELTAS.fields_by_name['task_proxies'].message_type = _TPDELTAS +_ALLDELTAS.fields_by_name['edges'].message_type = _EDELTAS +_ALLDELTAS.fields_by_name['workflow'].message_type = _WDELTAS DESCRIPTOR.message_types_by_name['PbMeta'] = _PBMETA DESCRIPTOR.message_types_by_name['PbTimeZone'] = _PBTIMEZONE DESCRIPTOR.message_types_by_name['PbWorkflow'] = _PBWORKFLOW @@ -1944,6 +2046,7 @@ DESCRIPTOR.message_types_by_name['JDeltas'] = _JDELTAS DESCRIPTOR.message_types_by_name['TDeltas'] = _TDELTAS DESCRIPTOR.message_types_by_name['TPDeltas'] = _TPDELTAS +DESCRIPTOR.message_types_by_name['WDeltas'] = _WDELTAS DESCRIPTOR.message_types_by_name['AllDeltas'] = _ALLDELTAS _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -2095,6 +2198,13 @@ }) _sym_db.RegisterMessage(TPDeltas) +WDeltas = _reflection.GeneratedProtocolMessageType('WDeltas', (_message.Message,), { + 'DESCRIPTOR' : _WDELTAS, + '__module__' : 'data_messages_pb2' + # @@protoc_insertion_point(class_scope:WDeltas) + }) +_sym_db.RegisterMessage(WDeltas) + AllDeltas = _reflection.GeneratedProtocolMessageType('AllDeltas', (_message.Message,), { 'DESCRIPTOR' : _ALLDELTAS, '__module__' : 'data_messages_pb2' diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index a28f7554ea8..c352e9a2e4c 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -58,9 +58,9 @@ from cylc.flow import __version__ as CYLC_VERSION from cylc.flow.cycling.loader import get_point from cylc.flow.data_messages_pb2 import ( - PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, - PbJob, PbTask, PbTaskProxy, PbWorkflow, - AllDeltas, EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas) + PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask, + PbTaskProxy, PbWorkflow, AllDeltas, EDeltas, FDeltas, FPDeltas, + JDeltas, TDeltas, TPDeltas, WDeltas) from cylc.flow.network import API from cylc.flow.suite_status import get_suite_status from cylc.flow.task_id import TaskID @@ -90,6 +90,16 @@ WORKFLOW: PbWorkflow, } +DATA_TEMPLATE = { + EDGES: {}, + FAMILIES: {}, + FAMILY_PROXIES: {}, + JOBS: {}, + TASKS: {}, + TASK_PROXIES: {}, + WORKFLOW: PbWorkflow(), +} + DELTAS_MAP = { EDGES: EDeltas, FAMILIES: FDeltas, @@ -97,7 +107,7 @@ JOBS: JDeltas, TASKS: TDeltas, TASK_PROXIES: TPDeltas, - WORKFLOW: PbWorkflow, + WORKFLOW: WDeltas, ALL_DELTAS: AllDeltas, } @@ -132,44 +142,49 @@ def task_mean_elapsed_time(tdef): def apply_delta(key, delta, data): """Apply delta to specific data-store workflow and type.""" + # Assimilate new data + if getattr(delta, 'added', False): + if key != WORKFLOW: + data[key].update({e.id: e for e in delta.added}) + elif delta.added.ListFields(): + data[key].CopyFrom(delta.added) # Merge in updated fields - if key == WORKFLOW: - # Clear fields the require overwrite with delta - for field in CLEAR_FIELD_MAP[key]: - data[key].ClearField(field) - data[key].MergeFrom(delta) - # fields that are set to empty kinds aren't carried - if not delta.is_held_total: - data[key].is_held_total = 0 - if not delta.reloaded: - data[key].reloaded = False - return - for element in delta.deltas: - if element.id not in data[key]: - data[key][element.id] = MESSAGE_MAP[key]() - else: + if getattr(delta, 'updated', False): + if key == WORKFLOW: + for field in CLEAR_FIELD_MAP[key]: + data[key].ClearField(field) # Clear fields the require overwrite with delta - for field, _ in element.ListFields(): - if field.name in CLEAR_FIELD_MAP[key]: - data[key][element.id].ClearField(field.name) - # fields that are set to empty kinds aren't carried - if key in (TASK_PROXIES, FAMILY_PROXIES): - if not element.is_held: - data[key][element.id].is_held = False - data[key][element.id].MergeFrom(element) - # Prune data elements by id - for del_id in delta.pruned: - if del_id not in data[key]: - continue - if key == TASK_PROXIES: - data[TASKS][data[key][del_id].task].proxies.remove(del_id) - getattr(data[WORKFLOW], key).remove(del_id) - elif key == FAMILY_PROXIES: - data[FAMILIES][data[key][del_id].family].proxies.remove(del_id) - getattr(data[WORKFLOW], key).remove(del_id) - elif key == EDGES: - getattr(data[WORKFLOW], key).edges.remove(del_id) - del data[key][del_id] + data[key].MergeFrom(delta.updated) + # Fields that are set to empty kinds aren't carried + if not delta.updated.is_held_total: + data[key].is_held_total = 0 + if not delta.updated.reloaded: + data[key].reloaded = False + else: + for element in delta.updated: + # Clear fields the require overwrite with delta + for field, _ in element.ListFields(): + if field.name in CLEAR_FIELD_MAP[key]: + data[key][element.id].ClearField(field.name) + # Fields that are set to empty kinds aren't carried + if getattr(element, 'is_held', False): + data[key][element.id].is_held = False + data[key][element.id].MergeFrom(element) + # Prune data elements + if hasattr(delta, 'pruned'): + # Prune data elements by id + for del_id in delta.pruned: + if del_id not in data[key]: + continue + if key == TASK_PROXIES: + data[TASKS][data[key][del_id].task].proxies.remove(del_id) + getattr(data[WORKFLOW], key).remove(del_id) + elif key == FAMILY_PROXIES: + data[FAMILIES][data[key][del_id].family].proxies.remove(del_id) + getattr(data[WORKFLOW], key).remove(del_id) + elif key == EDGES: + getattr(data[WORKFLOW], key).edges.remove(del_id) + del data[key][del_id] class DataStoreMgr: @@ -221,6 +236,7 @@ class DataStoreMgr: # Memory optimization - constrain possible attributes to this list. __slots__ = [ + 'added', 'ancestors', 'cycle_states', 'data', @@ -233,7 +249,7 @@ class DataStoreMgr: 'parents', 'pool_points', 'schd', - 'updates', + 'updated', 'updates_pending', 'workflow_id', ] @@ -251,16 +267,10 @@ def __init__(self, schd): self.cycle_states = {} # Managed data types self.data = { - self.workflow_id: { - EDGES: {}, - FAMILIES: {}, - FAMILY_PROXIES: {}, - JOBS: {}, - TASKS: {}, - TASK_PROXIES: {}, - WORKFLOW: PbWorkflow(), - } + self.workflow_id: deepcopy(DATA_TEMPLATE) } + self.added = deepcopy(DATA_TEMPLATE) + self.updated = deepcopy(DATA_TEMPLATE) self.deltas = { EDGES: EDeltas(), FAMILIES: FDeltas(), @@ -268,15 +278,7 @@ def __init__(self, schd): JOBS: JDeltas(), TASKS: TDeltas(), TASK_PROXIES: TPDeltas(), - WORKFLOW: PbWorkflow(), - } - self.updates = { - EDGES: {}, - FAMILIES: {}, - FAMILY_PROXIES: {}, - JOBS: {}, - TASKS: {}, - TASK_PROXIES: {}, + WORKFLOW: WDeltas(), } self.updates_pending = False self.delta_queues = {self.workflow_id: {}} @@ -299,11 +301,11 @@ def initiate_data_model(self, reloaded=False): # Tidy and reassign task jobs after reload if reloaded: - new_tasks = set(self.updates[TASK_PROXIES]) + new_tasks = set(self.added[TASK_PROXIES]) job_tasks = set(self.schd.job_pool.pool) for tp_id in job_tasks.difference(new_tasks): self.schd.job_pool.remove_task_jobs(tp_id) - for tp_id, tp_delta in self.updates[TASK_PROXIES].items(): + for tp_id, tp_delta in self.added[TASK_PROXIES].items(): tp_delta.jobs[:] = [ j_id for j_id in self.schd.job_pool.task_jobs.get(tp_id, []) @@ -329,9 +331,9 @@ def generate_definition_elements(self): """ config = self.schd.config update_time = time() - tasks = self.updates[TASKS] - families = self.updates[FAMILIES] - workflow = self.deltas[WORKFLOW] + tasks = self.added[TASKS] + families = self.added[FAMILIES] + workflow = self.added[WORKFLOW] workflow.id = self.workflow_id workflow.last_updated = update_time @@ -442,48 +444,58 @@ def generate_definition_elements(self): self.descendants = descendants self.parents = parents - def generate_ghost_task(self, task_id): + def generate_ghost_task(self, t_id, tp_id, point_string): """Create task-point element populated with static data. Args: - task_id (str): - valid TaskID string. + t_id (str): data-store task ID. + tp_id (str): data-store task proxy ID. + point_string (str): Valid cycle point string. Returns: - object: cylc.flow.data_messages_pb2.PbTaskProxy - Populated task proxy data element. + None """ - update_time = time() + task_proxies = self.data[self.workflow_id][TASK_PROXIES] + if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]: + return - name, point_string = TaskID.split(task_id) - self.cycle_states.setdefault(point_string, {})[name] = (None, False) - t_id = f'{self.workflow_id}{ID_DELIM}{name}' - tp_id = f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}' - tp_stamp = f'{tp_id}@{update_time}' taskdef = self.data[self.workflow_id][TASKS].get( - t_id, - self.updates[TASKS].get(t_id, MESSAGE_MAP[TASKS]) - ) + t_id, self.added[TASKS].get(t_id)) + + self.cycle_states.setdefault(point_string, {})[taskdef.name] = ( + None, False) + + update_time = time() + tp_stamp = f'{tp_id}@{update_time}' tproxy = PbTaskProxy( stamp=tp_stamp, id=tp_id, - task=taskdef.id, + task=t_id, cycle_point=point_string, depth=taskdef.depth, - name=name, + name=taskdef.name, ) tproxy.namespace[:] = taskdef.namespace tproxy.parents[:] = [ f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{p_name}' - for p_name in self.parents[name]] + for p_name in self.parents[taskdef.name]] tproxy.ancestors[:] = [ f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{a_name}' - for a_name in self.ancestors[name] - if a_name != name] + for a_name in self.ancestors[taskdef.name] + if a_name != taskdef.name] tproxy.first_parent = tproxy.ancestors[0] - return tproxy + + self.added[TASK_PROXIES][tp_id] = tproxy + getattr(self.updated[WORKFLOW], TASK_PROXIES).append(tp_id) + self.updated[TASKS].setdefault( + t_id, + PbTask( + stamp=f'{t_id}@{update_time}', + id=t_id, + ) + ).proxies.append(tp_id) def generate_ghost_families(self, cycle_points=None): """Generate the family-point elements from tasks in cycle points. @@ -493,14 +505,14 @@ def generate_ghost_families(self, cycle_points=None): a set of cycle points. Returns: - list: [cylc.flow.data_messages_pb2.PbFamilyProxy] - list of populated family proxy data elements. + + None """ update_time = time() families = self.data[self.workflow_id][FAMILIES] if not families: - families = self.updates[FAMILIES] + families = self.added[FAMILIES] family_proxies = self.data[self.workflow_id][FAMILY_PROXIES] for point_string, tasks in self.cycle_states.items(): # construct family tree based on the @@ -521,7 +533,7 @@ def generate_ghost_families(self, cycle_points=None): f'{self.workflow_id}{ID_DELIM}' f'{point_string}{ID_DELIM}{fam}') if (fp_id in family_proxies or - fp_id in self.updates[FAMILY_PROXIES]): + fp_id in self.added[FAMILY_PROXIES]): continue fp_delta = PbFamilyProxy( stamp=f'{fp_id}@{update_time}', @@ -553,18 +565,18 @@ def generate_ghost_families(self, cycle_points=None): fp_delta.child_families.append(ch_id) elif child_name in self.schd.config.taskdefs: fp_delta.child_tasks.append(ch_id) - self.updates[FAMILY_PROXIES][fp_id] = fp_delta + self.added[FAMILY_PROXIES][fp_id] = fp_delta # Add ref ID to family element f_delta = PbFamily( id=f_id, stamp=f'{f_id}@{update_time}') f_delta.proxies.append(fp_id) - self.updates[FAMILIES].setdefault( + self.updated[FAMILIES].setdefault( f_id, PbFamily(id=f_id)).MergeFrom(f_delta) # Add ref ID to workflow element - getattr(self.deltas[WORKFLOW], FAMILY_PROXIES).append(fp_id) + getattr(self.updated[WORKFLOW], FAMILY_PROXIES).append(fp_id) def generate_graph_elements(self, start_point=None, stop_point=None): """Generate edges and [ghost] nodes (family and task proxy elements). @@ -579,10 +591,6 @@ def generate_graph_elements(self, start_point=None, stop_point=None): if not self.pool_points: return config = self.schd.config - tasks = self.data[self.workflow_id][TASKS] - if not tasks: - tasks = self.updates[TASKS] - task_proxies = self.data[self.workflow_id][TASK_PROXIES] if start_point is None: start_point = min(self.pool_points) if stop_point is None: @@ -590,6 +598,8 @@ def generate_graph_elements(self, start_point=None, stop_point=None): # Used for generating family [ghost] nodes new_points = set() + # Reference set for workflow relations + new_edges = set() # Generate ungrouped edges for edge in config.get_graph_edges(start_point, stop_point): @@ -612,36 +622,27 @@ def generate_graph_elements(self, start_point=None, stop_point=None): t_name, t_point = TaskID.split(t_node) t_point_cls = get_point(t_point) t_pool_point = get_point(t_point) in self.pool_points + # Proceed if either source or target cycle points # are in the task pool. if not s_pool_point and not t_pool_point: continue + # If source/target is valid add/create the corresponding items. # TODO: if xtrigger is suite_state create remote ID source_id = ( f'{self.workflow_id}{ID_DELIM}{s_point}{ID_DELIM}{s_name}') + + # Add valid source before checking for no target, + # as source may be an isolate (hence no edges). if s_valid: s_task_id = f'{self.workflow_id}{ID_DELIM}{s_name}' new_points.add(s_point) # Add source points for pruning. self.edge_points.setdefault(s_point_cls, set()) - if (source_id not in task_proxies and - source_id not in self.updates[TASK_PROXIES]): - self.updates[TASK_PROXIES][source_id] = ( - self.generate_ghost_task(s_node)) - getattr( - self.deltas[WORKFLOW], TASK_PROXIES).append(source_id) - if (source_id not in tasks[s_task_id].proxies and - source_id not in self.updates[TASKS].get( - s_task_id, PbTask()).proxies): - self.updates[TASKS].setdefault( - s_task_id, - PbTask( - stamp='f{s_task_id}@{update_time}', - id=s_task_id, - )).proxies.append(source_id) - # Add valid source before checking for no target, - # as source may be an isolate (hence no edges). + self.generate_ghost_task(s_task_id, source_id, s_point) + # If target is valid then created it. + # Edges are only created for valid targets. # At present targets can't be xtriggers. if t_valid: target_id = ( @@ -651,47 +652,33 @@ def generate_graph_elements(self, start_point=None, stop_point=None): # Add target points to associated source points for pruning. self.edge_points.setdefault(s_point_cls, set()) self.edge_points[s_point_cls].add(t_point_cls) - if (target_id not in task_proxies and - target_id not in self.updates[TASK_PROXIES]): - self.updates[TASK_PROXIES][target_id] = ( - self.generate_ghost_task(t_node)) - getattr(self.deltas[WORKFLOW], TASK_PROXIES).append( - target_id) - if (target_id not in tasks[t_task_id].proxies and - target_id not in self.updates[TASKS].get( - t_task_id, PbTask()).proxies): - self.updates[TASKS].setdefault( - t_task_id, - PbTask( - stamp='f{t_task_id}@{update_time}', - id=t_task_id, - )).proxies.append(target_id) + self.generate_ghost_task(t_task_id, target_id, t_point) # Initiate edge element. e_id = ( f'{self.workflow_id}{ID_DELIM}{s_node}{ID_DELIM}{t_node}') - self.updates[EDGES][e_id] = PbEdge( + self.added[EDGES][e_id] = PbEdge( id=e_id, suicide=edge[3], cond=edge[4], source=source_id, target=target_id, ) + new_edges.add(e_id) # Add edge id to node field for resolver reference - self.updates[TASK_PROXIES].setdefault( + self.updated[TASK_PROXIES].setdefault( target_id, PbTaskProxy(id=target_id)).edges.append(e_id) if s_valid: - self.updates[TASK_PROXIES].setdefault( + self.updated[TASK_PROXIES].setdefault( source_id, PbTaskProxy(id=source_id)).edges.append(e_id) - getattr( - self.deltas.setdefault(WORKFLOW, PbWorkflow()), - EDGES).edges.extend(self.updates[EDGES].keys()) if new_points: self.generate_ghost_families(new_points) + if new_edges: + getattr(self.updated[WORKFLOW], EDGES).edges.extend(new_edges) def update_data_structure(self, updated_nodes=None): """Reflect workflow changes in the data structure.""" @@ -819,7 +806,7 @@ def update_task_proxies(self, updated_tasks=None): tp_id = ( f'{self.workflow_id}{ID_DELIM}{point_string}{ID_DELIM}{name}') if (tp_id not in task_proxies and - tp_id not in self.updates[TASK_PROXIES]): + tp_id not in self.added[TASK_PROXIES]): continue self.cycle_states.setdefault(point_string, {})[name] = ( itask.state.status, itask.state.is_held) @@ -827,7 +814,7 @@ def update_task_proxies(self, updated_tasks=None): if name not in task_defs: task_defs[name] = itask.tdef # Create new message and copy existing message content. - tp_delta = self.updates[TASK_PROXIES].setdefault( + tp_delta = self.updated[TASK_PROXIES].setdefault( tp_id, PbTaskProxy(id=tp_id)) tp_delta.stamp = f'{tp_id}@{update_time}' tp_delta.state = itask.state.status @@ -865,7 +852,7 @@ def update_task_proxies(self, updated_tasks=None): stamp=f'{t_id}@{update_time}', mean_elapsed_time=elapsed_time ) - self.updates[TASKS].setdefault( + self.updated[TASKS].setdefault( t_id, PbTask(id=t_id)).MergeFrom(t_delta) tasks[t_id].MergeFrom(t_delta) @@ -916,7 +903,7 @@ def update_family_proxies(self, cycle_points=None): f'{point_string}{ID_DELIM}{fam}') if state is None or ( fp_id not in family_proxies and - fp_id not in self.updates[FAMILY_PROXIES]): + fp_id not in self.added[FAMILY_PROXIES]): continue # Since two fields strings are reassigned, # it should be safe without copy. @@ -926,14 +913,15 @@ def update_family_proxies(self, cycle_points=None): state=state, is_held=c_fam_task_is_held[fam] ) - self.updates[FAMILY_PROXIES].setdefault( + self.updated[FAMILY_PROXIES].setdefault( fp_id, PbFamilyProxy()).MergeFrom(fp_delta) def update_workflow(self): """Update workflow element status and state totals.""" # Create new message and copy existing message content update_time = time() - workflow = self.deltas[WORKFLOW] + workflow = self.updated[WORKFLOW] + workflow.id = self.workflow_id workflow.last_updated = update_time data = self.data[self.workflow_id] @@ -943,9 +931,9 @@ def update_workflow(self): counter = Counter( [t.state for t in data[TASK_PROXIES].values() - if t.state and t.id not in self.updates[TASK_PROXIES]] + + if t.state and t.id not in self.updated[TASK_PROXIES]] + [t.state - for t in self.updates[TASK_PROXIES].values() + for t in self.updated[TASK_PROXIES].values() if t.state] ) @@ -956,9 +944,9 @@ def update_workflow(self): workflow.is_held_total = len( [t.is_held for t in data[TASK_PROXIES].values() - if t.is_held and t.id not in self.updates[TASK_PROXIES]] + + if t.is_held and t.id not in self.updated[TASK_PROXIES]] + [t.is_held - for t in self.updates[TASK_PROXIES].values() + for t in self.updated[TASK_PROXIES].values() if t.is_held] ) @@ -990,42 +978,61 @@ def update_dynamic_elements(self, updated_nodes=None): def clear_deltas(self): """Clear current deltas.""" for key in self.deltas: - if key in self.deltas: - self.deltas[key].Clear() - if key in self.updates: - self.updates[key].clear() + self.deltas[key].Clear() + if key == WORKFLOW: + self.added[key].Clear() + self.updated[key].Clear() + continue + self.added[key].clear() + self.updated[key].clear() def apply_deltas(self, reloaded=False): """Gather and apply deltas.""" # Copy in job deltas self.deltas[JOBS].CopyFrom(self.schd.job_pool.deltas) - self.updates[JOBS] = deepcopy(self.schd.job_pool.updates) - - # Gather cumulative update elements - for key, elements in self.updates.items(): - self.deltas[key].deltas.extend(elements.values()) + self.added[JOBS] = deepcopy(self.schd.job_pool.added) + self.updated[JOBS] = deepcopy(self.schd.job_pool.updated) + + # Gather cumulative update element + for key, elements in self.added.items(): + if elements: + if key == WORKFLOW: + self.deltas[WORKFLOW].added.CopyFrom(elements) + continue + self.deltas[key].added.extend(elements.values()) + for key, elements in self.updated.items(): + if elements: + if key == WORKFLOW: + self.deltas[WORKFLOW].updated.CopyFrom(elements) + continue + self.deltas[key].updated.extend(elements.values()) # Apply deltas to local data-store data = self.data[self.workflow_id] for key, delta in self.deltas.items(): - delta.reloaded = reloaded - apply_delta(key, delta, data) + if delta.ListFields(): + delta.reloaded = reloaded + apply_delta(key, delta, data) # Construct checksum on deltas for export update_time = time() for key, delta in self.deltas.items(): - if delta.ListFields() and hasattr(delta, 'checksum'): + if delta.ListFields(): delta.time = update_time - if key == EDGES: - s_att = 'id' - else: - s_att = 'stamp' - delta.checksum = generate_checksum( - [getattr(e, s_att) for e in data[key].values()]) + if hasattr(delta, 'checksum'): + if key == EDGES: + s_att = 'id' + else: + s_att = 'stamp' + delta.checksum = generate_checksum( + [getattr(e, s_att) + for e in data[key].values()] + ) # Clear job pool changes after their application self.schd.job_pool.deltas.Clear() - self.schd.job_pool.updates.clear() + self.schd.job_pool.added.clear() + self.schd.job_pool.updated.clear() # Message collation and dissemination methods: def get_entire_workflow(self): @@ -1058,7 +1065,6 @@ def get_publish_deltas(self): result.append( (key.encode('utf-8'), delta, 'SerializeToString')) getattr(all_deltas, key).CopyFrom(delta) - all_deltas.workflow.id = self.workflow_id result.append( (ALL_DELTAS.encode('utf-8'), all_deltas, 'SerializeToString') ) @@ -1080,9 +1086,9 @@ def get_data_elements(self, element_type): return DELTAS_MAP[WORKFLOW]() data = self.data[self.workflow_id] pb_msg = DELTAS_MAP[element_type]() + pb_msg.time = data[WORKFLOW].last_updated if element_type == WORKFLOW: - pb_msg.CopyFrom(data[WORKFLOW]) + pb_msg.added.CopyFrom(data[WORKFLOW]) else: - pb_msg.time = data[WORKFLOW].last_updated - pb_msg.deltas.extend(data[element_type].values()) + pb_msg.added.extend(data[element_type].values()) return pb_msg diff --git a/cylc/flow/job_pool.py b/cylc/flow/job_pool.py index 6112b7f6306..dd7593e1bb0 100644 --- a/cylc/flow/job_pool.py +++ b/cylc/flow/job_pool.py @@ -55,7 +55,8 @@ def __init__(self, suite, owner): self.pool = {} self.task_jobs = {} self.deltas = JDeltas() - self.updates = {} + self.added = {} + self.updated = {} self.updates_pending = False def insert_job(self, job_conf): @@ -104,7 +105,7 @@ def insert_job(self, job_conf): [f'{key}={val}' for key, val in job_conf['param_var'].items()]) j_buf.extra_logs.extend(job_conf['logfiles']) - self.updates[j_id] = j_buf + self.added[j_id] = j_buf self.task_jobs.setdefault(t_id, set()).add(j_id) self.updates_pending = True @@ -118,16 +119,16 @@ def add_job_msg(self, job_d, msg): try: j_delta = PbJob(stamp=f'{j_id}@{update_time}') j_delta.messages.append(msg) - self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) + self.updated.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) self.updates_pending = True except TypeError as exc: LOG.error(f'Unable to append to {j_id} message field: {str(exc)}') def reload_deltas(self): """Gather all current jobs as deltas after reload.""" - self.updates = deepcopy(self.pool) + self.added = deepcopy(self.pool) self.pool = {} - if self.updates: + if self.added: self.updates_pending = True def remove_job(self, job_d): @@ -162,7 +163,7 @@ def set_job_attr(self, job_d, attr_key, attr_val): try: j_delta = PbJob(stamp=f'{j_id}@{update_time}') setattr(j_delta, attr_key, attr_val) - self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) + self.updated.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) self.updates_pending = True except (TypeError, AttributeError) as exc: LOG.error(f'Unable to set {j_id} data field: {str(exc)}') @@ -179,7 +180,7 @@ def set_job_state(self, job_d, status): stamp=f'{j_id}@{update_time}', state=status ) - self.updates.setdefault( + self.updated.setdefault( j_id, PbJob(id=j_id)).MergeFrom(j_delta) self.updates_pending = True else: @@ -199,7 +200,7 @@ def set_job_time(self, job_d, event_key, time_str=None): j_delta = PbJob(stamp=f'{j_id}@{update_time}') time_attr = f'{event_key}_time' setattr(j_delta, time_attr, time_str) - self.updates.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) + self.updated.setdefault(j_id, PbJob(id=j_id)).MergeFrom(j_delta) self.updates_pending = True except (TypeError, AttributeError) as exc: LOG.error(f'Unable to set {j_id} {time_attr} field: {str(exc)}') diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index dedf69f4bb7..dd56b165912 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -31,7 +31,7 @@ ID_DELIM, EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW ) from cylc.flow.network.schema import ( - NodesEdges, PROXY_NODES, Deltas, WorkflowDeltas, Pruned + NodesEdges, PROXY_NODES, Deltas, Pruned, AddedUpdated ) logger = logging.getLogger(__name__) @@ -176,6 +176,7 @@ class BaseResolvers: def __init__(self, data_store_mgr): self.data_store_mgr = data_store_mgr + self.delta_store = {} # Query resolvers async def get_workflows_data(self, args): @@ -305,22 +306,29 @@ async def subscribe_delta(self, args): yielded GraphQL subscription objects. """ - w_id = args['id'] + w_ids = args['ids'] sub_id = uuid4() delta_queues = self.data_store_mgr.delta_queues - if delta_queues.get(w_id) is not None: - delta_queues[w_id][sub_id] = queue.Queue() + deltas_queue = queue.Queue() + for w_id in w_ids: + if delta_queues.get(w_id) is not None: + delta_queues[w_id][sub_id] = deltas_queue + sub_fields = {'added', 'updated', 'pruned'} try: while True: - if not delta_queues.get(w_id, {}).get(sub_id): + if not any( + delta_queues.get(w_id, {}).get(sub_id) + for w_id in w_ids): break try: - _, topic, delta = delta_queues[w_id][sub_id].get(False) + w_id, topic, delta = deltas_queue.get(False) except queue.Empty: await asyncio.sleep(DELTA_SLEEP_INTERVAL) continue result = Deltas( - workflow=WorkflowDeltas(id=w_id), + id=w_id, + added=AddedUpdated(), + updated=AddedUpdated(), pruned=Pruned() ) if topic == 'shutdown': @@ -328,14 +336,14 @@ async def subscribe_delta(self, args): elif not args.get('topic') or args['topic'] != topic: continue else: - for field, value in delta.workflow.ListFields(): - setattr(result.workflow, field.name, value) - for field, sub_value in delta.ListFields(): - if field.name != 'workflow': - setattr( - result.workflow, field.name, sub_value.deltas) - setattr( - result.pruned, field.name, sub_value.pruned) + for field, value in delta.ListFields(): + for sub_field, sub_value in value.ListFields(): + if sub_field.name in sub_fields: + setattr( + getattr(result, sub_field.name), + field.name, + sub_value + ) yield result except CancelledError: pass @@ -343,8 +351,9 @@ async def subscribe_delta(self, args): import traceback logger.warn(traceback.format_exc()) finally: - if delta_queues.get(w_id, {}).get(sub_id): - del delta_queues[w_id][sub_id] + for w_id in w_ids: + if delta_queues.get(w_id, {}).get(sub_id): + del delta_queues[w_id][sub_id] yield None diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 191c0496741..468c261ec91 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1579,7 +1579,7 @@ class Mutations(ObjectType): # ** Subscription Related ** # delta_args = dict( - id=ID(required=True, description="Full ID, i.e. `owner|name`"), + ids=List(ID, description="List of full ID, i.e. `owner|name`"), topic=String(default_value=ALL_DELTAS), ) @@ -1729,16 +1729,54 @@ class Meta: edges = List(String) -class Deltas(ObjectType): +class AddedUpdated(ObjectType): class Meta: - description = """Grouped deltas of the WFS publish""" + description = """WFS Nodes/Edges that have been removed.""" + families = List( + FamilyDelta, + description="""Family definitions.""", + args=def_args) + family_proxies = List( + FamilyProxyDelta, + description="""Family cycle instances.""", + args=proxy_args) + jobs = List( + JobDelta, + description="""Task jobs.""", + args=jobs_args) + tasks = List( + TaskDelta, + description="""Task definitions.""", + args=def_args) + task_proxies = List( + TaskProxyDelta, + description="""Task cycle instances.""", + args=proxy_args) + edges = List( + EdgeDelta, + description="""Graph edges""", + args=edge_args) workflow = Field( WorkflowDeltas, description=WorkflowDeltas._meta.description, ) + + +class Deltas(ObjectType): + class Meta: + description = """Grouped deltas of the WFS publish""" + id = ID(required=True) + added = Field( + AddedUpdated, + description=AddedUpdated._meta.description + ) + updated = Field( + AddedUpdated, + description=AddedUpdated._meta.description + ) pruned = Field( Pruned, - description=Pruned._meta.description, + description=Pruned._meta.description ) diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py index 4ec2f54069a..222c1eadddd 100644 --- a/cylc/flow/network/subscriber.py +++ b/cylc/flow/network/subscriber.py @@ -17,10 +17,8 @@ import json import sys -from time import sleep from typing import Iterable, Union -import asyncio import zmq from cylc.flow.network import ZMQSocketBase, get_location @@ -105,4 +103,3 @@ async def subscribe(self, msg_handler, *args, **kwargs): data = json.loads(msg) sys.stdout.write( json.dumps(data, indent=4) + '\n') - await asyncio.sleep(0) diff --git a/cylc/flow/tests/network/test_publisher.py b/cylc/flow/tests/network/test_publisher.py index ca3019c79eb..58884e91d54 100644 --- a/cylc/flow/tests/network/test_publisher.py +++ b/cylc/flow/tests/network/test_publisher.py @@ -78,7 +78,7 @@ def setUp(self) -> None: stopcp=None, check_point=True ) - assert 0 == warnings + assert warnings == 0 self.task_pool.release_runahead_tasks() self.scheduler.data_store_mgr.initiate_data_model() self.workflow_id = self.scheduler.data_store_mgr.workflow_id @@ -109,7 +109,7 @@ def test_publish(self): subscriber.socket.recv_multipart()) delta = DELTAS_MAP[btopic.decode('utf-8')]() delta.ParseFromString(msg) - self.assertEqual(delta.id, self.workflow_id) + self.assertEqual(delta.added.id, self.workflow_id) subscriber.stop() with self.assertLogs(LOG, level='ERROR') as cm: self.publisher.publish(None) diff --git a/cylc/flow/tests/network/test_server.py b/cylc/flow/tests/network/test_server.py index 9bd0cd7c215..8b41d30984f 100644 --- a/cylc/flow/tests/network/test_server.py +++ b/cylc/flow/tests/network/test_server.py @@ -77,7 +77,7 @@ def setUp(self) -> None: stopcp=None, check_point=True ) - assert 0 == warnings + assert warnings == 0 self.task_pool.release_runahead_tasks() self.scheduler.data_store_mgr.initiate_data_model() self.workflow_id = self.scheduler.data_store_mgr.workflow_id @@ -129,7 +129,7 @@ def test_pb_data_elements(self): element_type = 'workflow' data = PB_METHOD_MAP['pb_data_elements'][element_type]() data.ParseFromString(self.server.pb_data_elements(element_type)) - self.assertEqual(data.id, self.workflow_id) + self.assertEqual(data.added.id, self.workflow_id) def test_pb_entire_workflow(self): """Test Protobuf entire workflow endpoint method.""" diff --git a/cylc/flow/tests/network/test_subscriber.py b/cylc/flow/tests/network/test_subscriber.py index d039119ce18..32964febf7a 100644 --- a/cylc/flow/tests/network/test_subscriber.py +++ b/cylc/flow/tests/network/test_subscriber.py @@ -120,7 +120,7 @@ def msg_process(btopic, msg): self.topic, self.data = process_delta_msg(btopic, msg, None) self.subscriber.loop.run_until_complete( self.subscriber.subscribe(msg_process)) - self.assertEqual(self.data.id, self.workflow_id) + self.assertEqual(self.data.added.id, self.workflow_id) if __name__ == '__main__': diff --git a/cylc/flow/tests/test_data_store_mgr.py b/cylc/flow/tests/test_data_store_mgr.py index 4abedf87367..dd4a463028f 100644 --- a/cylc/flow/tests/test_data_store_mgr.py +++ b/cylc/flow/tests/test_data_store_mgr.py @@ -106,15 +106,15 @@ def test_generate_graph_elements(self): def test_get_data_elements(self): """Test method that returns data elements by specified type.""" flow_msg = self.data_store_mgr.get_data_elements(TASK_PROXIES) - self.assertEqual(0, len(flow_msg.deltas)) + self.assertEqual(0, len(flow_msg.added)) self.data_store_mgr.initiate_data_model() flow_msg = self.data_store_mgr.get_data_elements(TASK_PROXIES) self.assertEqual( - len(flow_msg.deltas), + len(flow_msg.added), len(self.data[TASK_PROXIES])) flow_msg = self.data_store_mgr.get_data_elements(WORKFLOW) self.assertEqual( - flow_msg.last_updated, self.data[WORKFLOW].last_updated) + flow_msg.added.last_updated, self.data[WORKFLOW].last_updated) none_msg = self.data_store_mgr.get_data_elements('fraggle') self.assertEqual(0, len(none_msg.ListFields())) diff --git a/cylc/flow/tests/test_job_pool.py b/cylc/flow/tests/test_job_pool.py index 0b1ca62637c..6b1672e2ac0 100644 --- a/cylc/flow/tests/test_job_pool.py +++ b/cylc/flow/tests/test_job_pool.py @@ -61,23 +61,23 @@ def setUp(self) -> None: def test_insert_job(self): """Test method that adds a new job to the pool.""" - self.assertEqual(0, len(self.job_pool.updates)) + self.assertEqual(0, len(self.job_pool.added)) self.job_pool.insert_job(JOB_CONFIG) - self.assertEqual(1, len(self.job_pool.updates)) - self.assertTrue(self.ext_id in self.job_pool.updates) + self.assertEqual(1, len(self.job_pool.added)) + self.assertTrue(self.ext_id in self.job_pool.added) def test_add_job_msg(self): """Test method adding messages to job element.""" self.job_pool.insert_job(JOB_CONFIG) - job = self.job_pool.updates[self.ext_id] - old_stamp = copy(job.stamp) - self.assertEqual(0, len(job.messages)) + job_added = self.job_pool.added[self.ext_id] + self.assertEqual(0, len(job_added.messages)) with self.assertLogs(LOG, level='ERROR') as cm: self.job_pool.add_job_msg(self.int_id, True) self.assertIn(f'Unable to append to {self.ext_id}', cm.output[0]) self.job_pool.add_job_msg(self.int_id, 'The Atomic Age') - self.assertNotEqual(old_stamp, job.stamp) - self.assertEqual(1, len(job.messages)) + job_updated = self.job_pool.updated[self.ext_id] + self.assertNotEqual(job_added.stamp, job_updated.stamp) + self.assertEqual(1, len(job_updated.messages)) def test_remove_job(self): """Test method removing a job from the pool via internal job id.""" @@ -96,34 +96,37 @@ def test_remove_task_jobs(self): self.assertEqual(0, len(pruned)) self.job_pool.remove_task_jobs('NotTaskID') self.assertEqual(0, len(pruned)) - task_id = self.job_pool.updates[self.ext_id].task_proxy + task_id = self.job_pool.added[self.ext_id].task_proxy self.job_pool.remove_task_jobs(task_id) self.assertEqual(1, len(pruned)) def test_set_job_attr(self): """Test method setting job attribute value.""" self.job_pool.insert_job(JOB_CONFIG) - job = self.job_pool.updates[self.ext_id] - old_exit_script = copy(job.exit_script) + job_added = self.job_pool.added[self.ext_id] + original_exit_script = copy(job_added.exit_script) with self.assertLogs(LOG, level='ERROR') as cm: self.job_pool.set_job_attr(self.int_id, 'leave_scripting', 'rm *') self.assertIn(f'Unable to set {self.ext_id}', cm.output[0]) with self.assertLogs(LOG, level='ERROR') as cm: self.job_pool.set_job_attr(self.int_id, 'exit_script', 10.0) self.assertIn(f'Unable to set {self.ext_id}', cm.output[0]) - self.assertEqual(old_exit_script, job.exit_script) + self.assertEqual(original_exit_script, job_added.exit_script) self.job_pool.set_job_attr(self.int_id, 'exit_script', 'rm -v *') - self.assertNotEqual(old_exit_script, job.exit_script) + self.assertNotEqual( + job_added.exit_script, + self.job_pool.updated[self.ext_id].exit_script) def test_set_job_state(self): """Test method setting the job state.""" self.job_pool.insert_job(JOB_CONFIG) - job = self.job_pool.updates[self.ext_id] - old_state = copy(job.state) - self.job_pool.set_job_state(self.int_id, 'waiting') - self.assertEqual(old_state, job.state) + job_added = self.job_pool.added[self.ext_id] + self.job_pool.set_job_state(self.int_id, JOB_STATUSES_ALL[1]) + job_updated = self.job_pool.updated[self.ext_id] + state_two = copy(job_updated.state) + self.assertNotEqual(job_added.state, state_two) self.job_pool.set_job_state(self.int_id, JOB_STATUSES_ALL[-1]) - self.assertNotEqual(old_state, job.state) + self.assertNotEqual(state_two, job_updated.state) with self.assertLogs(LOG, level='ERROR') as cm: self.job_pool.set_job_state(self.int_id, 'sleepy') self.assertIn(f'Unable to set {self.ext_id} state', cm.output[0]) @@ -132,14 +135,15 @@ def test_set_job_time(self): """Test method setting event time.""" event_time = get_current_time_string() self.job_pool.insert_job(JOB_CONFIG) - job = self.job_pool.updates[self.ext_id] - old_time = copy(job.submitted_time) + job_added = self.job_pool.added[self.ext_id] with self.assertLogs(LOG, level='ERROR') as cm: self.job_pool.set_job_time(self.int_id, 'jumped', event_time) self.assertIn(f'Unable to set {self.ext_id} jumped_time', cm.output[0]) - self.assertEqual(old_time, job.submitted_time) self.job_pool.set_job_time(self.int_id, 'submitted', event_time) - self.assertNotEqual(old_time, job.submitted_time) + job_updated = self.job_pool.updated[self.ext_id] + self.assertRaises(ValueError, job_updated.HasField, 'jumped_time') + self.assertNotEqual( + job_added.submitted_time, job_updated.submitted_time) def test_parse_job_item(self): """Test internal id parsing method."""