Skip to content

Commit

Permalink
Cylc GraphQL backend & grouped deltas sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Feb 24, 2020
1 parent 1d35ff4 commit 32a4d57
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 33 deletions.
46 changes: 26 additions & 20 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.data_store_mgr import (
EDGES, FAMILIES, FAMILY_PROXIES, JOBS, TASKS, TASK_PROXIES, WORKFLOW,
DELTAS_MAP, apply_delta, generate_checksum
ALL_DELTAS, DELTAS_MAP, apply_delta, generate_checksum
)
from .workflows_mgr import workflow_request

Expand All @@ -62,12 +62,12 @@ def __init__(self, workflows_mgr):
self.workflows_mgr = workflows_mgr
self.data = {}
self.w_subs = {}
self.topics = {topic.encode('utf-8') for topic in DELTAS_MAP}
self.topics.add(b'shutdown')
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.loop = None
# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor = ThreadPoolExecutor()
self.delta_queues = {}

async def sync_workflow(self, w_id, *args, **kwargs):
"""Run data store sync with workflow services.
Expand All @@ -81,6 +81,7 @@ async def sync_workflow(self, w_id, *args, **kwargs):
self.loop = asyncio.get_running_loop()
if w_id in self.w_subs:
return
self.delta_queues[w_id] = {}
self.executor.submit(
partial(self.start_subscription, w_id, *args, **kwargs)
)
Expand All @@ -93,6 +94,8 @@ def purge_workflow(self, w_id):
del self.w_subs[w_id]
if w_id in self.data:
del self.data[w_id]
if w_id in self.delta_queues:
del self.delta_queues[w_id]

def start_subscription(self, w_id, reg, host, port):
"""Instatiate and run subscriber data-store sync.
Expand Down Expand Up @@ -126,6 +129,8 @@ def update_workflow_data(self, topic, delta, w_id):
w_id (str): Workflow external ID.
"""
for delta_queue in self.delta_queues[w_id].values():
delta_queue.put((w_id, topic, delta))
# wait until data-store is populated for this workflow
if w_id not in self.data:
loop_cnt = 0
Expand All @@ -139,23 +144,24 @@ def update_workflow_data(self, topic, delta, w_id):
self.workflows_mgr.stopping.add(w_id)
self.w_subs[w_id].stop()
return
delta_time = getattr(
delta, 'time', getattr(delta, 'last_updated', 0.0))
# If the workflow has reloaded recreate the data
# otherwise apply the delta if it's newer than the previously applied.
if delta.reloaded:
if topic == WORKFLOW:
self.data[w_id][topic].CopyFrom(delta)
else:
self.data[w_id][topic] = {
ele.id: ele
for ele in delta.deltas
}
self.data[w_id]['delta_times'][topic] = delta_time
elif delta_time >= self.data[w_id]['delta_times'][topic]:
apply_delta(topic, delta, self.data[w_id])
self.data[w_id]['delta_times'][topic] = delta_time
self.reconcile_update(topic, delta, w_id)
for field, sub_delta in delta.ListFields():
delta_time = getattr(
sub_delta, 'time', getattr(sub_delta, 'last_updated', 0.0))
# If the workflow has reloaded recreate the data
# otherwise apply the delta if newer than the previously applied.
if sub_delta.reloaded:
if field.name == WORKFLOW:
self.data[w_id][field.name].CopyFrom(sub_delta)
else:
self.data[w_id][field.name] = {
ele.id: ele
for ele in sub_delta.deltas
}
self.data[w_id]['delta_times'][field.name] = delta_time
elif delta_time >= self.data[w_id]['delta_times'][field.name]:
apply_delta(field.name, sub_delta, self.data[w_id])
self.data[w_id]['delta_times'][field.name] = delta_time
self.reconcile_update(field.name, sub_delta, w_id)

def reconcile_update(self, topic, delta, w_id):
"""Reconcile local with workflow data-store.
Expand Down
1 change: 1 addition & 0 deletions cylc/uiserver/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def initialize(self, schema=None, executor=None, middleware=None,
self.graphiql = graphiql
self.batch = batch
self.backend = backend or get_default_backend()
self.strip_null = True
# Set extra attributes
for key, value in kwargs.items():
if hasattr(self, key):
Expand Down
38 changes: 28 additions & 10 deletions cylc/uiserver/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

from tornado import web, ioloop

from cylc.flow.network.graphql import (
GraphQLCoreBackend, IgnoreFieldMiddleware
)
from cylc.flow.network.schema import schema
from graphene_tornado.tornado_graphql_handler import TornadoGraphQLHandler
from jupyterhub.services.auth import HubOAuthCallbackHandler
Expand All @@ -40,6 +43,9 @@

logger = logging.getLogger(__name__)

# Avoid re-instantiation
MIDDLEWARE = [IgnoreFieldMiddleware()]


class MyApplication(web.Application):
is_closing = False
Expand Down Expand Up @@ -75,7 +81,7 @@ def __init__(self, port, static, jupyter_hub_service_prefix):
self.workflows_mgr = WorkflowsManager(self)
self.data_store_mgr = DataStoreMgr(self.workflows_mgr)
self.resolvers = Resolvers(
self.data_store_mgr.data,
self.data_store_mgr,
workflows_mgr=self.workflows_mgr)

def _create_static_handler(
Expand Down Expand Up @@ -139,6 +145,8 @@ def _create_graphql_handler(
clazz,
schema=schema,
resolvers=self.resolvers,
backend=GraphQLCoreBackend(),
middleware=MIDDLEWARE,
**kwargs
)

Expand All @@ -150,7 +158,11 @@ def _make_app(self, debug: bool):
"""
logger.info(self._static)
# subscription/websockets server
subscription_server = TornadoSubscriptionServer(schema)
subscription_server = TornadoSubscriptionServer(
schema,
backend=GraphQLCoreBackend(),
middleware=MIDDLEWARE,
)
return MyApplication(
static_path=self._static,
debug=debug,
Expand All @@ -165,14 +177,20 @@ def _make_app(self, debug: bool):
self._create_handler("userprofile",
UserProfileHandler),
# graphql handlers
self._create_graphql_handler("graphql",
UIServerGraphQLHandler),
self._create_graphql_handler("graphql/batch",
UIServerGraphQLHandler,
batch=True),
self._create_graphql_handler("graphql/graphiql",
GraphiQLHandler,
graphiql=True),
self._create_graphql_handler(
"graphql",
UIServerGraphQLHandler,
),
self._create_graphql_handler(
"graphql/batch",
UIServerGraphQLHandler,
batch=True
),
self._create_graphql_handler(
"graphql/graphiql",
GraphiQLHandler,
graphiql=True
),
# subscription/websockets handler
self._create_handler("subscriptions",
SubscriptionHandler,
Expand Down
28 changes: 25 additions & 3 deletions cylc/uiserver/websockets/tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# The file was copied from this revision:
# https://github.com/graphql-python/graphql-ws/blob/cf560b9a5d18d4a3908dc2cfe2199766cc988fef/graphql_ws/tornado.py

from inspect import isawaitable
from inspect import isawaitable, isclass

from asyncio import ensure_future, gather, wait, shield
from tornado.websocket import WebSocketClosedError
from graphql.execution.executors.asyncio import AsyncioExecutor
from graphql.execution.middleware import MiddlewareManager
from graphql_ws.base import ConnectionClosedException, BaseConnectionContext, BaseSubscriptionServer
from graphql_ws.observable_aiter import setup_observable_extension
from graphql_ws.constants import (
Expand Down Expand Up @@ -44,14 +45,35 @@ async def close(self, code):


class TornadoSubscriptionServer(BaseSubscriptionServer):
def __init__(self, schema, keep_alive=True, loop=None):
def __init__(self, schema, keep_alive=True, loop=None, backend=None, middleware=None):
self.loop = loop
self.backend = backend or None
if middleware is not None:
self.middleware = MiddlewareManager(*middleware, wrap_in_promise=False)
else:
self.middleware = None
self.strip_null = True
super().__init__(schema, keep_alive)

# Normalise a collection of GraphQL middleware entries into instances.
# Iterates `middlewares` and yields exactly one item per entry, in order:
# an entry that is a class (per `inspect.isclass`) is called with no
# arguments and the resulting instance is yielded; any other entry is
# yielded unchanged.
# NOTE(review): indentation was lost in this diff view; presumably this is
# a static method of TornadoSubscriptionServer - confirm against the file.
@staticmethod
def instantiate_middleware(middlewares):
for middleware in middlewares:
if isclass(middleware):
# Entry is a class rather than an instance: build it with no arguments.
yield middleware()
continue
# Not a class: pass the entry through as-is.
yield middleware

def get_graphql_params(self, *args, **kwargs):
params = super(TornadoSubscriptionServer,
self).get_graphql_params(*args, **kwargs)
return dict(params, return_promise=True, executor=AsyncioExecutor(loop=self.loop))
return dict(
params,
return_promise=True,
executor=AsyncioExecutor(loop=self.loop),
backend=self.backend,
middleware=self.middleware,
strip_null=self.strip_null,
)

async def _handle(self, ws, request_context):
connection_context = TornadoConnectionContext(ws, request_context)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def find_version(*file_paths):
'graphene-tornado==2.1.*',
('cylc-flow @ https://github.com/cylc/cylc-flow'
'/tarball/master#egg=cylc-8.0a2.dev'),
# 'cylc-flow',
'graphql-core<3,>=2.1', # TODO: graphql-python/graphql-ws#39
'graphql-ws==0.3.*'
]
Expand Down

0 comments on commit 32a4d57

Please sign in to comment.