diff --git a/cylc/flow/data_messages.proto b/cylc/flow/data_messages.proto index b6cc4f6e509..2b51e5da44f 100644 --- a/cylc/flow/data_messages.proto +++ b/cylc/flow/data_messages.proto @@ -282,3 +282,13 @@ message TPDeltas { repeated PbTaskProxy deltas = 4; bool reloaded = 5; } + +message AllDeltas { + PbWorkflow workflow = 1; + EDeltas edges = 2; + FDeltas families = 3; + FPDeltas family_proxies = 4; + JDeltas jobs = 5; + TDeltas tasks = 6; + TPDeltas task_proxies = 7; +} diff --git a/cylc/flow/data_messages_pb2.py b/cylc/flow/data_messages_pb2.py index 28d5fcacea1..f80ccecca48 100644 --- a/cylc/flow/data_messages_pb2.py +++ b/cylc/flow/data_messages_pb2.py @@ -18,7 +18,7 @@ package='', syntax='proto3', serialized_options=None, - serialized_pb=b'\n\x13\x64\x61ta_messages.proto\"O\n\x06PbMeta\x12\r\n\x05title\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0b\n\x03URL\x18\x03 \x01(\t\x12\x14\n\x0cuser_defined\x18\x04 \x03(\t\"[\n\nPbTimeZone\x12\r\n\x05hours\x18\x01 \x01(\x05\x12\x0f\n\x07minutes\x18\x02 \x01(\x05\x12\x14\n\x0cstring_basic\x18\x03 \x01(\t\x12\x17\n\x0fstring_extended\x18\x04 \x01(\t\"\xeb\x05\n\nPbWorkflow\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x0e\n\x06status\x18\x04 \x01(\t\x12\x0c\n\x04host\x18\x05 \x01(\t\x12\x0c\n\x04port\x18\x06 \x01(\x05\x12\r\n\x05owner\x18\x07 \x01(\t\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x17\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdges\x12\x13\n\x0b\x61pi_version\x18\x0b \x01(\x05\x12\x14\n\x0c\x63ylc_version\x18\x0c \x01(\t\x12\x14\n\x0clast_updated\x18\r \x01(\x01\x12\x15\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMeta\x12#\n\x1bnewest_runahead_cycle_point\x18\x0f \x01(\t\x12\x1a\n\x12newest_cycle_point\x18\x10 \x01(\t\x12\x1a\n\x12oldest_cycle_point\x18\x11 \x01(\t\x12\x10\n\x08reloaded\x18\x12 \x01(\x08\x12\x10\n\x08run_mode\x18\x13 \x01(\t\x12\x14\n\x0c\x63ycling_mode\x18\x14 \x01(\t\x12\x32\n\x0cstate_totals\x18\x15 \x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x18\n\x10workflow_log_dir\x18\x16 \x01(\t\x12#\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZone\x12\x12\n\ntree_depth\x18\x18 \x01(\x05\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x15\n\rns_defn_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x12\n\nstatus_msg\x18\x1e \x01(\t\x12\x15\n\ris_held_total\x18\x1f \x01(\x05\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\x85\x05\n\x05PbJob\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x12\n\nsubmit_num\x18\x03 \x01(\x05\x12\r\n\x05state\x18\x04 \x01(\t\x12\x12\n\ntask_proxy\x18\x05 \x01(\t\x12\x16\n\x0esubmitted_time\x18\x06 \x01(\t\x12\x14\n\x0cstarted_time\x18\x07 \x01(\t\x12\x15\n\rfinished_time\x18\x08 \x01(\t\x12\x18\n\x10\x62\x61tch_sys_job_id\x18\t \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_name\x18\n \x01(\t\x12\x12\n\nenv_script\x18\x0b \x01(\t\x12\x12\n\nerr_script\x18\x0c \x01(\t\x12\x13\n\x0b\x65xit_script\x18\r \x01(\t\x12\x1c\n\x14\x65xecution_time_limit\x18\x0e \x01(\x02\x12\x0c\n\x04host\x18\x0f \x01(\t\x12\x13\n\x0binit_script\x18\x10 \x01(\t\x12\x13\n\x0bjob_log_dir\x18\x11 \x01(\t\x12\r\n\x05owner\x18\x12 \x01(\t\x12\x13\n\x0bpost_script\x18\x13 \x01(\t\x12\x12\n\npre_script\x18\x14 \x01(\t\x12\x0e\n\x06script\x18\x15 \x01(\t\x12\r\n\x05shell\x18\x16 \x01(\t\x12\x14\n\x0cwork_sub_dir\x18\x17 \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_conf\x18\x18 \x03(\t\x12\x13\n\x0b\x65nvironment\x18\x19 \x03(\t\x12\x12\n\ndirectives\x18\x1a \x03(\t\x12\x16\n\x0eparam_env_tmpl\x18\x1b \x03(\t\x12\x11\n\tparam_var\x18\x1c \x03(\t\x12\x12\n\nextra_logs\x18\x1d \x03(\t\x12\x0c\n\x04name\x18\x1e \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x1f \x01(\t\x12\x10\n\x08messages\x18 \x03(\t\"\x96\x01\n\x06PbTask\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\x19\n\x11mean_elapsed_time\x18\x05 \x01(\x02\x12\r\n\x05\x64\x65pth\x18\x06 \x01(\x05\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\"r\n\nPbPollTask\x12\x13\n\x0blocal_proxy\x18\x01 \x01(\t\x12\x10\n\x08workflow\x18\x02 \x01(\t\x12\x14\n\x0cremote_proxy\x18\x03 \x01(\t\x12\x11\n\treq_state\x18\x04 \x01(\t\x12\x14\n\x0cgraph_string\x18\x05 \x01(\t\"l\n\x0bPbCondition\x12\x12\n\ntask_proxy\x18\x01 \x01(\t\x12\x12\n\nexpr_alias\x18\x02 \x01(\t\x12\x11\n\treq_state\x18\x03 \x01(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\x12\x0f\n\x07message\x18\x05 \x01(\t\"o\n\x0ePbPrerequisite\x12\x12\n\nexpression\x18\x01 \x01(\t\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\"\xfd\x02\n\x0bPbTaskProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04task\x18\x03 \x01(\t\x12\r\n\x05state\x18\x04 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x05 \x01(\t\x12\x0f\n\x07spawned\x18\x06 \x01(\x08\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x13\n\x0bjob_submits\x18\x08 \x01(\x05\x12\x16\n\x0elatest_message\x18\t \x01(\t\x12\x0f\n\x07outputs\x18\n \x03(\t\x12\x12\n\nbroadcasts\x18\x0b \x03(\t\x12\x11\n\tnamespace\x18\x0c \x03(\t\x12&\n\rprerequisites\x18\r \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\x0e \x03(\t\x12\x0f\n\x07parents\x18\x0f \x03(\t\x12\x14\n\x0c\x66irst_parent\x18\x10 \x01(\t\x12\x0c\n\x04name\x18\x11 \x01(\t\x12\x0f\n\x07is_held\x18\x12 \x01(\x08\x12\r\n\x05\x65\x64ges\x18\x13 \x03(\t\x12\x11\n\tancestors\x18\x14 \x03(\t\"\xa8\x01\n\x08PbFamily\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\r\n\x05\x64\x65pth\x18\x05 \x01(\x05\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\"\xf3\x01\n\rPbFamilyProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x03 \x01(\t\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0e\n\x06\x66\x61mily\x18\x05 \x01(\t\x12\r\n\x05state\x18\x06 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x14\n\x0c\x66irst_parent\x18\x08 \x01(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x0f\n\x07is_held\x18\x0c \x01(\x08\x12\x11\n\tancestors\x18\r \x03(\t\"b\n\x06PbEdge\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0e\n\x06source\x18\x03 \x01(\t\x12\x0e\n\x06target\x18\x04 \x01(\t\x12\x0f\n\x07suicide\x18\x05 \x01(\x08\x12\x0c\n\x04\x63ond\x18\x06 \x01(\x08\"o\n\x07PbEdges\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\t\"\xe0\x01\n\x10PbEntireWorkflow\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdge\"d\n\x07\x45\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"f\n\x07\x46\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x19\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"l\n\x08\x46PDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1e\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"c\n\x07JDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x16\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"d\n\x07TDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"j\n\x08TPDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1c\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\x62\x06proto3' + serialized_pb=b'\n\x13\x64\x61ta_messages.proto\"O\n\x06PbMeta\x12\r\n\x05title\x18\x01 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x02 \x01(\t\x12\x0b\n\x03URL\x18\x03 \x01(\t\x12\x14\n\x0cuser_defined\x18\x04 \x03(\t\"[\n\nPbTimeZone\x12\r\n\x05hours\x18\x01 \x01(\x05\x12\x0f\n\x07minutes\x18\x02 \x01(\x05\x12\x14\n\x0cstring_basic\x18\x03 \x01(\t\x12\x17\n\x0fstring_extended\x18\x04 \x01(\t\"\xeb\x05\n\nPbWorkflow\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x0e\n\x06status\x18\x04 \x01(\t\x12\x0c\n\x04host\x18\x05 \x01(\t\x12\x0c\n\x04port\x18\x06 \x01(\x05\x12\r\n\x05owner\x18\x07 \x01(\t\x12\r\n\x05tasks\x18\x08 \x03(\t\x12\x10\n\x08\x66\x61milies\x18\t \x03(\t\x12\x17\n\x05\x65\x64ges\x18\n \x01(\x0b\x32\x08.PbEdges\x12\x13\n\x0b\x61pi_version\x18\x0b \x01(\x05\x12\x14\n\x0c\x63ylc_version\x18\x0c \x01(\t\x12\x14\n\x0clast_updated\x18\r \x01(\x01\x12\x15\n\x04meta\x18\x0e \x01(\x0b\x32\x07.PbMeta\x12#\n\x1bnewest_runahead_cycle_point\x18\x0f \x01(\t\x12\x1a\n\x12newest_cycle_point\x18\x10 \x01(\t\x12\x1a\n\x12oldest_cycle_point\x18\x11 \x01(\t\x12\x10\n\x08reloaded\x18\x12 \x01(\x08\x12\x10\n\x08run_mode\x18\x13 \x01(\t\x12\x14\n\x0c\x63ycling_mode\x18\x14 \x01(\t\x12\x32\n\x0cstate_totals\x18\x15 \x03(\x0b\x32\x1c.PbWorkflow.StateTotalsEntry\x12\x18\n\x10workflow_log_dir\x18\x16 \x01(\t\x12#\n\x0etime_zone_info\x18\x17 \x01(\x0b\x32\x0b.PbTimeZone\x12\x12\n\ntree_depth\x18\x18 \x01(\x05\x12\x15\n\rjob_log_names\x18\x19 \x03(\t\x12\x15\n\rns_defn_order\x18\x1a \x03(\t\x12\x0e\n\x06states\x18\x1b \x03(\t\x12\x14\n\x0ctask_proxies\x18\x1c \x03(\t\x12\x16\n\x0e\x66\x61mily_proxies\x18\x1d \x03(\t\x12\x12\n\nstatus_msg\x18\x1e \x01(\t\x12\x15\n\ris_held_total\x18\x1f \x01(\x05\x1a\x32\n\x10StateTotalsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x05:\x02\x38\x01\"\x85\x05\n\x05PbJob\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x12\n\nsubmit_num\x18\x03 \x01(\x05\x12\r\n\x05state\x18\x04 \x01(\t\x12\x12\n\ntask_proxy\x18\x05 \x01(\t\x12\x16\n\x0esubmitted_time\x18\x06 \x01(\t\x12\x14\n\x0cstarted_time\x18\x07 \x01(\t\x12\x15\n\rfinished_time\x18\x08 \x01(\t\x12\x18\n\x10\x62\x61tch_sys_job_id\x18\t \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_name\x18\n \x01(\t\x12\x12\n\nenv_script\x18\x0b \x01(\t\x12\x12\n\nerr_script\x18\x0c \x01(\t\x12\x13\n\x0b\x65xit_script\x18\r \x01(\t\x12\x1c\n\x14\x65xecution_time_limit\x18\x0e \x01(\x02\x12\x0c\n\x04host\x18\x0f \x01(\t\x12\x13\n\x0binit_script\x18\x10 \x01(\t\x12\x13\n\x0bjob_log_dir\x18\x11 \x01(\t\x12\r\n\x05owner\x18\x12 \x01(\t\x12\x13\n\x0bpost_script\x18\x13 \x01(\t\x12\x12\n\npre_script\x18\x14 \x01(\t\x12\x0e\n\x06script\x18\x15 \x01(\t\x12\r\n\x05shell\x18\x16 \x01(\t\x12\x14\n\x0cwork_sub_dir\x18\x17 \x01(\t\x12\x16\n\x0e\x62\x61tch_sys_conf\x18\x18 \x03(\t\x12\x13\n\x0b\x65nvironment\x18\x19 \x03(\t\x12\x12\n\ndirectives\x18\x1a \x03(\t\x12\x16\n\x0eparam_env_tmpl\x18\x1b \x03(\t\x12\x11\n\tparam_var\x18\x1c \x03(\t\x12\x12\n\nextra_logs\x18\x1d \x03(\t\x12\x0c\n\x04name\x18\x1e \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x1f \x01(\t\x12\x10\n\x08messages\x18 \x03(\t\"\x96\x01\n\x06PbTask\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\x19\n\x11mean_elapsed_time\x18\x05 \x01(\x02\x12\r\n\x05\x64\x65pth\x18\x06 \x01(\x05\x12\x0f\n\x07proxies\x18\x07 \x03(\t\x12\x11\n\tnamespace\x18\x08 \x03(\t\"r\n\nPbPollTask\x12\x13\n\x0blocal_proxy\x18\x01 \x01(\t\x12\x10\n\x08workflow\x18\x02 \x01(\t\x12\x14\n\x0cremote_proxy\x18\x03 \x01(\t\x12\x11\n\treq_state\x18\x04 \x01(\t\x12\x14\n\x0cgraph_string\x18\x05 \x01(\t\"l\n\x0bPbCondition\x12\x12\n\ntask_proxy\x18\x01 \x01(\t\x12\x12\n\nexpr_alias\x18\x02 \x01(\t\x12\x11\n\treq_state\x18\x03 \x01(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\x12\x0f\n\x07message\x18\x05 \x01(\t\"o\n\x0ePbPrerequisite\x12\x12\n\nexpression\x18\x01 \x01(\t\x12 \n\nconditions\x18\x02 \x03(\x0b\x32\x0c.PbCondition\x12\x14\n\x0c\x63ycle_points\x18\x03 \x03(\t\x12\x11\n\tsatisfied\x18\x04 \x01(\x08\"\xfd\x02\n\x0bPbTaskProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04task\x18\x03 \x01(\t\x12\r\n\x05state\x18\x04 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x05 \x01(\t\x12\x0f\n\x07spawned\x18\x06 \x01(\x08\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x13\n\x0bjob_submits\x18\x08 \x01(\x05\x12\x16\n\x0elatest_message\x18\t \x01(\t\x12\x0f\n\x07outputs\x18\n \x03(\t\x12\x12\n\nbroadcasts\x18\x0b \x03(\t\x12\x11\n\tnamespace\x18\x0c \x03(\t\x12&\n\rprerequisites\x18\r \x03(\x0b\x32\x0f.PbPrerequisite\x12\x0c\n\x04jobs\x18\x0e \x03(\t\x12\x0f\n\x07parents\x18\x0f \x03(\t\x12\x14\n\x0c\x66irst_parent\x18\x10 \x01(\t\x12\x0c\n\x04name\x18\x11 \x01(\t\x12\x0f\n\x07is_held\x18\x12 \x01(\x08\x12\r\n\x05\x65\x64ges\x18\x13 \x03(\t\x12\x11\n\tancestors\x18\x14 \x03(\t\"\xa8\x01\n\x08PbFamily\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x15\n\x04meta\x18\x04 \x01(\x0b\x32\x07.PbMeta\x12\r\n\x05\x64\x65pth\x18\x05 \x01(\x05\x12\x0f\n\x07proxies\x18\x06 \x03(\t\x12\x0f\n\x07parents\x18\x07 \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\x08 \x03(\t\x12\x16\n\x0e\x63hild_families\x18\t \x03(\t\"\xf3\x01\n\rPbFamilyProxy\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x13\n\x0b\x63ycle_point\x18\x03 \x01(\t\x12\x0c\n\x04name\x18\x04 \x01(\t\x12\x0e\n\x06\x66\x61mily\x18\x05 \x01(\t\x12\r\n\x05state\x18\x06 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x07 \x01(\x05\x12\x14\n\x0c\x66irst_parent\x18\x08 \x01(\t\x12\x0f\n\x07parents\x18\t \x03(\t\x12\x13\n\x0b\x63hild_tasks\x18\n \x03(\t\x12\x16\n\x0e\x63hild_families\x18\x0b \x03(\t\x12\x0f\n\x07is_held\x18\x0c \x01(\x08\x12\x11\n\tancestors\x18\r \x03(\t\"b\n\x06PbEdge\x12\r\n\x05stamp\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12\x0e\n\x06source\x18\x03 \x01(\t\x12\x0e\n\x06target\x18\x04 \x01(\t\x12\x0f\n\x07suicide\x18\x05 \x01(\x08\x12\x0c\n\x04\x63ond\x18\x06 \x01(\x08\"o\n\x07PbEdges\x12\n\n\x02id\x18\x01 \x01(\t\x12\r\n\x05\x65\x64ges\x18\x02 \x03(\t\x12+\n\x16workflow_polling_tasks\x18\x03 \x03(\x0b\x32\x0b.PbPollTask\x12\x0e\n\x06leaves\x18\x04 \x03(\t\x12\x0c\n\x04\x66\x65\x65t\x18\x05 \x03(\t\"\xe0\x01\n\x10PbEntireWorkflow\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x16\n\x05tasks\x18\x02 \x03(\x0b\x32\x07.PbTask\x12\"\n\x0ctask_proxies\x18\x03 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x14\n\x04jobs\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x1b\n\x08\x66\x61milies\x18\x05 \x03(\x0b\x32\t.PbFamily\x12&\n\x0e\x66\x61mily_proxies\x18\x06 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x16\n\x05\x65\x64ges\x18\x07 \x03(\x0b\x32\x07.PbEdge\"d\n\x07\x45\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbEdge\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"f\n\x07\x46\x44\x65ltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x19\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\t.PbFamily\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"l\n\x08\x46PDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1e\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0e.PbFamilyProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"c\n\x07JDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x16\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x06.PbJob\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"d\n\x07TDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x17\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x07.PbTask\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"j\n\x08TPDeltas\x12\x0c\n\x04time\x18\x01 \x01(\x01\x12\x10\n\x08\x63hecksum\x18\x02 \x01(\x03\x12\x0e\n\x06pruned\x18\x03 \x03(\t\x12\x1c\n\x06\x64\x65ltas\x18\x04 \x03(\x0b\x32\x0c.PbTaskProxy\x12\x10\n\x08reloaded\x18\x05 \x01(\x08\"\xd4\x01\n\tAllDeltas\x12\x1d\n\x08workflow\x18\x01 \x01(\x0b\x32\x0b.PbWorkflow\x12\x17\n\x05\x65\x64ges\x18\x02 \x01(\x0b\x32\x08.EDeltas\x12\x1a\n\x08\x66\x61milies\x18\x03 \x01(\x0b\x32\x08.FDeltas\x12!\n\x0e\x66\x61mily_proxies\x18\x04 \x01(\x0b\x32\t.FPDeltas\x12\x16\n\x04jobs\x18\x05 \x01(\x0b\x32\x08.JDeltas\x12\x17\n\x05tasks\x18\x06 \x01(\x0b\x32\x08.TDeltas\x12\x1f\n\x0ctask_proxies\x18\x07 \x01(\x0b\x32\t.TPDeltasb\x06proto3' ) @@ -1821,6 +1821,79 @@ serialized_end=3953, ) + +_ALLDELTAS = _descriptor.Descriptor( + name='AllDeltas', + full_name='AllDeltas', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='workflow', full_name='AllDeltas.workflow', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='edges', full_name='AllDeltas.edges', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='families', full_name='AllDeltas.families', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='family_proxies', full_name='AllDeltas.family_proxies', index=3, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='jobs', full_name='AllDeltas.jobs', index=4, + number=5, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='tasks', full_name='AllDeltas.tasks', index=5, + number=6, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='task_proxies', full_name='AllDeltas.task_proxies', index=6, + number=7, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=3956, + serialized_end=4168, +) + _PBWORKFLOW_STATETOTALSENTRY.containing_type = _PBWORKFLOW _PBWORKFLOW.fields_by_name['edges'].message_type = _PBEDGES _PBWORKFLOW.fields_by_name['meta'].message_type = _PBMETA @@ -1844,6 +1917,13 @@ _JDELTAS.fields_by_name['deltas'].message_type = _PBJOB _TDELTAS.fields_by_name['deltas'].message_type = _PBTASK _TPDELTAS.fields_by_name['deltas'].message_type = _PBTASKPROXY +_ALLDELTAS.fields_by_name['workflow'].message_type = _PBWORKFLOW +_ALLDELTAS.fields_by_name['edges'].message_type = _EDELTAS +_ALLDELTAS.fields_by_name['families'].message_type = _FDELTAS +_ALLDELTAS.fields_by_name['family_proxies'].message_type = _FPDELTAS +_ALLDELTAS.fields_by_name['jobs'].message_type = _JDELTAS +_ALLDELTAS.fields_by_name['tasks'].message_type = _TDELTAS +_ALLDELTAS.fields_by_name['task_proxies'].message_type = _TPDELTAS DESCRIPTOR.message_types_by_name['PbMeta'] = _PBMETA DESCRIPTOR.message_types_by_name['PbTimeZone'] = _PBTIMEZONE DESCRIPTOR.message_types_by_name['PbWorkflow'] = _PBWORKFLOW @@ -1864,6 +1944,7 @@ DESCRIPTOR.message_types_by_name['JDeltas'] = _JDELTAS DESCRIPTOR.message_types_by_name['TDeltas'] = _TDELTAS DESCRIPTOR.message_types_by_name['TPDeltas'] = _TPDELTAS +DESCRIPTOR.message_types_by_name['AllDeltas'] = _ALLDELTAS _sym_db.RegisterFileDescriptor(DESCRIPTOR) PbMeta = _reflection.GeneratedProtocolMessageType('PbMeta', (_message.Message,), { @@ -2014,6 +2095,13 @@ }) _sym_db.RegisterMessage(TPDeltas) +AllDeltas = _reflection.GeneratedProtocolMessageType('AllDeltas', (_message.Message,), { + 'DESCRIPTOR' : _ALLDELTAS, + '__module__' : 'data_messages_pb2' + # @@protoc_insertion_point(class_scope:AllDeltas) + }) +_sym_db.RegisterMessage(AllDeltas) + _PBWORKFLOW_STATETOTALSENTRY._options = None # @@protoc_insertion_point(module_scope) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 4d536b9bc9f..a28f7554ea8 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -60,7 +60,7 @@ from cylc.flow.data_messages_pb2 import ( PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask, PbTaskProxy, PbWorkflow, - EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas) + AllDeltas, EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas) from cylc.flow.network import API from cylc.flow.suite_status import get_suite_status from cylc.flow.task_id import TaskID @@ -78,6 +78,7 @@ TASKS = 'tasks' TASK_PROXIES = 'task_proxies' WORKFLOW = 'workflow' +ALL_DELTAS = 'all' MESSAGE_MAP = { EDGES: PbEdge, @@ -97,6 +98,7 @@ TASKS: TDeltas, TASK_PROXIES: TPDeltas, WORKFLOW: PbWorkflow, + ALL_DELTAS: AllDeltas, } # Protobuf message merging appends repeated field results on merge, @@ -223,6 +225,7 @@ class DataStoreMgr: 'cycle_states', 'data', 'deltas', + 'delta_queues', 'descendants', 'edge_points', 'max_point', @@ -276,6 +279,7 @@ def __init__(self, schd): TASK_PROXIES: {}, } self.updates_pending = False + self.delta_queues = {self.workflow_id: {}} def initiate_data_model(self, reloaded=False): """Initiate or Update data model on start/restart/reload. @@ -1047,11 +1051,18 @@ def get_entire_workflow(self): def get_publish_deltas(self): """Return deltas for publishing.""" - return [ - (key.encode('utf-8'), delta, 'SerializeToString') - for key, delta in self.deltas.items() - if delta.ListFields() - ] + all_deltas = DELTAS_MAP[ALL_DELTAS]() + result = [] + for key, delta in self.deltas.items(): + if delta.ListFields(): + result.append( + (key.encode('utf-8'), delta, 'SerializeToString')) + getattr(all_deltas, key).CopyFrom(delta) + all_deltas.workflow.id = self.workflow_id + result.append( + (ALL_DELTAS.encode('utf-8'), all_deltas, 'SerializeToString') + ) + return result def get_data_elements(self, element_type): """Get elements of a given type in the form of a delta. diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index b01fbdf3fbd..1d3a380d650 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -43,7 +43,10 @@ def encode_(message): """Convert the structure holding a message field from JSON to a string.""" - return json.dumps(message) + try: + return json.dumps(message) + except TypeError as exc: + return json.dumps({'errors': [{'message': str(exc)}]}) def decode_(message): diff --git a/cylc/flow/network/graphql.py b/cylc/flow/network/graphql.py new file mode 100644 index 00000000000..ba0f54911f9 --- /dev/null +++ b/cylc/flow/network/graphql.py @@ -0,0 +1,155 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +""" + +A modification of th GraphQL Core backend: +https://github.com/graphql-python/graphql-core + +GraphQL Middleware defined here also. + +""" +from functools import partial +import logging + +from graphql.execution import ExecutionResult, execute +from graphql.language.base import parse, print_ast +from graphql.language import ast +from graphql.validation import validate +from graphql.backend.base import GraphQLBackend, GraphQLDocument +from rx import Observable + +logger = logging.getLogger(__name__) + + +# Is possible to not use middleware and do all the filtering here. +# However, middleware allows for argument of the request doc to set. +def strip_null(data): + """Recursively strip data structure of nulls.""" + if isinstance(data, dict): + return { + key: strip_null(val) + for key, val in data.items() + if val is not None + } + if isinstance(data, list): + return [ + strip_null(val) + for val in data + if val is not None + ] + return data + + +def attr_strip_null(result): + """Work on the attribute/data of ExecutionResult if present.""" + if hasattr(result, 'data'): + result.data = strip_null(result.data) + return result + return strip_null(result) + + +def null_stripper(exe_result): + """Strip nulls in accordance with type of execution result.""" + if isinstance(exe_result, Observable): + return exe_result.map(attr_strip_null) + if not exe_result.errors: + return attr_strip_null(exe_result) + return exe_result + + +def execute_and_validate( + schema, # type: GraphQLSchema + document_ast, # type: Document + *args, # type: Any + **kwargs # type: Any +): + # type: (...) -> Union[ExecutionResult, Observable] + """Validate schema, and execute request doc against it.""" + do_validation = kwargs.get("validate", True) + if do_validation: + validation_errors = validate(schema, document_ast) + if validation_errors: + return ExecutionResult(errors=validation_errors, invalid=True) + + result = execute(schema, document_ast, *args, **kwargs) + + if kwargs.get('strip_null', False): + if kwargs.get('return_promise', False): + return result.then(null_stripper) + return null_stripper(result) + return result + + +class GraphQLCoreBackend(GraphQLBackend): + """GraphQLCoreBackend will return a document using the default + graphql executor""" + + def __init__(self, executor=None): + # type: (Optional[Any]) -> None + self.execute_params = {"executor": executor} + + def document_from_string(self, schema, document_string): + # type: (GraphQLSchema, Union[Document, str]) -> GraphQLDocument + """Parse string and setup request docutment for execution.""" + if isinstance(document_string, ast.Document): + document_ast = document_string + document_string = print_ast(document_ast) + else: + if not isinstance(document_string, str): + logger.error("The query must be a string") + document_ast = parse(document_string) + return GraphQLDocument( + schema=schema, + document_string=document_string, + document_ast=document_ast, + execute=partial( + execute_and_validate, + schema, + document_ast, + **self.execute_params + ), + ) + + +# -- Middleware -- + +class IgnoreFieldMiddleware: + """Set to null/None type undesired field values for stripping.""" + + ALLOW_TYPES = (0, 0., False) + + def resolve(self, next, root, info, **args): + """Middleware resolver; handles field according to operation.""" + if getattr(info.operation.name, 'value', None) == 'IntrospectionQuery': + return next(root, info, **args) + if info.operation.operation == 'query': + return self.async_resolve(next, root, info, **args) + if info.operation.operation == 'subscription': + return self.null_setter(next(root, info, **args)) + return next(root, info, **args) + + async def async_resolve(self, next, root, info, **args): + """Set type to null after awaited result if empty/null-like.""" + result = await next(root, info, **args) + return self.null_setter(result) + + def null_setter(self, result): + """Set type to null if result is empty/null-like.""" + # If result is not empty... could be some other condition. + # excluded False, as could be a flag turned off. + if result or result in self.ALLOW_TYPES: + return result + return None diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index e0023c4542f..dedf69f4bb7 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -16,14 +16,27 @@ """GraphQL resolvers for use in data accessing and mutation of workflows.""" +import asyncio +from concurrent.futures._base import CancelledError from fnmatch import fnmatchcase from getpass import getuser +import logging from operator import attrgetter +import queue +from uuid import uuid4 + from graphene.utils.str_converters import to_snake_case from cylc.flow.data_store_mgr import ( - ID_DELIM, EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW) -from cylc.flow.network.schema import NodesEdges, PROXY_NODES + ID_DELIM, EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW +) +from cylc.flow.network.schema import ( + NodesEdges, PROXY_NODES, Deltas, WorkflowDeltas, Pruned +) + +logger = logging.getLogger(__name__) + +DELTA_SLEEP_INTERVAL = 0.5 # Message Filters @@ -51,9 +64,10 @@ def workflow_ids_filter(w_atts, items): return False -def workflow_filter(flow, args): +def workflow_filter(flow, args, w_atts=None): """Filter workflows based on attribute arguments""" - w_atts = collate_workflow_atts(flow[WORKFLOW]) + if w_atts is None: + w_atts = collate_workflow_atts(flow[WORKFLOW]) # The w_atts (workflow attributes) list contains ordered workflow values # or defaults (see collate function for index item). return ((not args.get('workflows') or @@ -160,15 +174,15 @@ def sort_elements(elements, args): class BaseResolvers: """Data access methods for resolving GraphQL queries.""" - def __init__(self, data): - self.data = data + def __init__(self, data_store_mgr): + self.data_store_mgr = data_store_mgr # Query resolvers async def get_workflows_data(self, args): """Return list of data from workflows.""" return [ flow - for flow in self.data.values() + for flow in self.data_store_mgr.data.values() if workflow_filter(flow, args)] async def get_workflows(self, args): @@ -197,7 +211,8 @@ async def get_nodes_by_ids(self, node_type, args): node_types = [node_type] return sort_elements( [node - for flow in get_flow_data_from_ids(self.data, nat_ids) + for flow in get_flow_data_from_ids( + self.data_store_mgr.data, nat_ids) for node_type in node_types for node in get_data_elements(flow, nat_ids, node_type) if node_filter(node, args)], @@ -208,8 +223,8 @@ async def get_node_by_id(self, node_type, args): n_id = args.get('id') o_name, w_name, _ = n_id.split(ID_DELIM, 2) w_id = f'{o_name}{ID_DELIM}{w_name}' - flow = self.data.get(w_id) - if not flow: + flow = self.data_store_mgr.data.get(w_id) + if flow is None: return None if node_type == PROXY_NODES: return ( @@ -232,7 +247,8 @@ async def get_edges_by_ids(self, args): nat_ids = set(args.get('native_ids', [])) return sort_elements( [edge - for flow in get_flow_data_from_ids(self.data, nat_ids) + for flow in get_flow_data_from_ids( + self.data_store_mgr.data, nat_ids) for edge in get_data_elements(flow, nat_ids, EDGES)], args) @@ -256,7 +272,8 @@ async def get_nodes_edges(self, root_nodes, args): edge_ids.update(new_edge_ids) new_edges = [ edge - for flow in get_flow_data_from_ids(self.data, new_edge_ids) + for flow in get_flow_data_from_ids( + self.data_store_mgr.data, new_edge_ids) for edge in get_data_elements(flow, new_edge_ids, EDGES) ] edges += new_edges @@ -271,7 +288,8 @@ async def get_nodes_edges(self, root_nodes, args): node_ids.update(new_node_ids) new_nodes = [ node - for flow in get_flow_data_from_ids(self.data, new_node_ids) + for flow in get_flow_data_from_ids( + self.data_store_mgr.data, new_node_ids) for node in get_data_elements(flow, new_node_ids, TASK_PROXIES) ] nodes += new_nodes @@ -280,6 +298,55 @@ async def get_nodes_edges(self, root_nodes, args): nodes=sort_elements(nodes, args), edges=sort_elements(edges, args)) + async def subscribe_delta(self, args): + """Delta subscription async generator. + + Async generator mapping the incomming protobuf deltas to + yielded GraphQL subscription objects. + + """ + w_id = args['id'] + sub_id = uuid4() + delta_queues = self.data_store_mgr.delta_queues + if delta_queues.get(w_id) is not None: + delta_queues[w_id][sub_id] = queue.Queue() + try: + while True: + if not delta_queues.get(w_id, {}).get(sub_id): + break + try: + _, topic, delta = delta_queues[w_id][sub_id].get(False) + except queue.Empty: + await asyncio.sleep(DELTA_SLEEP_INTERVAL) + continue + result = Deltas( + workflow=WorkflowDeltas(id=w_id), + pruned=Pruned() + ) + if topic == 'shutdown': + result.workflow.status = 'stopped' + elif not args.get('topic') or args['topic'] != topic: + continue + else: + for field, value in delta.workflow.ListFields(): + setattr(result.workflow, field.name, value) + for field, sub_value in delta.ListFields(): + if field.name != 'workflow': + setattr( + result.workflow, field.name, sub_value.deltas) + setattr( + result.pruned, field.name, sub_value.pruned) + yield result + except CancelledError: + pass + except Exception: + import traceback + logger.warn(traceback.format_exc()) + finally: + if delta_queues.get(w_id, {}).get(sub_id): + del delta_queues[w_id][sub_id] + yield None + class Resolvers(BaseResolvers): """Workflow Service context GraphQL query and mutation resolvers.""" diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index e7d748de4e3..191c0496741 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -47,7 +47,7 @@ ) from cylc.flow.data_store_mgr import ( ID_DELIM, FAMILIES, FAMILY_PROXIES, - JOBS, TASKS, TASK_PROXIES + JOBS, TASKS, TASK_PROXIES, ALL_DELTAS ) from cylc.flow.suite_status import StopMode @@ -553,7 +553,7 @@ class Task(ObjectType): class Meta: description = """Task definition, static fields""" id = ID(required=True) - name = String(required=True) + name = String() meta = Field(DefMeta) mean_elapsed_time = Float() depth = Int() @@ -1578,6 +1578,12 @@ class Mutations(ObjectType): # ** Subscription Related ** # +delta_args = dict( + id=ID(required=True, description="Full ID, i.e. `owner|name`"), + topic=String(default_value=ALL_DELTAS), +) + + def to_subscription(func: Callable, sleep_seconds: float = 5.) -> Callable: """Wraps a function in a while-true-sleep, transforming the function into an async-generator, used by the @@ -1598,16 +1604,153 @@ async def gen(*args: Any, **kwargs: Any) -> AsyncGenerator[Any, None]: AsyncGenerator[Any, None]: an async generator that will yield values from resolvers. """ - while True: - yield await func(*args, **kwargs) - await asyncio.sleep(sleep_seconds) + try: + while True: + yield await func(*args, **kwargs) + await asyncio.sleep(sleep_seconds) + finally: + yield None return gen +def delta_subs(root, info, **args) -> AsyncGenerator[Any, None]: + return info.context.get('resolvers').subscribe_delta(args) + + +# The following inheritance is to avoid using the query resolvers +# of the ObjectType fields. +# TODO:- Create resolvers that work with the null-stripping. +class JobDelta(Job): + task_proxy = Field( + lambda: TaskProxyDelta, + description="""Associated Task Proxy""") + + +class TaskDelta(Task): + proxies = List( + lambda: TaskProxyDelta, + description="""Associated cycle point proxies""") + + +class TaskProxyDelta(TaskProxy): + task = Field( + TaskDelta, + description="""Task definition""") + jobs = List( + JobDelta, + description="""Task jobs.""") + parents = List( + lambda: FamilyProxyDelta, + description="""Task parents.""") + first_parent = Field( + lambda: FamilyProxyDelta, + description="""Task first parent.""") + ancestors = List( + lambda: FamilyProxyDelta, + description="""First parent ancestors.""") + + +class FamilyDelta(Family): + proxies = List( + lambda: FamilyProxyDelta, + description="""Associated cycle point proxies""") + parents = List( + lambda: FamilyDelta, + description="""Family definition parent.""") + child_tasks = List( + TaskDelta, + description="""Descendant definition tasks.""") + child_families = List( + lambda: FamilyDelta, + description="""Descendant desc families.""") + + +class FamilyProxyDelta(FamilyProxy): + family = Field( + FamilyDelta, + description="""Family definition""") + parents = List( + lambda: FamilyProxyDelta, + description="""Family parent proxies.""") + child_tasks = List( + TaskProxyDelta, + description="""Descendant task proxies.""") + child_families = List( + lambda: FamilyProxyDelta, + description="""Descendant family proxies.""") + first_parent = Field( + lambda: FamilyProxyDelta, + description="""Task first parent.""") + ancestors = List( + lambda: FamilyProxyDelta, + description="""First parent ancestors.""") + + +class EdgeDelta(Edge): + source_node = Field(Node) + target_node = Field(Node) + + +class WorkflowDeltas(Workflow): + families = List( + FamilyDelta, + description="""Family definitions.""", + args=def_args) + family_proxies = List( + FamilyProxyDelta, + description="""Family cycle instances.""", + args=proxy_args) + jobs = List( + JobDelta, + description="""Task jobs.""", + args=jobs_args) + tasks = List( + TaskDelta, + description="""Task definitions.""", + args=def_args) + task_proxies = List( + TaskProxyDelta, + description="""Task cycle instances.""", + args=proxy_args) + edges = List( + EdgeDelta, + description="""Graph edges""", + args=edge_args) + + +class Pruned(ObjectType): + class Meta: + description = """WFS Nodes/Edges that have been removed.""" + families = List(String) + family_proxies = List(String) + jobs = List(String) + tasks = List(String) + task_proxies = List(String) + edges = List(String) + + +class Deltas(ObjectType): + class Meta: + description = """Grouped deltas of the WFS publish""" + workflow = Field( + WorkflowDeltas, + description=WorkflowDeltas._meta.description, + ) + pruned = Field( + Pruned, + description=Pruned._meta.description, + ) + + class Subscriptions(ObjectType): """Defines the subscriptions available in the schema.""" class Meta: description = """Multi-Workflow root level subscriptions.""" + deltas = Field( + Deltas, + description=Deltas._meta.description, + args=delta_args, + resolver=delta_subs) workflows = List( Workflow, description=Workflow._meta.description, diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index 46f3eeac04c..af6daeb4fa7 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -27,6 +27,9 @@ from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.network import encode_, decode_, ZMQSocketBase from cylc.flow.network.authorisation import Priv, authorise +from cylc.flow.network.graphql import ( + GraphQLCoreBackend, IgnoreFieldMiddleware +) from cylc.flow.network.resolvers import Resolvers from cylc.flow.network.schema import schema from cylc.flow.suite_status import ( @@ -43,6 +46,9 @@ 'pb_data_elements': DELTAS_MAP } +# Avoid re-instantiation +MIDDLEWARE = [IgnoreFieldMiddleware()] + def expose(func=None): """Expose a method on the sever.""" @@ -140,8 +146,9 @@ def __init__(self, schd, context=None, barrier=None, self.endpoints = None self.queue = None self.resolvers = Resolvers( - self.schd.data_store_mgr.data, - schd=self.schd) + self.schd.data_store_mgr, + schd=self.schd + ) def _socket_options(self): """Set socket options. @@ -350,8 +357,12 @@ def graphql(self, request_string=None, variables=None): context={ 'resolvers': self.resolvers, }, + backend=GraphQLCoreBackend(), + middleware=MIDDLEWARE, executor=AsyncioExecutor(), + validate=True, # validate schema (dev only? default is True) return_promise=False, + strip_null=True, ) except Exception as exc: return 'ERROR: GraphQL execution error \n%s' % exc diff --git a/cylc/flow/network/subscriber.py b/cylc/flow/network/subscriber.py index fa1b492901c..4ec2f54069a 100644 --- a/cylc/flow/network/subscriber.py +++ b/cylc/flow/network/subscriber.py @@ -20,6 +20,7 @@ from time import sleep from typing import Iterable, Union +import asyncio import zmq from cylc.flow.network import ZMQSocketBase, get_location @@ -104,4 +105,4 @@ async def subscribe(self, msg_handler, *args, **kwargs): data = json.loads(msg) sys.stdout.write( json.dumps(data, indent=4) + '\n') - sleep(0) + await asyncio.sleep(0) diff --git a/cylc/flow/tests/network/test_resolvers.py b/cylc/flow/tests/network/test_resolvers.py index 4736b5553b8..ee3dc3e5085 100644 --- a/cylc/flow/tests/network/test_resolvers.py +++ b/cylc/flow/tests/network/test_resolvers.py @@ -124,11 +124,12 @@ def setUp(self) -> None: edge.id for edge in self.data[EDGES].values()] self.resolvers = Resolvers( - self.scheduler.data_store_mgr.data, - schd=self.scheduler) + self.scheduler.data_store_mgr, + schd=self.scheduler, + ) def test_constructor(self): - self.assertIsNotNone(self.resolvers.schd) + self.assertIsNotNone(self.resolvers.data_store_mgr) def test_get_workflows(self): """Test method returning workflow messages satisfying filter args.""" diff --git a/tests/graphql/03-is-held-arg.t b/tests/graphql/03-is-held-arg.t index 4a49b95a0db..46f212f8813 100755 --- a/tests/graphql/03-is-held-arg.t +++ b/tests/graphql/03-is-held-arg.t @@ -64,7 +64,6 @@ cat > expected << __HERE__ { "name": "${SUITE_NAME}", "isHeldTotal": 1, - "taskProxies": [], "familyProxies": [ { "id": "${USER}${ID_DELIM}${SUITE_NAME}${ID_DELIM}1${ID_DELIM}BAZ"