From bb93f73c379bc73096b4530da16a63e851e359d4 Mon Sep 17 00:00:00 2001 From: b-ma Date: Thu, 25 Apr 2024 11:41:26 +0200 Subject: [PATCH] feat(shared-state): user defined timeout for batched transport --- src/client/Client.js | 4 +- src/common/BaseStateManager.js | 51 +++++++-- src/common/BatchedTransport.js | 26 ++++- src/server/Server.js | 6 ++ src/server/StateManager.js | 19 ++-- tests/states/SharedState.spec.js | 172 +++++++++++++++++++++++++------ 6 files changed, 220 insertions(+), 58 deletions(-) diff --git a/src/client/Client.js b/src/client/Client.js index 296e253c..c1b705f2 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -189,7 +189,7 @@ class Client { * @see {@link client.StateManager} * @type {client.StateManager} */ - this.stateManager = null; + this.stateManager = new StateManager(); /** * Status of the client, 'idle', 'inited', 'started' or 'errored'. @@ -288,7 +288,7 @@ class Client { // ------------------------------------------------------------ // CREATE STATE MANAGER // ------------------------------------------------------------ - this.stateManager = new StateManager(this.id, { + this.stateManager.init(this.id, { emit: this.socket.send.bind(this.socket), // need to alias this addListener: this.socket.addListener.bind(this.socket), removeAllListeners: this.socket.removeAllListeners.bind(this.socket), diff --git a/src/common/BaseStateManager.js b/src/common/BaseStateManager.js index 2fe941a8..fb6525f2 100644 --- a/src/common/BaseStateManager.js +++ b/src/common/BaseStateManager.js @@ -26,15 +26,7 @@ import { * @private */ class BaseStateManager { - /** - * @param {Number} id - Id of the node. - * @param {Object} transport - Transport to use for synchronizing the state. - * Must implement a basic EventEmitter API. - */ - constructor(id, transport) { - // proxy transport with BatchedTransport; - this.client = { id, transport: new BatchedTransport(transport) }; - + constructor() { this._statesById = new Map(); // this._cachedSchemas = new Map(); // @@ -43,6 +35,47 @@ class BaseStateManager { this._promiseStore = new PromiseStore(); + this._options = { + transportBatchTimeout: 0, + }; + } + + /** + * Configure + * + */ + configure(options) { + if (!(typeof options === 'object') || options === null) { + throw new TypeError(`Cannot execute 'configure' on 'BaseStateManager': given option is not a valid options object`); + } + + if (options.transportBatchTimeout !== undefined) { + if (!Number.isFinite(options.transportBatchTimeout) || options.transportBatchTimeout < 0) { + throw new TypeError(`Failed to execute 'configure' on 'BaseStateManager': The provided option 'transportBatchTimeout' must be equal to or greater than 0`); + } + + if (options.transportBatchTimeout !== 0 && options.transportBatchTimeout < 1) { + options.transportBatchTimeout = 1; + console.warn(`Warning: Executing 'configure' on 'BaseStateManager': The provided option 'transportBatchTimeout' has been clamped to 1, make sure the given 'transportBatchTimeout' is expressed in milliseconds`); + } + + this._options.transportBatchTimeout = options.transportBatchTimeout; + } + } + + /** + * Executed on `client.init` + * @param {Number} id - Id of the node. + * @param {Object} transport - Transport to use for synchronizing the states. + * Must implement a basic EventEmitter API. + */ + init(id, transport) { + const batchedTransport = new BatchedTransport(transport, { + transportBatchTimeout: this._options.transportBatchTimeout, + }); + + this.client = { id, transport: batchedTransport }; + // --------------------------------------------- // CREATE // --------------------------------------------- diff --git a/src/common/BatchedTransport.js b/src/common/BatchedTransport.js index 5cd48cd2..1ab943c4 100644 --- a/src/common/BatchedTransport.js +++ b/src/common/BatchedTransport.js @@ -1,14 +1,27 @@ +import { delay } from '@ircam/sc-utils'; + import { BATCHED_TRANSPORT_CHANNEL } from './constants.js'; /** * This class proxies transports given the SharedStateManager to batch messages + * + * @param {number} [options.wait=0] - Wait for given number of milliseconds + * for stacking messages before emitting on the network. If 0 is given, network + * message is emitted on next microtask * @private */ class BatchedTransport { - constructor(transport) { + constructor(transport, { + transportBatchTimeout = 0, // in ms + } = {}) { + if (!Number.isFinite(transportBatchTimeout) || transportBatchTimeout < 0) { + throw new TypeError(`Failed to construct BatchedTransport: The provided option 'transportBatchTimeout' must equal to or greater than 0`); + } + this._transport = transport; - this._stack = []; this._listeners = new Map(); + this._stack = []; + this._transportBatchTimeout = transportBatchTimeout; this._sending = false; this._transport.addListener(BATCHED_TRANSPORT_CHANNEL, stack => { @@ -39,7 +52,14 @@ class BatchedTransport { if (!this._sending) { this._sending = true; - this._sending = await false; + + if (this._transportBatchTimeout === 0) { + await false; + } else { + await delay(this._transportBatchTimeout); + } + + this._sending = false; const stack = this._stack; this._stack = []; this._transport.emit(BATCHED_TRANSPORT_CHANNEL, stack); diff --git a/src/server/Server.js b/src/server/Server.js index 4acb16b5..2f31cf84 100644 --- a/src/server/Server.js +++ b/src/server/Server.js @@ -1,3 +1,4 @@ +import EventEmitter from 'node:events'; import fs from 'node:fs'; import http from 'node:http'; import https from 'node:https'; @@ -371,6 +372,11 @@ class Server { * await server.start(); // init is called implicitely */ async init() { + // ------------------------------------------------------------ + // INIT STATE MANAGER + // ------------------------------------------------------------ + this.stateManager.init(SERVER_ID, new EventEmitter()); + const numClients = {}; for (let name in this.config.app.clients) { numClients[name] = 0; diff --git a/src/server/StateManager.js b/src/server/StateManager.js index 48eac99a..1ad7e355 100644 --- a/src/server/StateManager.js +++ b/src/server/StateManager.js @@ -1,5 +1,3 @@ -import EventEmitter from 'node:events'; - import { idGenerator, isString, isPlainObject } from '@ircam/sc-utils'; import clonedeep from 'lodash/cloneDeep.js'; @@ -8,7 +6,6 @@ import BatchedTransport from '../common/BatchedTransport.js'; import ParameterBag from '../common/ParameterBag.js'; import SharedStatePrivate from '../common/SharedStatePrivate.js'; import { - SERVER_ID, CREATE_REQUEST, CREATE_RESPONSE, CREATE_ERROR, @@ -314,19 +311,19 @@ const kIsObservableState = Symbol('StateManager::isObservableState'); */ class StateManager extends BaseStateManager { constructor() { - // acts as a client of itself locally - const localClientId = SERVER_ID; - const localTransport = new EventEmitter(); - - super(localClientId, localTransport); + super(); this._clientByNodeId = new Map(); this._sharedStatePrivateById = new Map(); this._schemas = new Map(); this._observers = new Set(); this._hooksBySchemaName = new Map(); // protected + } - this.addClient(localClientId, localTransport); + init(id, transport) { + super.init(id, transport); + // add itself as client of the state manager server + this.addClient(id, transport); } [kIsObservableState](state) { @@ -348,7 +345,9 @@ class StateManager extends BaseStateManager { * @private */ addClient(nodeId, transport) { - transport = new BatchedTransport(transport); + transport = new BatchedTransport(transport, { + transportBatchTimeout: this._options.transportBatchTimeout, + }); const client = { id: nodeId, transport }; this._clientByNodeId.set(nodeId, client); diff --git a/tests/states/SharedState.spec.js b/tests/states/SharedState.spec.js index c069aa8f..60b839e8 100644 --- a/tests/states/SharedState.spec.js +++ b/tests/states/SharedState.spec.js @@ -524,52 +524,156 @@ describe('# SharedState', () => { } }); }); +}); - describe(`## Batched transport`, () => { - it(`should send only one message on several consecutive update requests`, async () => { - // launch new server so we can grab the server side representation of the client - const localConfig = structuredClone(config); - localConfig.env.port = 8082; +describe(`# SharedState - Batched transport`, () => { + it(`wait = 0 - should send only one message on consecutive synchronous update requests`, async () => { + // launch new server so we can grab the server side representation of the client + // @note to self - please explain... + const localConfig = structuredClone(config); + localConfig.env.port = 8082; - const server = new Server(localConfig); - server.stateManager.registerSchema('a', a); - await server.start(); + const server = new Server(localConfig); + server.stateManager.registerSchema('a', a); + await server.start(); - let batchedRequests = 0; - let batchedResponses = 0; + let batchedRequests = 0; + let batchedResponses = 0; - server.onClientConnect(client => { - client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { - batchedRequests += 1; - }); + server.onClientConnect(client => { + client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { + batchedRequests += 1; }); + }); + + const client = new Client({ role: 'test', ...localConfig }); + await client.start(); + + // update response + client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { + batchedResponses += 1; + }); + + const state = await client.stateManager.create('a'); + + state.set({ bool: true }); + + for (let i = 1; i <= 42; i++) { + state.set({ int: i }); + } + + await delay(20); + // make sure the state is up to date + assert.equal(state.get('int'), 42); + // 1 message for create request / response (i.e.await client.stateManager.create) + // 1 message for the batched updates requests / responses + assert.equal(batchedRequests, 2); + assert.equal(batchedResponses, 2); + + state.delete(); + await client.stop(); + await server.stop(); + }); + + it(`transportBatchTimeout > 0 - server should send only one message on consecutive asynchronous update requests`, async () => { + // launch new server so we can grab the server side representation of the client + // @note to self - please explain... + const localConfig = structuredClone(config); + localConfig.env.port = 8082; + + const server = new Server(localConfig); + server.stateManager.configure({ transportBatchTimeout: 20 }); + server.stateManager.registerSchema('a', a); + await server.start(); - const client = new Client({ role: 'test', ...localConfig }); - await client.start(); + let batchedRequests = 0; + let batchedResponses = 0; - // update response + server.onClientConnect(client => { client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { - batchedResponses += 1; + batchedRequests += 1; }); + }); - const state = await client.stateManager.create('a'); + const client = new Client({ role: 'test', ...localConfig }); + await client.start(); - state.set({ bool: true }); - for (let i = 1; i <= 42; i++) { - state.set({ int: i }); - } + // update response + client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { + batchedResponses += 1; + }); - await delay(20); - // make sure the state is up to date - assert.equal(state.get('int'), 42); - // 1 message for create request / response (i.e.await client.stateManager.create) - // 1 message for the batched updates requests / responses - assert.equal(batchedRequests, 2); - assert.equal(batchedResponses, 2); - - state.delete(); - await client.stop(); - await server.stop(); + const state = await client.stateManager.create('a'); + + for (let i = 1; i <= 10; i++) { + await delay(1); + state.set({ int: i }); + } + + await delay(30); + // make sure the state is up to date + assert.equal(state.get('int'), 10); + // 1 message for create request / response (i.e.await client.stateManager.create) + // 10 message for the updates requests + assert.equal(batchedRequests, 11); + // 1 message for create request / response (i.e.await client.stateManager.create) + // 1 message for the updates responses + assert.equal(batchedResponses, 2); + + state.delete(); + await client.stop(); + await server.stop(); + }); + + it(`transportBatchTimeout > 0 - client should send only one message on consecutive asynchronous update requests`, async () => { + // launch new server so we can grab the server side representation of the client + // @note to self - please explain... + const localConfig = structuredClone(config); + localConfig.env.port = 8082; + + const server = new Server(localConfig); + server.stateManager.registerSchema('a', a); + await server.start(); + + let batchedRequests = 0; + let batchedResponses = 0; + + server.onClientConnect(client => { + client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { + // console.log('requests', args); + batchedRequests += 1; + }); }); + + const client = new Client({ role: 'test', ...localConfig }); + client.stateManager.configure({ transportBatchTimeout: 20 }); + await client.start(); + + // update response + client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => { + // console.log('responses', args); + batchedResponses += 1; + }); + + const state = await client.stateManager.create('a'); + + for (let i = 1; i <= 10; i++) { + await delay(1); + state.set({ int: i }); + } + + await delay(30); + // make sure the state is up to date + assert.equal(state.get('int'), 10); + // 1 message for create request / response (i.e.await client.stateManager.create) + // 1 message for the updates requests + assert.equal(batchedRequests, 2); + // 1 message for create request / response (i.e.await client.stateManager.create) + // only 1 message for the updates responses as they are handled in a batch by the server + assert.equal(batchedResponses, 2); + + state.delete(); + await client.stop(); + await server.stop(); }); });