From a3218bd2e43c8bab4eb5ff27157e94004134612c Mon Sep 17 00:00:00 2001
From: David Sutherland
Date: Wed, 6 May 2020 15:37:55 +1200
Subject: [PATCH] Delta review fixes, stripNull fix & cascade

---
 cylc/flow/data_store_mgr.py    |  50 ++++--
 cylc/flow/network/graphql.py   | 195 ++++++++++++---------
 cylc/flow/network/resolvers.py |  31 ++--
 cylc/flow/network/schema.py    | 312 ++++++++++++++++++++-------------
 cylc/flow/network/server.py    |   4 +-
 5 files changed, 354 insertions(+), 238 deletions(-)

diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index 3f1f6470423..ff484f5d31f 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -55,7 +55,7 @@
 from time import time
 import zlib
 
-from cylc.flow import __version__ as CYLC_VERSION, ID_DELIM
+from cylc.flow import __version__ as CYLC_VERSION, LOG, ID_DELIM
 from cylc.flow.cycling.loader import get_point
 from cylc.flow.data_messages_pb2 import (
     PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask,
@@ -156,14 +156,16 @@ def apply_delta(key, delta, data):
     # Merge in updated fields
     if getattr(delta, 'updated', False):
         if key == WORKFLOW:
+            # Clear fields that require overwrite with delta
+            field_set = {f.name for f, _ in delta.updated.ListFields()}
             for field in CLEAR_FIELD_MAP[key]:
-                data[key].ClearField(field)
-            # Clear fields the require overwrite with delta
+                if field in field_set:
+                    data[key].ClearField(field)
             data[key].MergeFrom(delta.updated)
         else:
             for element in delta.updated:
                 try:
-                    # Clear fields the require overwrite with delta
+                    # Clear fields that require overwrite with delta
                     if CLEAR_FIELD_MAP[key]:
                         for field, _ in element.ListFields():
                             if field.name in CLEAR_FIELD_MAP[key]:
@@ -172,6 +174,10 @@ def apply_delta(key, delta, data):
-                except KeyError:
+                except KeyError as exc:
                     # Ensure data-sync doesn't fail with
                     # network issues, sync reconcile/validate will catch.
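
[review note] The new membership guard before ClearField matters because
protobuf MergeFrom() appends to repeated fields and overwrites set
scalars: clearing every CLEAR_FIELD_MAP field would erase data the delta
does not carry, while merging a delta-carried repeated field without
clearing first would duplicate its entries. A minimal dict-based sketch
of the intended semantics (illustrative only; apply_workflow_delta is a
hypothetical stand-in, not project code):

    def apply_workflow_delta(data, delta, clear_fields):
        """Merge delta into data, overwriting only delta-set fields.

        Dicts stand in for PbWorkflow messages, lists for protobuf
        repeated fields; clear_fields mirrors CLEAR_FIELD_MAP[WORKFLOW].
        """
        delta_fields = set(delta)  # fields actually set in the delta
        for field in clear_fields:
            if field in delta_fields:  # the guard added by this patch
                data.pop(field, None)  # ClearField(field)
        for field, value in delta.items():  # MergeFrom(delta)
            if isinstance(value, list):
                data.setdefault(field, []).extend(value)  # append
            else:
                data[field] = value  # overwrite

    data = {'state_totals': ['queued'], 'status': 'running'}
    apply_workflow_delta(data, {'state_totals': ['running']},
                         clear_fields={'state_totals', 'status'})
    assert data == {'state_totals': ['running'], 'status': 'running'}
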
+ LOG.debug( + 'Missing Data-Store element ' + 'on update application: %s' % str(exc) + ) continue # Prune data elements if hasattr(delta, 'pruned'): @@ -201,7 +207,7 @@ def create_delta_store(delta=None, workflow_id=None): dict """ - if delta is None: + if not isinstance(delta, AllDeltas): delta = AllDeltas() delta_store = { DELTA_ADDED: deepcopy(DATA_TEMPLATE), @@ -220,18 +226,17 @@ def create_delta_store(delta=None, workflow_id=None): for field, value in delta.ListFields(): for sub_field, sub_value in value.ListFields(): if sub_field.name in delta_store: - if sub_field.name in delta_store: - if ( - field.name == WORKFLOW - or sub_field.name == DELTA_PRUNED - ): - field_data = sub_value - else: - field_data = { - s.id: s - for s in sub_value - } - delta_store[sub_field.name][field.name] = field_data + if ( + field.name == WORKFLOW + or sub_field.name == DELTA_PRUNED + ): + field_data = sub_value + else: + field_data = { + s.id: s + for s in sub_value + } + delta_store[sub_field.name][field.name] = field_data return delta_store @@ -290,6 +295,7 @@ class DataStoreMgr: 'data', 'deltas', 'delta_queues', + 'delta_cycle_states', 'descendants', 'edge_points', 'max_point', @@ -313,6 +319,7 @@ def __init__(self, schd): self.min_point = None self.edge_points = {} self.cycle_states = {} + self.delta_cycle_states = {} # Managed data types self.data = { self.workflow_id: deepcopy(DATA_TEMPLATE) @@ -384,6 +391,7 @@ def generate_definition_elements(self): workflow = self.added[WORKFLOW] workflow.id = self.workflow_id workflow.last_updated = update_time + workflow.stamp = f'{workflow.id}@{workflow.last_updated}' graph = workflow.edges graph.leaves[:] = config.leaves @@ -847,6 +855,7 @@ def update_task_proxies(self, updated_tasks=None): task_proxies = self.data[self.workflow_id][TASK_PROXIES] update_time = time() task_defs = {} + self.delta_cycle_states = {} # update task instance for itask in updated_tasks: @@ -858,6 +867,8 @@ def update_task_proxies(self, updated_tasks=None): continue self.cycle_states.setdefault(point_string, {})[name] = ( itask.state.status, itask.state.is_held) + self.delta_cycle_states.setdefault(point_string, {})[name] = ( + itask.state.status, itask.state.is_held) # Gather task definitions for elapsed time recalculation. if name not in task_defs: task_defs[name] = itask.tdef @@ -916,14 +927,14 @@ def update_family_proxies(self, cycle_points=None): """ family_proxies = self.data[self.workflow_id][FAMILY_PROXIES] if cycle_points is None: - cycle_points = self.cycle_states.keys() + cycle_points = self.delta_cycle_states.keys() if not cycle_points: return for point_string in cycle_points: # For each cycle point, construct a family state tree # based on the first-parent single-inheritance tree - c_task_states = self.cycle_states.get(point_string, None) + c_task_states = self.delta_cycle_states.get(point_string, None) if c_task_states is None: continue c_fam_task_states = {} @@ -969,6 +980,7 @@ def update_workflow(self): workflow = self.updated[WORKFLOW] workflow.id = self.workflow_id workflow.last_updated = time() + workflow.stamp = f'{workflow.id}@{workflow.last_updated}' data = self.data[self.workflow_id] diff --git a/cylc/flow/network/graphql.py b/cylc/flow/network/graphql.py index eb136ebdef2..2ee2fc0814f 100644 --- a/cylc/flow/network/graphql.py +++ b/cylc/flow/network/graphql.py @@ -15,9 +15,6 @@ # along with this program. If not, see . """ -A modification of th GraphQL Core backend: -https://github.com/graphql-python/graphql-core - GraphQL Middleware defined here also. 
""" @@ -28,25 +25,28 @@ from inspect import isclass, iscoroutinefunction from graphene.utils.str_converters import to_snake_case -from graphql.execution import ExecutionResult, execute +from graphql.execution import ExecutionResult from graphql.execution.utils import ( get_operation_root_type, get_field_def ) from graphql.execution.values import get_argument_values, get_variable_values from graphql.language.base import parse, print_ast from graphql.language import ast -from graphql.validation import validate from graphql.backend.base import GraphQLBackend, GraphQLDocument +from graphql.backend.core import execute_and_validate from graphql.utils.base import type_from_ast from graphql.type import get_named_type from promise import Promise from rx import Observable +from cylc.flow.network.schema import NODE_MAP, get_type_str + logger = logging.getLogger(__name__) STRIP_ARG = 'strip_null' NULL_VALUE = None EMPTY_VALUES = ([], {}) +STRIP_OPS = {'query', 'subscription'} def grow_tree(tree, path, leaves=None): @@ -76,7 +76,7 @@ def grow_tree(tree, path, leaves=None): b_1 += 1 b_2 += 1 if leaves: - tree_loc[len(path) % 2].update(leaves) + tree_loc[len(path) % 2].update({'leaves': leaves}) def instantiate_middleware(middlewares): @@ -175,20 +175,22 @@ def has_arg_val(self, arg_name, arg_value): Boolean """ - try: - for components in self.operation_defs.values(): - if self.args_selection_search( - components['definition'].selection_set, - components['variables'], - components['parent_type'], - arg_name, - arg_value, - ): - return True - except Exception as exc: - import traceback - logger.debug(traceback.format_exc()) - logger.error(exc) + for components in self.operation_defs.values(): + defn = components['definition'] + if ( + defn.operation not in STRIP_OPS + or getattr( + defn.name, 'value', None) == 'IntrospectionQuery' + ): + continue + if self.args_selection_search( + components['definition'].selection_set, + components['variables'], + components['parent_type'], + arg_name, + arg_value, + ): + return True return False def args_selection_search( @@ -223,21 +225,14 @@ def args_selection_search( return False -def execute_and_validate( +def execute_and_validate_and_strip( schema, # type: GraphQLSchema document_ast, # type: Document *args, # type: Any **kwargs # type: Any ): # type: (...) -> Union[ExecutionResult, Observable] - """Validate schema, and execute request doc against it.""" - do_validation = kwargs.get("validate", True) - if do_validation: - validation_errors = validate(schema, document_ast) - if validation_errors: - return ExecutionResult(errors=validation_errors, invalid=True) - - result = execute(schema, document_ast, *args, **kwargs) + result = execute_and_validate(schema, document_ast, *args, **kwargs) # Search request docuement to determine if 'stripNull: true' is set # as and argument. It can not be done in the middleware, as they @@ -251,17 +246,41 @@ def execute_and_validate( return result -class GraphQLCoreBackend(GraphQLBackend): - """GraphQLCoreBackend will return a document using the default - graphql executor""" +class CylcGraphQLBackend(GraphQLBackend): + """Return a GraphQL document using the default + graphql executor with optional null-stripping of result. + + The null value stripping of result is triggered by the presence + of argument & value "stripNull: true" in any field. 
+
+    This is a modification of GraphQLCoreBackend found within:
+        https://github.com/graphql-python/graphql-core-legacy
+    (graphql-core==2.3.2)
+
+    Args:
+
+        executor (object): Executor used in evaluating the resolvers.
+
+    """
 
     def __init__(self, executor=None):
-        # type: (Optional[Any]) -> None
         self.execute_params = {"executor": executor}
 
     def document_from_string(self, schema, document_string):
-        # type: (GraphQLSchema, Union[Document, str]) -> GraphQLDocument
-        """Parse string and setup request docutment for execution."""
+        """Parse string and set up request document for execution.
+
+        Args:
+
+            schema (graphql.GraphQLSchema):
+                Schema definition object
+            document_string (str):
+                Request query/mutation/subscription document.
+
+        Returns:
+
+            graphql.GraphQLDocument
+
+        """
         if isinstance(document_string, ast.Document):
             document_ast = document_string
             document_string = print_ast(document_ast)
@@ -274,7 +293,7 @@ def document_from_string(self, schema, document_string):
             document_string=document_string,
             document_ast=document_ast,
             execute=partial(
-                execute_and_validate,
+                execute_and_validate_and_strip,
                 schema,
                 document_ast,
                 **self.execute_params
@@ -302,52 +321,70 @@ def resolve(self, next, root, info, **args):
         if getattr(info.operation.name, 'value',
                    None) == 'IntrospectionQuery':
             return next(root, info, **args)
-        path_string = f'{info.path}'
-        parent_path_string = f'{info.path[:-1:]}'
-        field_name = to_snake_case(info.field_name)
-        # Avoid using the protobuf default if field isn't set.
-        if (
-                parent_path_string not in self.field_sets
-                and hasattr(root, 'ListFields')
-        ):
-            self.field_sets[parent_path_string] = set(
-                field.name
-                for field, _ in root.ListFields()
-            )
-
-        # Needed for child fields that resolve without args.
-        # Store arguments of parents as leaves of schema tree from path
-        # to respective field.
-        if STRIP_ARG in args:
+        if info.operation.operation in STRIP_OPS:
+            path_string = f'{info.path}'
+            # Needed for child fields that resolve without args.
+            # Store arguments of parents as leaves of schema tree from path
+            # to respective field.
             # no need to regrow the tree on every subscription push/delta
-            if path_string not in self.tree_paths:
+            if args and path_string not in self.tree_paths:
                 grow_tree(self.args_tree, info.path, args)
                 self.tree_paths.add(path_string)
-        else:
-            args[STRIP_ARG] = False
-            branch = self.args_tree
-            for section in info.path:
-                branch = branch.get(section, {})
-                if not branch:
-                    break
-                # Only set if present on branch section
-                if STRIP_ARG in branch:
-                    args[STRIP_ARG] = branch[STRIP_ARG]
-
-        # Now flag empty fields as 'null for stripping
-        if args[STRIP_ARG]:
-            if (
-                    hasattr(root, field_name)
-                    and field_name not in self.field_sets.get(
-                        parent_path_string, {field_name})
-            ):
-                return None
-            if (
-                    info.operation.operation in self.ASYNC_OPS
-                    or iscoroutinefunction(next)
-            ):
-                return self.async_null_setter(next, root, info, **args)
-            return null_setter(next(root, info, **args))
+            if STRIP_ARG not in args:
+                branch = self.args_tree
+                for section in info.path:
+                    if section not in branch:
+                        break
+                    branch = branch[section]
+                    # Only set if present on branch section
+                    if 'leaves' in branch and STRIP_ARG in branch['leaves']:
+                        args[STRIP_ARG] = branch['leaves'][STRIP_ARG]
+
+            # Now flag empty fields as null for stripping
+            if args.get(STRIP_ARG, False):
+                field_name = to_snake_case(info.field_name)
+
+                # Clear the field set so it is recreated via the
+                # first child field, as the path may be a parent.
+ # Done here as parent may be in NODE_MAP + if path_string in self.field_sets: + del self.field_sets[path_string] + + # Avoid using the protobuf default if field isn't set. + if ( + hasattr(root, 'ListFields') + and hasattr(root, field_name) + and get_type_str(info.return_type) not in NODE_MAP + ): + + # Gather fields set in root + parent_path_string = f'{info.path[:-1:]}' + stamp = getattr(root, 'stamp', '') + if ( + parent_path_string not in self.field_sets + or self.field_sets[ + parent_path_string]['stamp'] != stamp + ): + self.field_sets[parent_path_string] = { + 'stamp': stamp, + 'fields': set( + field.name + for field, _ in root.ListFields() + ) + } + + if ( + parent_path_string in self.field_sets + and field_name not in self.field_sets[ + parent_path_string]['fields'] + ): + return None + if ( + info.operation.operation in self.ASYNC_OPS + or iscoroutinefunction(next) + ): + return self.async_null_setter(next, root, info, **args) + return null_setter(next(root, info, **args)) if ( info.operation.operation in self.ASYNC_OPS diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 96b94bb810f..03106e85fd4 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -17,7 +17,6 @@ """GraphQL resolvers for use in data accessing and mutation of workflows.""" import asyncio -from concurrent.futures._base import CancelledError from fnmatch import fnmatchcase from getpass import getuser import logging @@ -380,7 +379,7 @@ async def subscribe_delta(self, root, info, args): else: yield await sub_resolver(root, info, **args) for w_id in w_ids: - if w_id in delta_queues: + if w_id in self.data_store_mgr.data: if sub_id not in delta_queues[w_id]: delta_queues[w_id][sub_id] = deltas_queue # On new yield workflow data-store as added delta @@ -388,8 +387,7 @@ async def subscribe_delta(self, root, info, args): delta_store = create_delta_store( workflow_id=w_id) delta_store[DELTA_ADDED] = ( - self.data_store_mgr.data[w_id] - ) + self.data_store_mgr.data[w_id]) self.delta_store[sub_id][w_id] = delta_store if sub_resolver is None: yield delta_store @@ -400,19 +398,18 @@ async def subscribe_delta(self, root, info, args): yield result elif w_id in self.delta_store[sub_id]: del self.delta_store[sub_id][w_id] - if sub_resolver is None: - yield create_delta_store(workflow_id=w_id) - else: - yield await sub_resolver(root, info, **args) try: - w_id, _, delta_store = deltas_queue.get(False) + w_id, topic, delta_store = deltas_queue.get(False) + if topic != 'shutdown': + new_time = time() + elapsed = new_time - old_time + # ignore deltas that are more frequent than interval. + if elapsed <= interval: + continue + old_time = new_time + else: + delta_store['shutdown'] = True self.delta_store[sub_id][w_id] = delta_store - new_time = time() - elapsed = new_time - old_time - # ignore deltas that are more frequent than interval. 
-                if elapsed <= interval:
-                    continue
-                old_time = new_time
                 if sub_resolver is None:
                     yield delta_store
                 else:
@@ -421,8 +418,8 @@ async def subscribe_delta(self, root, info, args):
                     yield result
             except queue.Empty:
                 await asyncio.sleep(DELTA_SLEEP_INTERVAL)
-            except CancelledError:
-                pass
+            except (GeneratorExit, asyncio.CancelledError):
+                raise
             except Exception:
                 import traceback
                 logger.warn(traceback.format_exc())
diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py
index 7af7656c174..b8893555a03 100644
--- a/cylc/flow/network/schema.py
+++ b/cylc/flow/network/schema.py
@@ -16,6 +16,7 @@
 
 """GraphQL API schema via Graphene implementation."""
 
+from copy import deepcopy
 from functools import partial
 import logging
 from textwrap import dedent
@@ -185,8 +186,10 @@ class SortArgs(InputObjectType):
     reverse = Boolean(default_value=False)
 
 
-STRIP_NULL_DEFAULT = Boolean(default_value=False)
+GHOSTS_DEFAULT = Boolean(default_value=False)
+STRIP_NULL_DEFAULT = Boolean()
 DELTA_STORE_DEFAULT = Boolean(default_value=False)
+DELTA_TYPE_DEFAULT = String(default_value='added')
 
 JOB_ARGS = {
     'ids': List(ID, default_value=[]),
@@ -225,7 +228,6 @@ class SortArgs(InputObjectType):
 }
 
 PROXY_ARGS = {
-    'ghosts': Boolean(default_value=False),
     'ids': List(ID, default_value=[]),
     'exids': List(ID, default_value=[]),
     'states': List(String, default_value=[]),
@@ -237,7 +239,6 @@ class SortArgs(InputObjectType):
 }
 
 ALL_PROXY_ARGS = {
-    'ghosts': Boolean(default_value=False),
     'workflows': List(ID, default_value=[]),
     'exworkflows': List(ID, default_value=[]),
     'ids': List(ID, default_value=[]),
@@ -267,7 +268,6 @@ class SortArgs(InputObjectType):
 }
 
 NODES_EDGES_ARGS = {
-    'ghosts': Boolean(default_value=False),
     'ids': List(ID, default_value=[]),
     'exids': List(ID, default_value=[]),
     'states': List(String, default_value=[]),
@@ -280,7 +280,6 @@ class SortArgs(InputObjectType):
 }
 
 NODES_EDGES_ARGS_ALL = {
-    'ghosts': Boolean(default_value=False),
     'workflows': List(ID, default_value=[]),
     'exworkflows': List(ID, default_value=[]),
     'ids': List(ID, default_value=[]),
@@ -308,16 +307,50 @@ class SortArgs(InputObjectType):
 # /objecttypes/#naming-convention
 # with name 'root' used here, it provides context to the resolvers.
+
 # Resolvers:
 
+def get_type_str(obj_type):
+    """Iterate through the objects of_type to find the inner-most type."""
+    pointer = obj_type
+    while hasattr(pointer, 'of_type'):
+        pointer = pointer.of_type
+    return str(pointer).replace('!', '')
+
+
-async def get_workflows(root, info, **args):
-    """Get filtered workflows."""
-    # Set subscription info
+def process_resolver_info(root, info, args):
+    """Set and gather info for resolver."""
+    # Add the subscription id to the resolver context
+    # to know which delta-store to use.
     if 'sub_id' in info.context:
         args['sub_id'] = info.context['sub_id']
-    if root and field_name in root:
-        args['ids'] = [root[field_name].id]
+
+    field_name = to_snake_case(info.field_name)
+    # root is the parent data object.
+    # i.e.
PbWorkflow or list of IDs List(String) + if isinstance(root, dict): + root_value = root.get(field_name, None) + else: + root_value = getattr(root, field_name, None) + + return (field_name, root_value) + + +def get_native_ids(field_ids): + """Collect IDs into list form.""" + if isinstance(field_ids, str): + return [field_ids] + if isinstance(field_ids, dict): + return [f_id for f_id in field_ids] + return field_ids + + +async def get_workflows(root, info, **args): + """Get filtered workflows.""" + + _, workflow = process_resolver_info(root, info, args) + if workflow is not None: + args['ids'] = [workflow.id] args['workflows'] = [parse_workflow_id(w_id) for w_id in args['ids']] args['exworkflows'] = [parse_workflow_id(w_id) for w_id in args['exids']] @@ -327,11 +360,10 @@ async def get_workflows(root, info, **args): async def get_workflow_by_id(root, info, **args): """Return single workflow element.""" - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] - field_name = to_snake_case(info.field_name) - if root and field_name in root: - args['id'] = root[field_name].id + + _, workflow = process_resolver_info(root, info, args) + if workflow is not None: + args['id'] = workflow.id args['workflow'] = args['id'] resolvers = info.context.get('resolvers') @@ -340,11 +372,8 @@ async def get_workflow_by_id(root, info, **args): async def get_nodes_all(root, info, **args): """Resolver for returning job, task, family nodes""" - field_name = to_snake_case(info.field_name) - if isinstance(root, dict): - field_ids = root.get(field_name, None) - else: - field_ids = getattr(root, field_name, None) + + _, field_ids = process_resolver_info(root, info, args) if hasattr(args, 'id'): args['ids'] = [args.get('id')] @@ -357,15 +386,7 @@ async def get_nodes_all(root, info, **args): elif field_ids == []: return [] - try: - obj_type = str(info.return_type.of_type).replace('!', '') - except AttributeError: - obj_type = str(info.return_type) - node_type = NODE_MAP[obj_type] - - # Set subscription info - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] + node_type = NODE_MAP[get_type_str(info.return_type)] args['ids'] = [parse_node_id(n_id, node_type) for n_id in args['ids']] args['exids'] = [parse_node_id(n_id, node_type) for n_id in args['exids']] @@ -379,70 +400,76 @@ async def get_nodes_all(root, info, **args): async def get_nodes_by_ids(root, info, **args): """Resolver for returning job, task, family node""" - field_name = to_snake_case(info.field_name) - if isinstance(root, dict): - field_ids = root.get(field_name, None) - else: - field_ids = getattr(root, field_name, None) - if field_ids: - if isinstance(field_ids, str): - field_ids = [field_ids] - elif isinstance(field_ids, dict): - field_ids = [f_id for f_id in field_ids] - args['native_ids'] = field_ids - elif field_ids == []: - return [] + field_name, field_ids = process_resolver_info(root, info, args) - try: - obj_type = str(info.return_type.of_type).replace('!', '') - except AttributeError: - obj_type = str(info.return_type) - node_type = NODE_MAP[obj_type] + resolvers = info.context.get('resolvers') + if field_ids == []: + parent_id = getattr(root, 'id', None) + # Find node ids from parent + if parent_id: + parent_args = deepcopy(args) + parent_args.update( + {'id': parent_id, 'delta_store': False} + ) + parent = await resolvers.get_node_by_id( + NODE_MAP[get_type_str(info.parent_type)], + parent_args + ) + field_ids = getattr(parent, field_name, None) + if not field_ids: + return [] + if field_ids: + 
args['native_ids'] = get_native_ids(field_ids) - # Set subscription info - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] + node_type = NODE_MAP[get_type_str(info.return_type)] - if args.get('id'): - args['ids'] = [args.get('id')] args['ids'] = [parse_node_id(n_id, node_type) for n_id in args['ids']] args['exids'] = [parse_node_id(n_id, node_type) for n_id in args['exids']] - resolvers = info.context.get('resolvers') return await resolvers.get_nodes_by_ids(node_type, args) async def get_node_by_id(root, info, **args): """Resolver for returning job, task, family node""" - field_name = to_snake_case(info.field_name) + + field_name, field_id = process_resolver_info(root, info, args) + if field_name == 'source_node': field_name = 'source' elif field_name == 'target_node': field_name = 'target' - field_id = getattr(root, field_name, None) - if field_id: - args['id'] = field_id - if args.get('id', None) is None: - return None - - try: - obj_type = str(info.return_type.of_type).replace('!', '') - except AttributeError: - obj_type = str(info.return_type) - - # Set subscription info - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] - resolvers = info.context.get('resolvers') - return await resolvers.get_node_by_id(NODE_MAP[obj_type], args) + if args.get('id') is None: + field_id = getattr(root, field_name, None) + # Find node id from parent + if not field_id: + parent_id = getattr(root, 'id', None) + if parent_id: + parent_args = deepcopy(args) + parent_args.update( + {'id': parent_id, 'delta_store': False} + ) + args['id'] = parent_id + parent = await resolvers.get_node_by_id( + NODE_MAP[get_type_str(info.parent_type)], + parent_args + ) + field_id = getattr(parent, field_name, None) + if field_id: + args['id'] = field_id + else: + return None + + return await resolvers.get_node_by_id( + NODE_MAP[get_type_str(info.return_type)], + args) async def get_edges_all(root, info, **args): - # Set subscription info - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] + """Get all edges from the store filtered by args.""" + + process_resolver_info(root, info, args) args['workflows'] = [ parse_workflow_id(w_id) for w_id in args['workflows']] @@ -453,46 +480,37 @@ async def get_edges_all(root, info, **args): async def get_edges_by_ids(root, info, **args): - field_name = to_snake_case(info.field_name) - if isinstance(root, dict): - field_ids = root.get(field_name, None) - else: - field_ids = getattr(root, field_name, None) + """Get all edges from the store by id lookup filtered by args.""" + + _, field_ids = process_resolver_info(root, info, args) if field_ids: - if isinstance(field_ids, str): - field_ids = [field_ids] - elif isinstance(field_ids, dict): - field_ids = [f_id for f_id in field_ids] - args['native_ids'] = field_ids + args['native_ids'] = get_native_ids(field_ids) elif field_ids == []: return [] - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] - resolvers = info.context.get('resolvers') return await resolvers.get_edges_by_ids(args) async def get_nodes_edges(root, info, **args): """Resolver for returning job, task, family nodes""" - node_type = NODE_MAP['TaskProxy'] - workflow = getattr(root, 'id', None) - if workflow: - args['workflows'] = [parse_workflow_id(workflow)] + + process_resolver_info(root, info, args) + + if hasattr(root, 'id'): + args['workflows'] = [parse_workflow_id(root.id)] args['exworkflows'] = [] else: args['workflows'] = [ parse_workflow_id(w_id) for w_id in args['workflows']] 
args['exworkflows'] = [ parse_workflow_id(w_id) for w_id in args['exworkflows']] + + node_type = NODE_MAP['TaskProxy'] args['ids'] = [parse_node_id(n_id, node_type) for n_id in args['ids']] args['exids'] = [parse_node_id(n_id, node_type) for n_id in args['exids']] - if 'sub_id' in info.context: - args['sub_id'] = info.context['sub_id'] - resolvers = info.context.get('resolvers') root_nodes = await resolvers.get_nodes_all(node_type, args) return await resolvers.get_nodes_edges(root_nodes, args) @@ -543,6 +561,7 @@ class Meta: args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) families = List( lambda: Family, @@ -550,20 +569,25 @@ class Meta: args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) task_proxies = List( lambda: TaskProxy, description="""Task cycle instances.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) family_proxies = List( lambda: FamilyProxy, description="""Family cycle instances.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) jobs = List( lambda: Job, @@ -571,18 +595,22 @@ class Meta: args=JOB_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) edges = Field( lambda: Edges, args=EDGE_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, description="""Graph edges""") nodes_edges = Field( lambda: NodesEdges, args=NODES_EDGES_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_edges) api_version = Int() cylc_version = String() @@ -610,14 +638,14 @@ class Meta: submit_num = Int() state = String() # name and cycle_point for filtering/sorting - name = String(required=True) - cycle_point = String(required=True) + name = String() + cycle_point = String() task_proxy = Field( lambda: TaskProxy, description="""Associated Task Proxy""", - required=True, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) submitted_time = String() started_time = String() @@ -657,10 +685,12 @@ class Meta: lambda: TaskProxy, description="""Associated cycle point proxies""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) - namespace = List(String, required=True) + namespace = List(String) class PollTask(ObjectType): @@ -679,6 +709,9 @@ class Meta: task_proxy = Field( lambda: TaskProxy, description="""Associated Task Proxy""", + strip_null=STRIP_NULL_DEFAULT, + delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) expr_alias = String() req_state = String() @@ -704,9 +737,9 @@ class Meta: task = Field( Task, description="""Task definition""", - required=True, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) state = String() cycle_point = String() @@ -718,8 +751,8 @@ class Meta: outputs = List(String, default_value=[]) broadcasts = List(String, default_value=[]) # name & namespace for 
filtering/sorting - name = String(required=True) - namespace = List(String, required=True) + name = String() + namespace = List(String) prerequisites = List(Prerequisite) jobs = List( Job, @@ -727,27 +760,34 @@ class Meta: args=JOB_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) parents = List( lambda: FamilyProxy, description="""Task parents.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) first_parent = Field( lambda: FamilyProxy, description="""Task first parent.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) ancestors = List( lambda: FamilyProxy, description="""First parent ancestors.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) @@ -755,15 +795,17 @@ class Family(ObjectType): class Meta: description = """Task definition, static fields""" id = ID(required=True) - name = String(required=True) + name = String() meta = Field(DefMeta) depth = Int() proxies = List( lambda: FamilyProxy, description="""Associated cycle point proxies""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) parents = List( lambda: Family, @@ -771,6 +813,7 @@ class Meta: args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) child_tasks = List( Task, @@ -778,6 +821,7 @@ class Meta: args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) child_families = List( lambda: Family, @@ -785,6 +829,7 @@ class Meta: args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) @@ -794,13 +839,13 @@ class Meta: id = ID(required=True) cycle_point = String() # name & namespace for filtering/sorting - name = String(required=True) + name = String() family = Field( Family, description="""Family definition""", - required=True, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) state = String() is_held = Boolean() @@ -809,36 +854,46 @@ class Meta: lambda: FamilyProxy, description="""Family parent proxies.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) child_tasks = List( TaskProxy, description="""Descendant task proxies.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) child_families = List( lambda: FamilyProxy, description="""Descendant family proxies.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) first_parent = Field( lambda: FamilyProxy, description="""Task first parent.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + 
delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) ancestors = List( lambda: FamilyProxy, description="""First parent ancestors.""", args=PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_nodes_by_ids) @@ -862,12 +917,14 @@ class Meta: Node, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) target = ID() target_node = Field( Node, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_node_by_id) suicide = Boolean() cond = Boolean() @@ -878,10 +935,10 @@ class Meta: description = """Dependency edge""" edges = List( Edge, - required=True, args=EDGE_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, + delta_type=DELTA_TYPE_DEFAULT, resolver=get_edges_by_ids) workflow_polling_tasks = List(PollTask) leaves = List(String) @@ -944,6 +1001,7 @@ class Meta: TaskProxy, description=TaskProxy._meta.description, args=ALL_PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, resolver=get_nodes_all) family = Field( @@ -968,6 +1026,7 @@ class Meta: FamilyProxy, description=FamilyProxy._meta.description, args=ALL_PROXY_ARGS, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, resolver=get_nodes_all) edges = List( @@ -980,6 +1039,7 @@ class Meta: NodesEdges, description=NodesEdges._meta.description, args=NODES_EDGES_ARGS_ALL, + ghosts=GHOSTS_DEFAULT, strip_null=STRIP_NULL_DEFAULT, resolver=get_nodes_edges) @@ -1768,7 +1828,7 @@ class Meta: Family, description="""Family definitions.""", args=DEF_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_nodes_by_ids @@ -1777,7 +1837,8 @@ class Meta: FamilyProxy, description="""Family cycle instances.""", args=PROXY_ARGS, - strip_null=Boolean(default_value=True), + ghosts=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_nodes_by_ids @@ -1786,7 +1847,7 @@ class Meta: Job, description="""Task jobs.""", args=JOB_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_nodes_by_ids @@ -1795,7 +1856,7 @@ class Meta: Task, description="""Task definitions.""", args=DEF_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_nodes_by_ids @@ -1804,7 +1865,8 @@ class Meta: TaskProxy, description="""Task cycle instances.""", args=PROXY_ARGS, - strip_null=Boolean(default_value=True), + ghosts=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_nodes_by_ids @@ -1813,7 +1875,7 @@ class Meta: Edge, description="""Graph edges""", args=EDGE_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), resolver=get_edges_by_ids @@ -1821,7 +1883,7 @@ class Meta: workflow = Field( Workflow, description=Workflow._meta.description, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), 
resolver=get_workflow_by_id @@ -1836,7 +1898,7 @@ class Meta: Family, description="""Family definitions.""", args=DEF_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_nodes_by_ids @@ -1845,7 +1907,8 @@ class Meta: FamilyProxy, description="""Family cycle instances.""", args=PROXY_ARGS, - strip_null=Boolean(default_value=True), + ghosts=Boolean(default_value=False), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_nodes_by_ids @@ -1854,7 +1917,7 @@ class Meta: Job, description="""Task jobs.""", args=JOB_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_nodes_by_ids @@ -1863,7 +1926,7 @@ class Meta: Task, description="""Task definitions.""", args=DEF_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_nodes_by_ids @@ -1872,7 +1935,8 @@ class Meta: TaskProxy, description="""Task cycle instances.""", args=PROXY_ARGS, - strip_null=Boolean(default_value=True), + ghosts=Boolean(default_value=False), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_nodes_by_ids @@ -1881,7 +1945,7 @@ class Meta: Edge, description="""Graph edges""", args=EDGE_ARGS, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_edges_by_ids @@ -1889,7 +1953,7 @@ class Meta: workflow = Field( Workflow, description=Workflow._meta.description, - strip_null=Boolean(default_value=True), + strip_null=Boolean(), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_UPDATED), resolver=get_workflow_by_id @@ -1900,20 +1964,21 @@ class Deltas(ObjectType): class Meta: description = """Grouped deltas of the WFS publish""" id = ID(required=True) + shutdown = Boolean(default_value=False) added = Field( Added, description=Added._meta.description, - strip_null=Boolean(default_value=False), + strip_null=Boolean(), ) updated = Field( Updated, description=Updated._meta.description, - strip_null=Boolean(default_value=False), + strip_null=Boolean(), ) pruned = Field( Pruned, description=Pruned._meta.description, - strip_null=Boolean(default_value=False), + strip_null=Boolean(), ) @@ -1992,6 +2057,7 @@ class Meta: TaskProxy, description=TaskProxy._meta.description, id=ID(required=True), + ghosts=Boolean(default_value=True), strip_null=Boolean(default_value=True), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), @@ -2003,6 +2069,7 @@ class Meta: TaskProxy, description=TaskProxy._meta.description, args=ALL_PROXY_ARGS, + ghosts=Boolean(default_value=True), strip_null=Boolean(default_value=True), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), @@ -2036,6 +2103,7 @@ class Meta: FamilyProxy, description=FamilyProxy._meta.description, id=ID(required=True), + ghosts=Boolean(default_value=True), strip_null=Boolean(default_value=True), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), @@ -2047,6 +2115,7 @@ class Meta: FamilyProxy, description=FamilyProxy._meta.description, args=ALL_PROXY_ARGS, + 
ghosts=Boolean(default_value=True), strip_null=Boolean(default_value=True), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), @@ -2069,6 +2138,7 @@ class Meta: NodesEdges, description=NodesEdges._meta.description, args=NODES_EDGES_ARGS_ALL, + ghosts=Boolean(default_value=True), strip_null=Boolean(default_value=True), delta_store=Boolean(default_value=True), delta_type=String(default_value=DELTA_ADDED), diff --git a/cylc/flow/network/server.py b/cylc/flow/network/server.py index cc5efe8e540..35bad4ceb84 100644 --- a/cylc/flow/network/server.py +++ b/cylc/flow/network/server.py @@ -28,7 +28,7 @@ from cylc.flow.network import encode_, decode_, ZMQSocketBase from cylc.flow.network.authorisation import Priv, authorise from cylc.flow.network.graphql import ( - GraphQLCoreBackend, IgnoreFieldMiddleware, instantiate_middleware + CylcGraphQLBackend, IgnoreFieldMiddleware, instantiate_middleware ) from cylc.flow.network.resolvers import Resolvers from cylc.flow.network.schema import schema @@ -357,7 +357,7 @@ def graphql(self, request_string=None, variables=None): context={ 'resolvers': self.resolvers, }, - backend=GraphQLCoreBackend(), + backend=CylcGraphQLBackend(), middleware=list(instantiate_middleware(self.middleware)), executor=AsyncioExecutor(), validate=True, # validate schema (dev only? default is True)
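
[review note] End to end, the rename is a drop-in swap of the backend; a
rough self-contained sketch of exercising it the way server.py now does
(the one-field Query schema here is hypothetical; graphene 2 /
graphql-core 2 assumed):

    from graphene import ObjectType, Schema, String

    from cylc.flow.network.graphql import (
        CylcGraphQLBackend, IgnoreFieldMiddleware, instantiate_middleware
    )

    class Query(ObjectType):
        greeting = String()

        def resolve_greeting(self, info):
            return None  # unset field, a candidate for stripping

    schema = Schema(query=Query)
    result = schema.execute(
        '{ greeting }',
        backend=CylcGraphQLBackend(),
        middleware=list(instantiate_middleware([IgnoreFieldMiddleware])),
    )
    print(result.data)  # {'greeting': None} -- no stripNull arg requested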