Skip to content

Commit

Permalink
Merge pull request #9 from tiagosiebler/#8/topicstate
Browse files Browse the repository at this point in the history
Fix topic matching in topic store
  • Loading branch information
tiagosiebler authored May 4, 2021
2 parents 8f9f648 + d368157 commit 4ec5aa3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 18 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ftx-api",
"version": "1.0.4",
"version": "1.0.6",
"description": "Node.js connector for FTX's REST APIs and WebSockets",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
42 changes: 35 additions & 7 deletions src/util/WsStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { WsConnectionState, WsTopic } from '../websocket-client';
import { WsChannel, WsConnectionState, WsTopic } from '../websocket-client';
import { DefaultLogger } from '../logger';

import WebSocket from 'isomorphic-ws';
Expand All @@ -16,6 +16,15 @@ interface WsStoredState {
subscribedTopics: WsTopicList;
};

function isDeepObjectMatch(object1: any, object2: any) {
for (const key in object1) {
if (object1[key] !== object2[key]) {
return false;
}
}
return true;
}

export default class WsStore {
private wsState: {
[key: string]: WsStoredState;
Expand Down Expand Up @@ -65,7 +74,7 @@ export default class WsStore {

/* connection websocket */

hasExistingActiveConnection(key) {
hasExistingActiveConnection(key: string) {
return this.get(key) && this.isWsOpen(key);
}

Expand Down Expand Up @@ -114,17 +123,36 @@ export default class WsStore {
return result;
}

addTopic(key: string, topic: WsTopic | string) {
// Since topics are objects we can't rely on the set to detect duplicates
getMatchingTopic(key: string, topic: WsTopic | WsChannel) {
if (typeof topic === 'string') {
return this.addTopic(key, { channel: topic });
return this.getMatchingTopic(key, { channel: topic });
}

const allTopics = this.getTopics(key).values();
for (const storedTopic of allTopics) {
if (isDeepObjectMatch(topic, storedTopic)) {
return storedTopic;
}
}
return this.getTopics(key).add(topic);
}

deleteTopic(key: string, topic: WsTopic | string) {
addTopic(key: string, topic: WsTopic | WsChannel) {
if (typeof topic === 'string') {
return this.addTopic(key, { channel: topic });
}
return this.getTopics(key).delete(topic);
if (this.getMatchingTopic(key, topic)) {
return this.getTopics(key);
}
return this.getTopics(key).add(topic);
}

deleteTopic(key: string, topic: WsTopic | WsChannel) {
const storedTopic = this.getMatchingTopic(key, topic);
if (storedTopic) {
this.getTopics(key).delete(storedTopic);
}

return this.getTopics(key);
}
}
27 changes: 17 additions & 10 deletions src/websocket-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from 'events';
import { RestClient } from './rest-client';
import { DefaultLogger } from './logger';
import { signMessage, serializeParams, signWsAuthenticate, WSClientConfigurableOptions, getWsUrl, WebsocketClientOptions } from './util/requestUtils';
import { signMessage, signWsAuthenticate, WSClientConfigurableOptions, getWsUrl, WebsocketClientOptions } from './util/requestUtils';

import WebSocket from 'isomorphic-ws';
import WsStore from './util/WsStore';
Expand Down Expand Up @@ -31,8 +31,9 @@ export declare interface WebsocketClient {
on(event: 'reconnect' | 'close', listener: () => void): this;
};

export type WsChannel = 'orderbook' | 'orderbookGrouped' | 'markets' | 'trades' | 'ticker' | 'fills' | 'orders' | string;
export interface WsTopic {
channel: string;
channel: WsChannel;
grouping?: number;
market?: string;
};
Expand Down Expand Up @@ -73,8 +74,12 @@ export class WebsocketClient extends EventEmitter {
/**
* Add topic/topics to WS subscription list
*/
public subscribe(wsTopics: WsTopic[] | WsTopic | string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
public subscribe(wsTopics: WsTopic[] | WsTopic | WsChannel[] | WsChannel) {
const mixedTopics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
const topics = mixedTopics.map(topic => {
return typeof topic === 'string' ? { channel: topic } : topic;
});

topics.forEach(topic => this.wsStore.addTopic(
this.getWsKeyForTopic(topic),
topic
Expand All @@ -84,7 +89,7 @@ export class WebsocketClient extends EventEmitter {
this.wsStore.getKeys().forEach(wsKey => {
// if connected, send subscription request
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
return this.requestSubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)]);
return this.requestSubscribeTopics(wsKey, topics);
}

// start connection process if it hasn't yet begun. Topics are automatically subscribed to on-connect
Expand All @@ -100,8 +105,12 @@ export class WebsocketClient extends EventEmitter {
/**
* Remove topic/topics from WS subscription list
*/
public unsubscribe(wsTopics: WsTopic[] | WsTopic | string[] | string) {
const topics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
public unsubscribe(wsTopics: WsTopic[] | WsTopic | WsChannel[] | WsChannel) {
const mixedTopics = Array.isArray(wsTopics) ? wsTopics : [wsTopics];
const topics = mixedTopics.map(topic => {
return typeof topic === 'string' ? { channel: topic } : topic;
});

topics.forEach(topic => this.wsStore.deleteTopic(
this.getWsKeyForTopic(topic),
topic
Expand All @@ -110,7 +119,7 @@ export class WebsocketClient extends EventEmitter {
this.wsStore.getKeys().forEach(wsKey => {
// unsubscribe request only necessary if active connection exists
if (this.wsStore.isConnectionState(wsKey, READY_STATE_CONNECTED)) {
this.requestUnsubscribeTopics(wsKey, [...this.wsStore.getTopics(wsKey)])
this.requestUnsubscribeTopics(wsKey, topics)
}
});
}
Expand Down Expand Up @@ -272,8 +281,6 @@ export class WebsocketClient extends EventEmitter {
* Send WS message to subscribe to topics.
*/
private requestSubscribeTopics(wsKey: string, topics: WsTopic[]) {
const market = '';

topics.forEach(topic => {
const wsMessage = JSON.stringify({
op: 'subscribe',
Expand Down

0 comments on commit 4ec5aa3

Please sign in to comment.