Skip to content

Commit

Permalink
feat(shared-state): user defined timeout for batched transport
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Apr 25, 2024
1 parent 19fff13 commit bb93f73
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/client/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class Client {
* @see {@link client.StateManager}
* @type {client.StateManager}
*/
this.stateManager = null;
this.stateManager = new StateManager();

/**
* Status of the client, 'idle', 'inited', 'started' or 'errored'.
Expand Down Expand Up @@ -288,7 +288,7 @@ class Client {
// ------------------------------------------------------------
// CREATE STATE MANAGER
// ------------------------------------------------------------
this.stateManager = new StateManager(this.id, {
this.stateManager.init(this.id, {
emit: this.socket.send.bind(this.socket), // need to alias this
addListener: this.socket.addListener.bind(this.socket),
removeAllListeners: this.socket.removeAllListeners.bind(this.socket),
Expand Down
51 changes: 42 additions & 9 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,7 @@ import {
* @private
*/
class BaseStateManager {
/**
* @param {Number} id - Id of the node.
* @param {Object} transport - Transport to use for synchronizing the state.
* Must implement a basic EventEmitter API.
*/
constructor(id, transport) {
// proxy transport with BatchedTransport;
this.client = { id, transport: new BatchedTransport(transport) };

constructor() {
this._statesById = new Map(); // <id, state>
this._cachedSchemas = new Map(); // <shemaName, definition>

Expand All @@ -43,6 +35,47 @@ class BaseStateManager {

this._promiseStore = new PromiseStore();

this._options = {
transportBatchTimeout: 0,
};
}

/**
* Configure
*
*/
configure(options) {
if (!(typeof options === 'object') || options === null) {
throw new TypeError(`Cannot execute 'configure' on 'BaseStateManager': given option is not a valid options object`);
}

if (options.transportBatchTimeout !== undefined) {
if (!Number.isFinite(options.transportBatchTimeout) || options.transportBatchTimeout < 0) {
throw new TypeError(`Failed to execute 'configure' on 'BaseStateManager': The provided option 'transportBatchTimeout' must be equal to or greater than 0`);
}

if (options.transportBatchTimeout !== 0 && options.transportBatchTimeout < 1) {
options.transportBatchTimeout = 1;
console.warn(`Warning: Executing 'configure' on 'BaseStateManager': The provided option 'transportBatchTimeout' has been clamped to 1, make sure the given 'transportBatchTimeout' is expressed in milliseconds`);
}

this._options.transportBatchTimeout = options.transportBatchTimeout;
}
}

/**
* Executed on `client.init`
* @param {Number} id - Id of the node.
* @param {Object} transport - Transport to use for synchronizing the states.
* Must implement a basic EventEmitter API.
*/
init(id, transport) {
const batchedTransport = new BatchedTransport(transport, {
transportBatchTimeout: this._options.transportBatchTimeout,
});

this.client = { id, transport: batchedTransport };

// ---------------------------------------------
// CREATE
// ---------------------------------------------
Expand Down
26 changes: 23 additions & 3 deletions src/common/BatchedTransport.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
import { delay } from '@ircam/sc-utils';

import { BATCHED_TRANSPORT_CHANNEL } from './constants.js';

/**
* This class proxies transports given the SharedStateManager to batch messages
*
* @param {number} [options.wait=0] - Wait for given number of milliseconds
* for stacking messages before emitting on the network. If 0 is given, network
* message is emitted on next microtask
* @private
*/
class BatchedTransport {
constructor(transport) {
constructor(transport, {
transportBatchTimeout = 0, // in ms
} = {}) {
if (!Number.isFinite(transportBatchTimeout) || transportBatchTimeout < 0) {
throw new TypeError(`Failed to construct BatchedTransport: The provided option 'transportBatchTimeout' must equal to or greater than 0`);
}

this._transport = transport;
this._stack = [];
this._listeners = new Map();
this._stack = [];
this._transportBatchTimeout = transportBatchTimeout;
this._sending = false;

this._transport.addListener(BATCHED_TRANSPORT_CHANNEL, stack => {
Expand Down Expand Up @@ -39,7 +52,14 @@ class BatchedTransport {

if (!this._sending) {
this._sending = true;
this._sending = await false;

if (this._transportBatchTimeout === 0) {
await false;
} else {
await delay(this._transportBatchTimeout);
}

this._sending = false;
const stack = this._stack;
this._stack = [];
this._transport.emit(BATCHED_TRANSPORT_CHANNEL, stack);
Expand Down
6 changes: 6 additions & 0 deletions src/server/Server.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import EventEmitter from 'node:events';
import fs from 'node:fs';
import http from 'node:http';
import https from 'node:https';
Expand Down Expand Up @@ -371,6 +372,11 @@ class Server {
* await server.start(); // init is called implicitely
*/
async init() {
// ------------------------------------------------------------
// INIT STATE MANAGER
// ------------------------------------------------------------
this.stateManager.init(SERVER_ID, new EventEmitter());

const numClients = {};
for (let name in this.config.app.clients) {
numClients[name] = 0;
Expand Down
19 changes: 9 additions & 10 deletions src/server/StateManager.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import EventEmitter from 'node:events';

import { idGenerator, isString, isPlainObject } from '@ircam/sc-utils';
import clonedeep from 'lodash/cloneDeep.js';

Expand All @@ -8,7 +6,6 @@ import BatchedTransport from '../common/BatchedTransport.js';
import ParameterBag from '../common/ParameterBag.js';
import SharedStatePrivate from '../common/SharedStatePrivate.js';
import {
SERVER_ID,
CREATE_REQUEST,
CREATE_RESPONSE,
CREATE_ERROR,
Expand Down Expand Up @@ -314,19 +311,19 @@ const kIsObservableState = Symbol('StateManager::isObservableState');
*/
class StateManager extends BaseStateManager {
constructor() {
// acts as a client of itself locally
const localClientId = SERVER_ID;
const localTransport = new EventEmitter();

super(localClientId, localTransport);
super();

this._clientByNodeId = new Map();
this._sharedStatePrivateById = new Map();
this._schemas = new Map();
this._observers = new Set();
this._hooksBySchemaName = new Map(); // protected
}

this.addClient(localClientId, localTransport);
init(id, transport) {
super.init(id, transport);
// add itself as client of the state manager server
this.addClient(id, transport);
}

[kIsObservableState](state) {
Expand All @@ -348,7 +345,9 @@ class StateManager extends BaseStateManager {
* @private
*/
addClient(nodeId, transport) {
transport = new BatchedTransport(transport);
transport = new BatchedTransport(transport, {
transportBatchTimeout: this._options.transportBatchTimeout,
});

const client = { id: nodeId, transport };
this._clientByNodeId.set(nodeId, client);
Expand Down
172 changes: 138 additions & 34 deletions tests/states/SharedState.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -524,52 +524,156 @@ describe('# SharedState', () => {
}
});
});
});

describe(`## Batched transport`, () => {
it(`should send only one message on several consecutive update requests`, async () => {
// launch new server so we can grab the server side representation of the client
const localConfig = structuredClone(config);
localConfig.env.port = 8082;
describe(`# SharedState - Batched transport`, () => {
it(`wait = 0 - should send only one message on consecutive synchronous update requests`, async () => {
// launch new server so we can grab the server side representation of the client
// @note to self - please explain...
const localConfig = structuredClone(config);
localConfig.env.port = 8082;

const server = new Server(localConfig);
server.stateManager.registerSchema('a', a);
await server.start();
const server = new Server(localConfig);
server.stateManager.registerSchema('a', a);
await server.start();

let batchedRequests = 0;
let batchedResponses = 0;
let batchedRequests = 0;
let batchedResponses = 0;

server.onClientConnect(client => {
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedRequests += 1;
});
server.onClientConnect(client => {
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedRequests += 1;
});
});

const client = new Client({ role: 'test', ...localConfig });
await client.start();

// update response
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedResponses += 1;
});

const state = await client.stateManager.create('a');

state.set({ bool: true });

for (let i = 1; i <= 42; i++) {
state.set({ int: i });
}

await delay(20);
// make sure the state is up to date
assert.equal(state.get('int'), 42);
// 1 message for create request / response (i.e.await client.stateManager.create)
// 1 message for the batched updates requests / responses
assert.equal(batchedRequests, 2);
assert.equal(batchedResponses, 2);

state.delete();
await client.stop();
await server.stop();
});

it(`transportBatchTimeout > 0 - server should send only one message on consecutive asynchronous update requests`, async () => {
// launch new server so we can grab the server side representation of the client
// @note to self - please explain...
const localConfig = structuredClone(config);
localConfig.env.port = 8082;

const server = new Server(localConfig);
server.stateManager.configure({ transportBatchTimeout: 20 });
server.stateManager.registerSchema('a', a);
await server.start();

const client = new Client({ role: 'test', ...localConfig });
await client.start();
let batchedRequests = 0;
let batchedResponses = 0;

// update response
server.onClientConnect(client => {
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedResponses += 1;
batchedRequests += 1;
});
});

const state = await client.stateManager.create('a');
const client = new Client({ role: 'test', ...localConfig });
await client.start();

state.set({ bool: true });
for (let i = 1; i <= 42; i++) {
state.set({ int: i });
}
// update response
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
batchedResponses += 1;
});

await delay(20);
// make sure the state is up to date
assert.equal(state.get('int'), 42);
// 1 message for create request / response (i.e.await client.stateManager.create)
// 1 message for the batched updates requests / responses
assert.equal(batchedRequests, 2);
assert.equal(batchedResponses, 2);

state.delete();
await client.stop();
await server.stop();
const state = await client.stateManager.create('a');

for (let i = 1; i <= 10; i++) {
await delay(1);
state.set({ int: i });
}

await delay(30);
// make sure the state is up to date
assert.equal(state.get('int'), 10);
// 1 message for create request / response (i.e.await client.stateManager.create)
// 10 message for the updates requests
assert.equal(batchedRequests, 11);
// 1 message for create request / response (i.e.await client.stateManager.create)
// 1 message for the updates responses
assert.equal(batchedResponses, 2);

state.delete();
await client.stop();
await server.stop();
});

it(`transportBatchTimeout > 0 - client should send only one message on consecutive asynchronous update requests`, async () => {
// launch new server so we can grab the server side representation of the client
// @note to self - please explain...
const localConfig = structuredClone(config);
localConfig.env.port = 8082;

const server = new Server(localConfig);
server.stateManager.registerSchema('a', a);
await server.start();

let batchedRequests = 0;
let batchedResponses = 0;

server.onClientConnect(client => {
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
// console.log('requests', args);
batchedRequests += 1;
});
});

const client = new Client({ role: 'test', ...localConfig });
client.stateManager.configure({ transportBatchTimeout: 20 });
await client.start();

// update response
client.socket.addListener(BATCHED_TRANSPORT_CHANNEL, (args) => {
// console.log('responses', args);
batchedResponses += 1;
});

const state = await client.stateManager.create('a');

for (let i = 1; i <= 10; i++) {
await delay(1);
state.set({ int: i });
}

await delay(30);
// make sure the state is up to date
assert.equal(state.get('int'), 10);
// 1 message for create request / response (i.e.await client.stateManager.create)
// 1 message for the updates requests
assert.equal(batchedRequests, 2);
// 1 message for create request / response (i.e.await client.stateManager.create)
// only 1 message for the updates responses as they are handled in a batch by the server
assert.equal(batchedResponses, 2);

state.delete();
await client.stop();
await server.stop();
});
});

0 comments on commit bb93f73

Please sign in to comment.