From 504311cc12ea5dcc8b242fab853270c8cc84bbf0 Mon Sep 17 00:00:00 2001 From: TS Date: Fri, 30 Apr 2021 22:58:11 +0100 Subject: [PATCH 1/2] fix(#8): fix topic matching in topic store --- package.json | 2 +- src/util/WsStore.ts | 42 ++++++++++++++++++++++++++++++++++------- src/websocket-client.ts | 27 ++++++++++++++++---------- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/package.json b/package.json index 0dc7c17..523e19a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ftx-api", - "version": "1.0.4", + "version": "1.0.5-beta.2", "description": "Node.js connector for FTX's REST APIs and WebSockets", "main": "lib/index.js", "types": "lib/index.d.ts", diff --git a/src/util/WsStore.ts b/src/util/WsStore.ts index 2197086..0914866 100644 --- a/src/util/WsStore.ts +++ b/src/util/WsStore.ts @@ -1,4 +1,4 @@ -import { WsConnectionState, WsTopic } from '../websocket-client'; +import { WsChannel, WsConnectionState, WsTopic } from '../websocket-client'; import { DefaultLogger } from '../logger'; import WebSocket from 'isomorphic-ws'; @@ -16,6 +16,15 @@ interface WsStoredState { subscribedTopics: WsTopicList; }; +function isDeepObjectMatch(object1: any, object2: any) { + for (const key in object1) { + if (object1[key] !== object2[key]) { + return false; + } + } + return true; +} + export default class WsStore { private wsState: { [key: string]: WsStoredState; @@ -65,7 +74,7 @@ export default class WsStore { /* connection websocket */ - hasExistingActiveConnection(key) { + hasExistingActiveConnection(key: string) { return this.get(key) && this.isWsOpen(key); } @@ -114,17 +123,36 @@ export default class WsStore { return result; } - addTopic(key: string, topic: WsTopic | string) { + // Since topics are objects we can't rely on the set to detect duplicates + getMatchingTopic(key: string, topic: WsTopic | WsChannel) { if (typeof topic === 'string') { - return this.addTopic(key, { channel: topic }); + return this.getMatchingTopic(key, { channel: topic }); + } + + const allTopics = this.getTopics(key).values(); + for (const storedTopic of allTopics) { + if (isDeepObjectMatch(topic, storedTopic)) { + return storedTopic; + } } - return this.getTopics(key).add(topic); } - deleteTopic(key: string, topic: WsTopic | string) { + addTopic(key: string, topic: WsTopic | WsChannel) { if (typeof topic === 'string') { return this.addTopic(key, { channel: topic }); } - return this.getTopics(key).delete(topic); + if (this.getMatchingTopic(key, topic)) { + return this.getTopics(key); + } + return this.getTopics(key).add(topic); + } + + deleteTopic(key: string, topic: WsTopic | WsChannel) { + const storedTopic = this.getMatchingTopic(key, topic); + if (storedTopic) { + this.getTopics(key).delete(storedTopic); + } + + return this.getTopics(key); } } \ No newline at end of file diff --git a/src/websocket-client.ts b/src/websocket-client.ts index 3249f2e..a8aa76f 100644 --- a/src/websocket-client.ts +++ b/src/websocket-client.ts @@ -1,7 +1,7 @@ import { EventEmitter } from 'events'; import { RestClient } from './rest-client'; import { DefaultLogger } from './logger'; -import { signMessage, serializeParams, signWsAuthenticate, WSClientConfigurableOptions, getWsUrl, WebsocketClientOptions } from './util/requestUtils'; +import { signMessage, signWsAuthenticate, WSClientConfigurableOptions, getWsUrl, WebsocketClientOptions } from './util/requestUtils'; import WebSocket from 'isomorphic-ws'; import WsStore from './util/WsStore'; @@ -31,8 +31,9 @@ export declare interface WebsocketClient { on(event: 'reconnect' | 'close', listener: () => void): this; }; +export type WsChannel = 'orderbook' | 'orderbookGrouped' | 'markets' | 'trades' | 'ticker' | 'fills' | 'orders' | string; export interface WsTopic { - channel: string; + channel: WsChannel; grouping?: number; market?: string; }; @@ -73,8 +74,12 @@ export class WebsocketClient extends EventEmitter { /** * Add topic/topics to WS subscription list */ - public subscribe(wsTopics: WsTopic[] | WsTopic | string[] | string) { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + public subscribe(wsTopics: WsTopic[] | WsTopic | WsChannel[] | WsChannel) { + const mixedTopics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + const topics = mixedTopics.map(topic => { + return typeof topic === 'string' ? { channel: topic } : topic; + }); + topics.forEach(topic => this.wsStore.addTopic( this.getWsKeyForTopic(topic), topic @@ -84,7 +89,7 @@ export class WebsocketClient extends EventEmitter { this.wsStore.getKeys().forEach(wsKey => { // if connected, send subscription request if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { - return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]); + return this.requestSubscribeTopics(wsKey, topics); } // start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect @@ -100,8 +105,12 @@ export class WebsocketClient extends EventEmitter { /** * Remove topic/topics from WS subscription list */ - public unsubscribe(wsTopics: WsTopic[] | WsTopic | string[] | string) { - const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + public unsubscribe(wsTopics: WsTopic[] | WsTopic | WsChannel[] | WsChannel) { + const mixedTopics = Array.isArray(wsTopics) ? wsTopics : [wsTopics]; + const topics = mixedTopics.map(topic => { + return typeof topic === 'string' ? { channel: topic } : topic; + }); + topics.forEach(topic => this.wsStore.deleteTopic( this.getWsKeyForTopic(topic), topic @@ -110,7 +119,7 @@ export class WebsocketClient extends EventEmitter { this.wsStore.getKeys().forEach(wsKey => { // unsubscribe request only necessary if active connection exists if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) { - this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]) + this.requestUnsubscribeTopics(wsKey, topics) } }); } @@ -272,8 +281,6 @@ export class WebsocketClient extends EventEmitter { * Send WS message to subscribe to topics. */ private requestSubscribeTopics(wsKey: string, topics: WsTopic[]) { - const market = ''; - topics.forEach(topic => { const wsMessage = JSON.stringify({ op: 'subscribe', From d36815779d1da184ebc590f98e359b8af10a42d3 Mon Sep 17 00:00:00 2001 From: TS Date: Tue, 4 May 2021 23:04:42 +0100 Subject: [PATCH 2/2] v1.0.6: fix ws unsubscribe workflows --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 523e19a..48f1ec3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ftx-api", - "version": "1.0.5-beta.2", + "version": "1.0.6", "description": "Node.js connector for FTX's REST APIs and WebSockets", "main": "lib/index.js", "types": "lib/index.d.ts",