Skip to content

Commit

Permalink
socket-mode: Handling WS errors during handshake (#2099)
Browse files Browse the repository at this point in the history
  • Loading branch information
filmaj authored Dec 9, 2024
1 parent 1f8e880 commit c389a75
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 23 deletions.
4 changes: 3 additions & 1 deletion packages/socket-mode/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@slack/socket-mode",
"version": "2.0.2",
"version": "2.0.3-rc.1",
"description": "Official library for using the Slack Platform's Socket Mode API",
"author": "Slack Technologies, LLC",
"license": "MIT",
Expand Down Expand Up @@ -58,11 +58,13 @@
"@tsconfig/recommended": "^1.0.7",
"@types/chai": "^4",
"@types/mocha": "^10",
"@types/proxyquire": "^1.3.31",
"@types/sinon": "^17",
"c8": "^10.1.2",
"chai": "^4",
"mocha": "^11",
"nodemon": "^3.1.0",
"proxyquire": "^2.1.3",
"shx": "^0.3.2",
"sinon": "^19",
"source-map-support": "^0.5.21",
Expand Down
48 changes: 45 additions & 3 deletions packages/socket-mode/src/SlackWebSocket.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
import { ConsoleLogger } from '@slack/logger';
import { assert } from 'chai';
import EventEmitter from 'eventemitter3';
import proxyquire from 'proxyquire';
import sinon from 'sinon';

import { SlackWebSocket } from './SlackWebSocket';
proxyquire.noPreserveCache();
import logModule from './logger';

// A slightly spruced up event emitter aiming at mocking out the `ws` library's `WebSocket` class
class WSMock extends EventEmitter {
// biome-ignore lint/suspicious/noExplicitAny: event listeners can accept any args
addEventListener(evt: string, fn: (...args: any[]) => void) {
this.addListener.call(this, evt, fn);
}
}

describe('SlackWebSocket', () => {
const sandbox = sinon.createSandbox();

let SlackWebSocket: typeof import('./SlackWebSocket').SlackWebSocket;
beforeEach(() => {
SlackWebSocket = proxyquire.load('./SlackWebSocket', {
ws: {
WebSocket: WSMock,
},
}).SlackWebSocket;
});
afterEach(() => {
sandbox.restore();
});
Expand Down Expand Up @@ -38,4 +53,31 @@ describe('SlackWebSocket', () => {
assert.isFalse(logFactory.called);
});
});
describe('WebSocket event handling', () => {
it('should call disconnect() if websocket emits an error', async () => {
// an exposed event emitter pretending its a websocket
const ws = new WSMock();
// mock out the `ws` library and have it return our event emitter mock
SlackWebSocket = proxyquire.load('./SlackWebSocket', {
ws: {
WebSocket: class Fake {
constructor() {
// biome-ignore lint/correctness/noConstructorReturn: for test mocking purposes
return ws;
}
},
},
}).SlackWebSocket;
const sws = new SlackWebSocket({
url: 'whatevs',
client: new EventEmitter(),
clientPingTimeoutMS: 1,
serverPingTimeoutMS: 1,
});
const discStub = sinon.stub(sws, 'disconnect');
sws.connect();
ws.emit('error', { error: new Error('boom') });
sinon.assert.calledOnce(discStub);
});
});
});
15 changes: 7 additions & 8 deletions packages/socket-mode/src/SlackWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,28 @@ export class SlackWebSocket {
agent: this.options.httpAgent,
};

const ws = new WebSocket(this.options.url, options);
this.websocket = new WebSocket(this.options.url, options);

ws.addEventListener('open', (_event) => {
this.websocket.addEventListener('open', (_event) => {
this.logger.debug('WebSocket open event received (connection established)!');
this.websocket = ws;
this.monitorPingToSlack();
});
ws.addEventListener('error', (event) => {
this.websocket.addEventListener('error', (event) => {
this.logger.error(`WebSocket error occurred: ${event.message}`);
this.disconnect();
this.options.client.emit('error', websocketErrorWithOriginal(event.error));
});
ws.on('message', (msg, isBinary) => {
this.websocket.on('message', (msg, isBinary) => {
this.options.client.emit('ws_message', msg, isBinary);
});
ws.on('close', (code: number, data: Buffer) => {
this.websocket.on('close', (code: number, data: Buffer) => {
this.logger.debug(`WebSocket close frame received (code: ${code}, reason: ${data.toString()})`);
this.closeFrameReceived = true;
this.disconnect();
});

// Confirm WebSocket connection is still active
ws.on('ping', (data) => {
this.websocket.on('ping', (data) => {
// Note that ws' `autoPong` option is true by default, so no need to respond to ping.
// see https://github.com/websockets/ws/blob/2aa0405a5e96754b296fef6bd6ebdfb2f11967fc/doc/ws.md#new-websocketaddress-protocols-options
if (this.options.pingPongLoggingEnabled) {
Expand All @@ -142,7 +141,7 @@ export class SlackWebSocket {
this.monitorPingFromSlack();
});

ws.on('pong', (data) => {
this.websocket.on('pong', (data) => {
if (this.options.pingPongLoggingEnabled) {
this.logger.debug(`WebSocket received pong from Slack server (data: ${data.toString()})`);
}
Expand Down
10 changes: 7 additions & 3 deletions packages/socket-mode/src/SocketModeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,13 @@ export class SocketModeClient extends EventEmitter {
this.logger.debug(`Before trying to reconnect, this client will wait for ${msBeforeRetry} milliseconds`);
return new Promise((res, _rej) => {
setTimeout(() => {
this.logger.debug('Continuing with reconnect...');
this.emit(State.Reconnecting);
cb.apply(this).then(res);
if (this.shuttingDown) {
this.logger.debug('Client shutting down, will not attempt reconnect.');
} else {
this.logger.debug('Continuing with reconnect...');
this.emit(State.Reconnecting);
cb.apply(this).then(res);
}
}, msBeforeRetry);
});
}
Expand Down
5 changes: 1 addition & 4 deletions packages/socket-mode/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ function errorWithCode(error: Error, code: ErrorCode): CodedError {
* A factory to create SMWebsocketError objects.
*/
export function websocketErrorWithOriginal(original: Error): SMWebsocketError {
const error = errorWithCode(
new Error(`Failed to send message on websocket: ${original.message}`),
ErrorCode.WebsocketError,
) as Partial<SMWebsocketError>;
const error = errorWithCode(new Error(original.message), ErrorCode.WebsocketError) as Partial<SMWebsocketError>;
error.original = original;
return error as SMWebsocketError;
}
Expand Down
55 changes: 51 additions & 4 deletions packages/socket-mode/test/integration.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ describe('Integration tests with a WebSocket server', () => {
});
});
afterEach(async () => {
server.close();
if (server) server.close();
server = null;
wss.close();
if (wss) wss.close();
wss = null;
exposed_ws_connection = null;
if (client) {
Expand Down Expand Up @@ -90,7 +90,7 @@ describe('Integration tests with a WebSocket server', () => {
await client.disconnect();
});
});
describe('catastrophic server behaviour', () => {
describe('`apps.connections.open` API failure modes', () => {
beforeEach(() => {
client = new SocketModeClient({
appToken: 'whatever',
Expand Down Expand Up @@ -130,7 +130,7 @@ describe('Integration tests with a WebSocket server', () => {
await client.disconnect();
});
});
describe('failure modes / unexpected messages sent to client', () => {
describe('unexpected socket messages sent to client', () => {
const debugLoggerSpy = sinon.stub(); // add the following to expose further logging: .callsFake(console.log);
const noop = () => {};
beforeEach(() => {
Expand All @@ -139,6 +139,7 @@ describe('Integration tests with a WebSocket server', () => {
clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`,
},
logLevel: 'debug',
logger: {
debug: debugLoggerSpy,
info: noop,
Expand Down Expand Up @@ -168,6 +169,52 @@ describe('Integration tests with a WebSocket server', () => {
assert.isTrue(debugLoggerSpy.calledWith(sinon.match('Unable to parse an incoming WebSocket message')));
await client.disconnect();
});
it('should maintain one serial reconnection attempt if WSS server sends unexpected HTTP response during handshake, like a 409', async () => {
// test for https://github.com/slackapi/node-slack-sdk/issues/2094
// override socket mode client instance with lower client ping timeout, which controls reconnection rate
client = new SocketModeClient({
appToken: 'whatever',
clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`,
},
clientPingTimeout: 20, // controls reconnection rate
logLevel: 'debug',
});
// shut down the default mock WS server used in these tests as we will customize its behaviour in this test
wss.close();
wss = null;
// custom HTTP server that blows up during initial WS handshake
const badServer = createServer((_req, res) => {
res.writeHead(409, { 'content-type': 'application/json' });
res.end(
JSON.stringify({
message: 'Unexpected server response: 409',
}),
);
});
badServer.listen(WSS_PORT);
let closed = 0;
// the `close` event is raised every time the websocket server returns an error, so let's keep track of how often this event is emited and use that to infer correct reconnection attempt counts / behaviour
client.on('close', () => {
closed++;
});
// do not use await here, since `start()` won't return until the connection is established. we are intentionally testing connection establishment failure, so that will never finish. so, let's run this in a rogue "thread", e.g. fire off an async method and let it do its thing!
client.start();
await sleep(50);
// after 50ms, with a timeout of 20ms, we would expect 2 retries.
// crucially, the bug reported in https://github.com/slackapi/node-slack-sdk/issues/2094 shows that on every reconnection attempt, we spawn _another_ websocket instance, which attempts to reconnect forever and is never cleaned up.
// effectively: with each reconnection attempt, we double the number of websockets, eventually causing crashes / out-of-memory issues / rate-limiting from Slack APIs.
// with the bug not fixed, this assertion fails as `close` event was emitted 4 times! if we waited another 20ms, we would see this event count double again (8), and so on.
assert.equal(closed, 2, 'unexpected number of times `close` event was raised during reconnection!');
await client.disconnect();
await new Promise((res, rej) => {
// shut down the bad server
badServer.close((err) => {
if (err) rej(err);
else res();
});
});
});
});
describe('lifecycle events', () => {
beforeEach(() => {
Expand Down

0 comments on commit c389a75

Please sign in to comment.