Delta review fixes
dwsutherland committed May 19, 2020
1 parent 76261ec commit 92fdab2
Showing 5 changed files with 316 additions and 220 deletions.
48 changes: 29 additions & 19 deletions cylc/flow/data_store_mgr.py
@@ -55,7 +55,7 @@
from time import time
import zlib

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow import __version__ as CYLC_VERSION, LOG
from cylc.flow.cycling.loader import get_point
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy, PbJob, PbTask,
@@ -157,14 +157,16 @@ def apply_delta(key, delta, data):
# Merge in updated fields
if getattr(delta, 'updated', False):
if key == WORKFLOW:
# Clear fields that require overwrite with delta
field_set = {f.name for f, _ in delta.updated.ListFields()}
for field in CLEAR_FIELD_MAP[key]:
data[key].ClearField(field)
# Clear fields that require overwrite with delta
if field in field_set:
data[key].ClearField(field)
data[key].MergeFrom(delta.updated)
else:
for element in delta.updated:
try:
# Clear fields that require overwrite with delta
# Clear fields that require overwrite with delta
if CLEAR_FIELD_MAP[key]:
for field, _ in element.ListFields():
if field.name in CLEAR_FIELD_MAP[key]:
@@ -173,6 +175,10 @@ def apply_delta(key, delta, data):
except KeyError as exc:
# Ensure data-sync doesn't fail with
# network issues, sync reconcile/validate will catch.
LOG.debug(
'Missing Data-Store element '
'on update application: %s' % str(exc)
)
continue
# Prune data elements
if hasattr(delta, 'pruned'):
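The clear-then-merge above only wipes fields that the incoming delta actually sets, so a sparse delta can never erase data it does not carry. A minimal sketch of the same pattern on a generic protobuf message (function and variable names here are illustrative, not Cylc API):

```python
# Illustrative sketch: selective clear-then-merge for a protobuf delta.
def merge_delta(target, delta, clear_fields):
    """Merge `delta` into `target`, first clearing fields the delta overwrites.

    `clear_fields` names fields (typically repeated/map fields) whose delta
    value should replace, rather than extend, the existing value.
    """
    # ListFields() only yields fields that are actually set on the delta.
    set_fields = {descriptor.name for descriptor, _ in delta.ListFields()}
    for field in clear_fields:
        if field in set_fields:
            target.ClearField(field)
    # MergeFrom appends repeated fields and overwrites scalar ones.
    target.MergeFrom(delta)
```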
@@ -202,7 +208,7 @@ def create_delta_store(delta=None, workflow_id=None):
dict
"""
if delta is None:
if not isinstance(delta, AllDeltas):
delta = AllDeltas()
delta_store = {
DELTA_ADDED: deepcopy(DATA_TEMPLATE),
@@ -221,18 +227,17 @@
for field, value in delta.ListFields():
for sub_field, sub_value in value.ListFields():
if sub_field.name in delta_store:
if sub_field.name in delta_store:
if (
field.name == WORKFLOW
or sub_field.name == DELTA_PRUNED
):
field_data = sub_value
else:
field_data = {
s.id: s
for s in sub_value
}
delta_store[sub_field.name][field.name] = field_data
if (
field.name == WORKFLOW
or sub_field.name == DELTA_PRUNED
):
field_data = sub_value
else:
field_data = {
s.id: s
for s in sub_value
}
delta_store[sub_field.name][field.name] = field_data
return delta_store
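For consumers, create_delta_store repacks each topic's delta into plain dictionaries keyed first by delta kind (added/updated/pruned) and then by topic, with non-workflow elements indexed by id. A hedged usage sketch (the workflow id format and element contents are assumptions; the constants are those used above):

```python
# Hedged usage sketch: read added task proxies back out of a delta store.
delta_store = create_delta_store(delta, workflow_id='user|my_flow')
for tp_id, tp_element in delta_store[DELTA_ADDED][TASK_PROXIES].items():
    # Each value is the protobuf element itself, keyed by its id.
    print(tp_id, tp_element)
```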


@@ -291,6 +296,7 @@ class DataStoreMgr:
'data',
'deltas',
'delta_queues',
'delta_cycle_states',
'descendants',
'edge_points',
'max_point',
@@ -314,6 +320,7 @@ def __init__(self, schd):
self.min_point = None
self.edge_points = {}
self.cycle_states = {}
self.delta_cycle_states = {}
# Managed data types
self.data = {
self.workflow_id: deepcopy(DATA_TEMPLATE)
@@ -848,6 +855,7 @@ def update_task_proxies(self, updated_tasks=None):
task_proxies = self.data[self.workflow_id][TASK_PROXIES]
update_time = time()
task_defs = {}
self.delta_cycle_states = {}

# update task instance
for itask in updated_tasks:
@@ -859,6 +867,8 @@
continue
self.cycle_states.setdefault(point_string, {})[name] = (
itask.state.status, itask.state.is_held)
self.delta_cycle_states.setdefault(point_string, {})[name] = (
itask.state.status, itask.state.is_held)
# Gather task definitions for elapsed time recalculation.
if name not in task_defs:
task_defs[name] = itask.tdef
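Unlike cycle_states, which accumulates every cycle point seen so far, delta_cycle_states is reset at the top of each update pass and so ends up holding only the cycle points touched by this batch of task changes; update_family_proxies (below) then recomputes family states for just those cycles. The bookkeeping boils down to:

```python
# Sketch of the two-dict bookkeeping (names as used above).
state = (itask.state.status, itask.state.is_held)
# Everything seen so far -- used when rebuilding families from scratch.
self.cycle_states.setdefault(point_string, {})[name] = state
# Only what changed in this pass -- used for incremental family updates.
self.delta_cycle_states.setdefault(point_string, {})[name] = state
```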
@@ -917,14 +927,14 @@ def update_family_proxies(self, cycle_points=None):
"""
family_proxies = self.data[self.workflow_id][FAMILY_PROXIES]
if cycle_points is None:
cycle_points = self.cycle_states.keys()
cycle_points = self.delta_cycle_states.keys()
if not cycle_points:
return

for point_string in cycle_points:
# For each cycle point, construct a family state tree
# based on the first-parent single-inheritance tree
c_task_states = self.cycle_states.get(point_string, None)
c_task_states = self.delta_cycle_states.get(point_string, None)
if c_task_states is None:
continue
c_fam_task_states = {}
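From each touched cycle point the member-task states are grouped under their first-parent families and rolled up into a single family state. The grouping and precedence rules are Cylc internals not shown in this diff; a purely hypothetical sketch of the rollup idea:

```python
# Hypothetical sketch only: derive a family state from member task states.
def family_state(member_states):
    """Pick a representative state; the precedence order is illustrative."""
    precedence = (
        'failed', 'submit-failed', 'running', 'submitted',
        'queued', 'waiting', 'succeeded',
    )
    for state in precedence:
        if state in member_states:
            return state
    return 'waiting'

# e.g. family_state({'succeeded', 'running'}) -> 'running'
```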
180 changes: 101 additions & 79 deletions cylc/flow/network/graphql.py
@@ -15,9 +15,6 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A modification of the GraphQL Core backend:
https://github.com/graphql-python/graphql-core
GraphQL Middleware defined here also.
"""
@@ -28,25 +25,28 @@
from inspect import isclass, iscoroutinefunction

from graphene.utils.str_converters import to_snake_case
from graphql.execution import ExecutionResult, execute
from graphql.execution import ExecutionResult
from graphql.execution.utils import (
get_operation_root_type, get_field_def
)
from graphql.execution.values import get_argument_values, get_variable_values
from graphql.language.base import parse, print_ast
from graphql.language import ast
from graphql.validation import validate
from graphql.backend.base import GraphQLBackend, GraphQLDocument
from graphql.backend.core import execute_and_validate
from graphql.utils.base import type_from_ast
from graphql.type import get_named_type
from promise import Promise
from rx import Observable

from cylc.flow.network.schema import NODE_MAP, get_type_str

logger = logging.getLogger(__name__)

STRIP_ARG = 'strip_null'
NULL_VALUE = None
EMPTY_VALUES = ([], {})
STRIP_OPS = {'query', 'subscription'}


def grow_tree(tree, path, leaves=None):
@@ -175,20 +175,22 @@ def has_arg_val(self, arg_name, arg_value):
Boolean
"""
try:
for components in self.operation_defs.values():
if self.args_selection_search(
components['definition'].selection_set,
components['variables'],
components['parent_type'],
arg_name,
arg_value,
):
return True
except Exception as exc:
import traceback
logger.debug(traceback.format_exc())
logger.error(exc)
for components in self.operation_defs.values():
defn = components['definition']
if (
defn.operation not in STRIP_OPS
or getattr(
defn.name, 'value', None) == 'IntrospectionQuery'
):
continue
if self.args_selection_search(
components['definition'].selection_set,
components['variables'],
components['parent_type'],
arg_name,
arg_value,
):
return True
return False

def args_selection_search(
@@ -223,21 +225,14 @@ def args_selection_search(
return False


def execute_and_validate(
def execute_and_validate_and_strip(
schema, # type: GraphQLSchema
document_ast, # type: Document
*args, # type: Any
**kwargs # type: Any
):
# type: (...) -> Union[ExecutionResult, Observable]
"""Validate schema, and execute request doc against it."""
do_validation = kwargs.get("validate", True)
if do_validation:
validation_errors = validate(schema, document_ast)
if validation_errors:
return ExecutionResult(errors=validation_errors, invalid=True)

result = execute(schema, document_ast, *args, **kwargs)
result = execute_and_validate(schema, document_ast, *args, **kwargs)

# Search request document to determine if 'stripNull: true' is set
# as an argument. It cannot be done in the middleware, as they
@@ -251,17 +246,41 @@
return result
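The stripping is driven entirely by the request document: if any field of a query or subscription carries the argument stripNull: true (mutations and the IntrospectionQuery are skipped, per STRIP_OPS and the name check above), unset or empty fields are nulled out of the result. A hedged example of such a request, executed through this backend (field names other than stripNull are assumptions about the Cylc schema, and passing the backend this way is a sketch, not the scheduler's actual wiring):

```python
# Hedged example: a query that opts in to null-stripping on one field.
QUERY = '''
query {
  workflows {
    id
    taskProxies(stripNull: true) {
      id
      state
    }
  }
}
'''

# `schema` is assumed to be a graphene/GraphQL schema object.
# graphql-core 2.x allows an execution backend to be supplied at execute
# time, so the document is parsed by CylcGraphQLBackend and the result is
# stripped before being returned.
result = schema.execute(QUERY, backend=CylcGraphQLBackend())
```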


class GraphQLCoreBackend(GraphQLBackend):
"""GraphQLCoreBackend will return a document using the default
graphql executor"""
class CylcGraphQLBackend(GraphQLBackend):
"""Return a GraphQL document using the default
graphql executor with optional null-stripping of result.
The null value stripping of result is triggered by the presence
of argument & value "stripNull: true" in any field.
This is a modification of GraphQLCoreBackend found within:
https://github.com/graphql-python/graphql-core-legacy
(graphql-core==2.3.2)
Args:
executor (object): Executor used in evaluating the resolvers.
"""

def __init__(self, executor=None):
# type: (Optional[Any]) -> None
self.execute_params = {"executor": executor}

def document_from_string(self, schema, document_string):
# type: (GraphQLSchema, Union[Document, str]) -> GraphQLDocument
"""Parse string and setup request docutment for execution."""
"""Parse string and setup request docutment for execution.
Args:
schema (graphql.GraphQLSchema):
Schema definition object
document_string (str):
Request query/mutation/subscription document.
Returns:
graphql.GraphQLDocument
"""
if isinstance(document_string, ast.Document):
document_ast = document_string
document_string = print_ast(document_ast)
@@ -274,7 +293,7 @@ def document_from_string(self, schema, document_string):
document_string=document_string,
document_ast=document_ast,
execute=partial(
execute_and_validate,
execute_and_validate_and_strip,
schema,
document_ast,
**self.execute_params
@@ -302,52 +321,55 @@ def resolve(self, next, root, info, **args):
if getattr(info.operation.name, 'value', None) == 'IntrospectionQuery':
return next(root, info, **args)

path_string = f'{info.path}'
parent_path_string = f'{info.path[:-1:]}'
field_name = to_snake_case(info.field_name)
# Avoid using the protobuf default if field isn't set.
if (
parent_path_string not in self.field_sets
and hasattr(root, 'ListFields')
):
self.field_sets[parent_path_string] = set(
field.name
for field, _ in root.ListFields()
)

# Needed for child fields that resolve without args.
# Store arguments of parents as leaves of schema tree from path
# to respective field.
if STRIP_ARG in args:
# no need to regrow the tree on every subscription push/delta
if path_string not in self.tree_paths:
grow_tree(self.args_tree, info.path, args)
self.tree_paths.add(path_string)
else:
args[STRIP_ARG] = False
branch = self.args_tree
for section in info.path:
branch = branch.get(section, {})
if not branch:
break
# Only set if present on branch section
if STRIP_ARG in branch:
args[STRIP_ARG] = branch[STRIP_ARG]

# Now flag empty fields as null for stripping
if args[STRIP_ARG]:
if info.operation.operation in STRIP_OPS:
path_string = f'{info.path}'
parent_path_string = f'{info.path[:-1:]}'
field_name = to_snake_case(info.field_name)
# Avoid using the protobuf default if field isn't set.
if path_string in self.field_sets:
del self.field_sets[path_string]
if (
hasattr(root, field_name)
and field_name not in self.field_sets.get(
parent_path_string, {field_name})
parent_path_string not in self.field_sets
and hasattr(root, 'ListFields')
):
return None
if (
info.operation.operation in self.ASYNC_OPS
or iscoroutinefunction(next)
):
return self.async_null_setter(next, root, info, **args)
return null_setter(next(root, info, **args))
self.field_sets[parent_path_string] = set(
field.name
for field, _ in root.ListFields()
)

# Needed for child fields that resolve without args.
# Store arguments of parents as leaves of schema tree from path
# to respective field.
if STRIP_ARG in args:
# no need to regrow the tree on every subscription push/delta
if path_string not in self.tree_paths:
grow_tree(self.args_tree, info.path, args)
self.tree_paths.add(path_string)
else:
branch = self.args_tree
for section in info.path:
branch = branch.get(section, {})
if not branch:
break
# Only set if present on branch section
if STRIP_ARG in branch:
args[STRIP_ARG] = branch[STRIP_ARG]

# Now flag empty fields as null for stripping
if args.get(STRIP_ARG, False):
if (
hasattr(root, field_name)
and get_type_str(info.return_type) not in NODE_MAP
and field_name not in self.field_sets.get(
parent_path_string, {field_name})
):
return None
if (
info.operation.operation in self.ASYNC_OPS
or iscoroutinefunction(next)
):
return self.async_null_setter(next, root, info, **args)
return null_setter(next(root, info, **args))

if (
info.operation.operation in self.ASYNC_OPS
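Because child fields resolve without their parent's arguments, the middleware stores each stripNull flag as a leaf in a nested dict keyed by the resolve path, and descendants recover it by walking their own path. grow_tree's real signature appears earlier in this file; the body below is an assumed sketch of the idea, not the actual implementation:

```python
# Assumed sketch of the args-tree lookup used by the middleware above.
def grow_tree(tree, path, leaves=None):
    """Grow `tree` along `path`, attaching `leaves` at the final branch."""
    branch = tree
    for section in path:
        branch = branch.setdefault(section, {})
    if leaves:
        branch.update(leaves)


args_tree = {}
# A parent field resolved with stripNull=True records it at its path...
grow_tree(args_tree, ['workflows', 0, 'taskProxies'], {'strip_null': True})

# ...and a child resolver walks its own (longer) path, inheriting the flag
# from the deepest branch that defines it.
strip = False
branch = args_tree
for section in ['workflows', 0, 'taskProxies', 2, 'state']:
    branch = branch.get(section, {})
    if not branch:
        break
    if 'strip_null' in branch:
        strip = branch['strip_null']
```

After the walk, strip is True for any field nested under the flagged parent, which mirrors the check the middleware makes before nulling empty fields.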