Skip to content

Commit

Permalink
Merge pull request #1231 from coralproject/subscription-hooks
Browse files Browse the repository at this point in the history
Subscription Connection Plugin Hooks
  • Loading branch information
wyattjoh authored Dec 20, 2017
2 parents 998cb8d + 0a79393 commit b2cc4bd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
37 changes: 34 additions & 3 deletions graph/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const debug = require('debug')('talk:graph:subscriptions');
const pubsub = require('../services/pubsub');
const schema = require('./schema');
const Context = require('./context');
const plugins = require('../services/plugins');

const {deserializeUser} = require('../services/subscriptions');
const setupFunctions = require('./setupFunctions');
Expand All @@ -16,16 +17,42 @@ const {

const {BASE_PATH} = require('../url');

const onConnect = ({token}, connection) => {
// Collect all the plugin hooks that should be executed onConnect and
// onDisconnect.
const hooks = plugins.get('server', 'websockets')
.map(({plugin, websockets}) => {
debug(`added websocket hooks ${Object.keys(websockets)} from plugin '${plugin.name}'`);

return websockets;
})
.reduce((hooks, {onConnect = null, onDisconnect = null}) => {
if (onConnect) {
hooks.onConnect.push(onConnect);
}

if (onDisconnect) {
hooks.onDisconnect.push(onDisconnect);
}

return hooks;
}, {
onConnect: [],
onDisconnect: [],
});

const onConnect = async (connectionParams, connection) => {

// Attach the token from the connection options if it was provided.
if (token) {
if (connectionParams.token) {

debug('token sent via onConnect, attaching to the headers of the upgrade request');

// Attach it to the upgrade request.
connection.upgradeReq.headers['authorization'] = `Bearer ${token}`;
connection.upgradeReq.headers['authorization'] = `Bearer ${connectionParams.token}`;
}

// Call all the hooks.
await Promise.all(hooks.onConnect.map((hook) => hook(connectionParams, connection)));
};

const onOperation = (parsedMessage, baseParams, connection) => {
Expand All @@ -52,6 +79,9 @@ const onOperation = (parsedMessage, baseParams, connection) => {
return baseParams;
};

const onDisconnect = (connection) =>
Promise.all(hooks.onDisconnect.map((hook) => hook(connection)));

/**
* This creates a new subscription manager.
*/
Expand All @@ -62,6 +92,7 @@ const createSubscriptionManager = (server) => new SubscriptionServer({
setupFunctions,
}),
onConnect,
onDisconnect,
onOperation,
keepAlive: ms(KEEP_ALIVE)
}, {
Expand Down
4 changes: 4 additions & 0 deletions plugins.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ const hookSchemas = {
resolvers: Joi.object().pattern(/\w/, Joi.object().pattern(/(?:__resolveType|\w+)/, Joi.func())),
typeDefs: Joi.string(),
schemaLevelResolveFunction: Joi.func(),
websockets: Joi.object({
onConnect: Joi.func(),
onDisconnect: Joi.func(),
}),
};

/**
Expand Down

0 comments on commit b2cc4bd

Please sign in to comment.