Skip to content

Commit

Permalink
Cylc GraphQL backend & grouped deltas
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Feb 18, 2020
1 parent 6b769ca commit 836d31c
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 33 deletions.
10 changes: 10 additions & 0 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,13 @@ message TPDeltas {
repeated PbTaskProxy deltas = 4;
bool reloaded = 5;
}

message AllDeltas {
PbWorkflow workflow = 1;
EDeltas edges = 2;
FDeltas families = 3;
FPDeltas family_proxies = 4;
JDeltas jobs = 5;
TDeltas tasks = 6;
TPDeltas task_proxies = 7;
}
90 changes: 89 additions & 1 deletion cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

23 changes: 17 additions & 6 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy,
PbJob, PbTask, PbTaskProxy, PbWorkflow,
EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas)
AllDeltas, EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas)
from cylc.flow.network import API
from cylc.flow.suite_status import get_suite_status
from cylc.flow.task_id import TaskID
Expand All @@ -78,6 +78,7 @@
TASKS = 'tasks'
TASK_PROXIES = 'task_proxies'
WORKFLOW = 'workflow'
ALL_DELTAS = 'all'

MESSAGE_MAP = {
EDGES: PbEdge,
Expand All @@ -97,6 +98,7 @@
TASKS: TDeltas,
TASK_PROXIES: TPDeltas,
WORKFLOW: PbWorkflow,
ALL_DELTAS: AllDeltas,
}

# Protobuf message merging appends repeated field results on merge,
Expand Down Expand Up @@ -223,6 +225,7 @@ class DataStoreMgr:
'cycle_states',
'data',
'deltas',
'delta_queues',
'descendants',
'edge_points',
'max_point',
Expand Down Expand Up @@ -276,6 +279,7 @@ def __init__(self, schd):
TASK_PROXIES: {},
}
self.updates_pending = False
self.delta_queues = {self.workflow_id: {}}

def initiate_data_model(self, reloaded=False):
"""Initiate or Update data model on start/restart/reload.
Expand Down Expand Up @@ -1047,11 +1051,18 @@ def get_entire_workflow(self):

def get_publish_deltas(self):
"""Return deltas for publishing."""
return [
(key.encode('utf-8'), delta, 'SerializeToString')
for key, delta in self.deltas.items()
if delta.ListFields()
]
all_deltas = DELTAS_MAP[ALL_DELTAS]()
result = []
for key, delta in self.deltas.items():
if delta.ListFields():
result.append(
(key.encode('utf-8'), delta, 'SerializeToString'))
getattr(all_deltas, key).CopyFrom(delta)
all_deltas.workflow.id = self.workflow_id
result.append(
(ALL_DELTAS.encode('utf-8'), all_deltas, 'SerializeToString')
)
return result

def get_data_elements(self, element_type):
"""Get elements of a given type in the form of a delta.
Expand Down
5 changes: 4 additions & 1 deletion cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@

def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
return json.dumps(message)
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})


def decode_(message):
Expand Down
155 changes: 155 additions & 0 deletions cylc/flow/network/graphql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
A modification of th GraphQL Core backend:
https://github.com/graphql-python/graphql-core
GraphQL Middleware defined here also.
"""
from functools import partial
import logging

from graphql.execution import ExecutionResult, execute
from graphql.language.base import parse, print_ast
from graphql.language import ast
from graphql.validation import validate
from graphql.backend.base import GraphQLBackend, GraphQLDocument
from rx import Observable

logger = logging.getLogger(__name__)


# Is possible to not use middleware and do all the filtering here.
# However, middleware allows for argument of the request doc to set.
def strip_null(data):
"""Recursively strip data structure of nulls."""
if isinstance(data, dict):
return {
key: strip_null(val)
for key, val in data.items()
if val is not None
}
if isinstance(data, list):
return [
strip_null(val)
for val in data
if val is not None
]
return data


def attr_strip_null(result):
"""Work on the attribute/data of ExecutionResult if present."""
if hasattr(result, 'data'):
result.data = strip_null(result.data)
return result
return strip_null(result)


def null_stripper(exe_result):
"""Strip nulls in accordance with type of execution result."""
if isinstance(exe_result, Observable):
return exe_result.map(attr_strip_null)
if not exe_result.errors:
return attr_strip_null(exe_result)
return exe_result


def execute_and_validate(
schema, # type: GraphQLSchema
document_ast, # type: Document
*args, # type: Any
**kwargs # type: Any
):
# type: (...) -> Union[ExecutionResult, Observable]
"""Validate schema, and execute request doc against it."""
do_validation = kwargs.get("validate", True)
if do_validation:
validation_errors = validate(schema, document_ast)
if validation_errors:
return ExecutionResult(errors=validation_errors, invalid=True)

result = execute(schema, document_ast, *args, **kwargs)

if kwargs.get('strip_null', False):
if kwargs.get('return_promise', False):
return result.then(null_stripper)
return null_stripper(result)
return result


class GraphQLCoreBackend(GraphQLBackend):
"""GraphQLCoreBackend will return a document using the default
graphql executor"""

def __init__(self, executor=None):
# type: (Optional[Any]) -> None
self.execute_params = {"executor": executor}

def document_from_string(self, schema, document_string):
# type: (GraphQLSchema, Union[Document, str]) -> GraphQLDocument
"""Parse string and setup request docutment for execution."""
if isinstance(document_string, ast.Document):
document_ast = document_string
document_string = print_ast(document_ast)
else:
if not isinstance(document_string, str):
logger.error("The query must be a string")
document_ast = parse(document_string)
return GraphQLDocument(
schema=schema,
document_string=document_string,
document_ast=document_ast,
execute=partial(
execute_and_validate,
schema,
document_ast,
**self.execute_params
),
)


# -- Middleware --

class IgnoreFieldMiddleware:
"""Set to null/None type undesired field values for stripping."""

ALLOW_TYPES = [0, 0., False]

def resolve(self, next, root, info, **args):
"""Middleware resolver; handles field according to operation."""
if getattr(info.operation.name, 'value', None) == 'IntrospectionQuery':
return next(root, info, **args)
if info.operation.operation == 'query':
return self.async_resolve(next, root, info, **args)
if info.operation.operation == 'subscription':
return self.null_setter(next(root, info, **args))
return next(root, info, **args)

async def async_resolve(self, next, root, info, **args):
"""Set type to null after awaited result if empty/null-like."""
result = await next(root, info, **args)
return self.null_setter(result)

def null_setter(self, result):
"""Set type to null if result is empty/null-like."""
# If result is not empty... could be some other condition.
# excluded False, as could be a flag turned off.
if result or result in self.ALLOW_TYPES:
return result
return None
Loading

0 comments on commit 836d31c

Please sign in to comment.