Skip to content

Commit

Permalink
Extracted CrawlConnectionEventHandlers + added unit tests. Refactored
Browse files Browse the repository at this point in the history
Lag detection.
  • Loading branch information
pieterjan84 committed Mar 6, 2024
1 parent 826101f commit 8c8497c
Show file tree
Hide file tree
Showing 19 changed files with 572 additions and 215 deletions.
18 changes: 12 additions & 6 deletions src/__tests__/crawl-queue-manager.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { AsyncCrawlQueue } from '../crawl-queue';
import { CrawlQueueManager } from '../crawl-queue-manager';
import { mock, MockProxy } from 'jest-mock-extended';
import { mock } from 'jest-mock-extended';
import { P } from 'pino';
import { CrawlState } from '../crawl-state';
import { CrawlTask } from '../crawl-task';
Expand All @@ -22,7 +22,8 @@ describe('CrawlQueueManager', () => {
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625]
nodeAddress: ['localhost', 11625],
topTier: false
});

expect(crawlQueue.push).toHaveBeenCalled();
Expand All @@ -33,12 +34,14 @@ describe('CrawlQueueManager', () => {
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625]
nodeAddress: ['localhost', 11625],
topTier: false
});
crawlQueueManager.addCrawlTask({
connectCallback: () => {},
crawlState,
nodeAddress: ['localhost', 11625]
nodeAddress: ['localhost', 11625],
topTier: false
});

expect(crawlQueue.push).toHaveBeenCalledTimes(1);
Expand All @@ -65,22 +68,25 @@ describe('CrawlQueueManager', () => {
const task: CrawlTask = {
connectCallback: jest.fn(),
crawlState,
nodeAddress: ['localhost', 11625]
nodeAddress: ['localhost', 11625],
topTier: false
};

crawlQueue.initialize.mockImplementation((callback) => {
callback(task, () => {});
});

const crawlQueueManager = new CrawlQueueManager(crawlQueue, logger);
crawlQueueManager.queueLength();
expect(task.connectCallback).toHaveBeenCalled();
});

it('should complete a crawl task', function () {
const task: CrawlTask = {
connectCallback: jest.fn(),
crawlState,
nodeAddress: ['localhost', 11625]
nodeAddress: ['localhost', 11625],
topTier: false
};

crawlQueue.initialize.mockImplementation((callback) => {
Expand Down
23 changes: 19 additions & 4 deletions src/__tests__/peer-node-collection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ describe('PeerNodeCollection', () => {
ledgerVersion: 2,
versionString: 'versionString'
};
const connectionTime = new Date();
const peerNode = peerNodeCollection.addSuccessfullyConnected(
publicKey,
ip,
port,
nodeInfo
nodeInfo,
connectionTime
);
expect(peerNode).toBeInstanceOf(PeerNode);
if (peerNode instanceof Error) {
Expand All @@ -36,6 +38,7 @@ describe('PeerNodeCollection', () => {
expect(peerNode.ip).toBe(ip);
expect(peerNode.port).toBe(port);
expect(peerNode.nodeInfo).toBe(nodeInfo);
expect(peerNode.connectionTime).toEqual(connectionTime);
});

it('should return an error if the peer node already exists and has already successfully connected', () => {
Expand All @@ -53,13 +56,15 @@ describe('PeerNodeCollection', () => {
publicKey,
ip,
port,
nodeInfo
nodeInfo,
new Date()
);
const peerNode = peerNodeCollection.addSuccessfullyConnected(
publicKey,
ip,
port,
nodeInfo
nodeInfo,
new Date()
);
expect(peerNode).toBeInstanceOf(Error);
});
Expand All @@ -69,6 +74,7 @@ describe('PeerNodeCollection', () => {
peerNodeCollection.getOrAdd(publicKey);
const newIp = 'newIp';
const newPort = 11626;
const connectionTime = new Date();
const newNodeInfo: NodeInfo = {
overlayVersion: 4,
overlayMinVersion: 2,
Expand All @@ -80,9 +86,18 @@ describe('PeerNodeCollection', () => {
publicKey,
newIp,
newPort,
newNodeInfo
newNodeInfo,
connectionTime
);
expect(peerNode).toBeInstanceOf(PeerNode);
if (peerNode instanceof Error) {
throw peerNode;
}
expect(peerNode.publicKey).toBe(publicKey);
expect(peerNode.ip).toBe(newIp);
expect(peerNode.port).toBe(newPort);
expect(peerNode.nodeInfo).toBe(newNodeInfo);
expect(peerNode.connectionTime).toEqual(connectionTime);
});

it('should return an existing peer node', () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { PeerNodeCollection } from '../../peer-node-collection';
import { mock } from 'jest-mock-extended';
import { ConnectedPayload, ConnectionManager } from '../../connection-manager';
import { CrawlQueueManager } from '../../crawl-queue-manager';
import { DisconnectTimeout } from '../../disconnect-timeout';
import { OnConnectedHandler } from '../on-connected-handler';

describe('OnConnectedHandler', () => {
const connectionManager = mock<ConnectionManager>();
const crawlQueueManager = mock<CrawlQueueManager>();
const disconnectTimeout = mock<DisconnectTimeout>();

beforeEach(() => {
jest.clearAllMocks();
});

it('should handle a successful connection', () => {
const onConnectedHandler = new OnConnectedHandler(
connectionManager,
crawlQueueManager,
disconnectTimeout
);
const data: ConnectedPayload = {
ip: 'localhost',
port: 11625,
publicKey: 'publicKey',
nodeInfo: {
overlayVersion: 3,
overlayMinVersion: 1,
networkId: 'networkId',
ledgerVersion: 2,
versionString: 'versionString'
}
};
const peerNodes = mock<PeerNodeCollection>();
const topTierNodes: Set<string> = new Set();
const listenTimeouts = new Map();
const localTime = new Date();
onConnectedHandler.onConnected(
data,
peerNodes,
topTierNodes,
listenTimeouts,
localTime
);

expect(peerNodes.addSuccessfullyConnected).toHaveBeenCalledWith(
data.publicKey,
data.ip,
data.port,
data.nodeInfo,
localTime
);
expect(disconnectTimeout.start).toHaveBeenCalled();
});

it('should handle a peer node error', () => {
const onConnectedHandler = new OnConnectedHandler(
connectionManager,
crawlQueueManager,
disconnectTimeout
);
const data: ConnectedPayload = {
ip: 'localhost',
port: 11625,
publicKey: 'publicKey',
nodeInfo: {
overlayVersion: 3,
overlayMinVersion: 1,
networkId: 'networkId',
ledgerVersion: 2,
versionString: 'versionString'
}
};
const peerNodes = mock<PeerNodeCollection>();
const topTierNodes: Set<string> = new Set();
const listenTimeouts = new Map();
const localTime = new Date();
const error = new Error('error');
peerNodes.addSuccessfullyConnected.mockReturnValue(error);
onConnectedHandler.onConnected(
data,
peerNodes,
topTierNodes,
listenTimeouts,
localTime
);

expect(peerNodes.addSuccessfullyConnected).toHaveBeenCalledWith(
data.publicKey,
data.ip,
data.port,
data.nodeInfo,
localTime
);
expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
`${data.ip}:${data.port}`,
error
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { mock } from 'jest-mock-extended';
import { ConnectionManager } from '../../connection-manager';
import { CrawlQueueManager } from '../../crawl-queue-manager';
import { DisconnectTimeout } from '../../disconnect-timeout';
import { OnConnectionCloseHandler } from '../on-connection-close-handler';
import { CrawlState } from '../../crawl-state';
import { QuorumSetManager } from '../../quorum-set-manager';
import { PeerNodeCollection } from '../../peer-node-collection';

describe('OnConnectionCloseHandler', () => {
const queueManager = mock<CrawlQueueManager>();
const quorumSetManager = mock<QuorumSetManager>();

beforeEach(() => {
jest.clearAllMocks();
});

it('should cleanup a closed connection', () => {
const onConnectionCloseHandler = new OnConnectionCloseHandler(
quorumSetManager,
queueManager
);
const address = 'localhost:11625';
const publicKey: string = 'publicKey';
const crawlState = mock<CrawlState>();
crawlState.listenTimeouts = new Map();
const spy = jest.spyOn(crawlState.listenTimeouts, 'delete');
crawlState.peerNodes = new PeerNodeCollection();
const peer = crawlState.peerNodes.addSuccessfullyConnected(
publicKey,
'localhost',
11625,
{
overlayVersion: 3,
overlayMinVersion: 1,
networkId: 'networkId',
ledgerVersion: 2,
versionString: 'versionString'
},
new Date()
);
if (peer instanceof Error) {
throw peer;
}
const localTime = new Date();

onConnectionCloseHandler.onConnectionClose(
address,
publicKey,
crawlState,
localTime
);

expect(quorumSetManager.onNodeDisconnected).toHaveBeenCalledWith(
publicKey,
crawlState
);
expect(queueManager.completeCrawlQueueTask).toHaveBeenCalledWith(
crawlState.crawlQueueTaskDoneCallbacks,
address
);
expect(spy).toHaveBeenCalledWith(publicKey);

expect(peer.disconnected).toBe(true);
expect(peer.disconnectionTime).toBe(localTime);
});

it('should update failed connections', () => {
const onConnectionCloseHandler = new OnConnectionCloseHandler(
quorumSetManager,
queueManager
);
const address = 'localhost:11625';
const crawlState = mock<CrawlState>();
crawlState.failedConnections = [];
onConnectionCloseHandler.onConnectionClose(
address,
undefined,
crawlState,
new Date()
);
expect(crawlState.failedConnections).toEqual([address]);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { OnDataHandler } from '../on-data-handler';
import { ConnectionManager, DataPayload } from '../../connection-manager';
import { StellarMessageHandler } from '../../stellar-message-handlers/stellar-message-handler';
import { mock } from 'jest-mock-extended';
import { P } from 'pino';
import { CrawlState } from '../../crawl-state';
import { createDummyExternalizeMessage } from '../../__fixtures__/createDummyExternalizeMessage';
import { err, ok } from 'neverthrow';

describe('OnDataHandler', () => {
const connectionManager = mock<ConnectionManager>();
const stellarMessageHandler = mock<StellarMessageHandler>();
const logger = mock<P.Logger>();

beforeEach(() => {
jest.clearAllMocks();
});

it('should handle data', () => {
const onDataHandler = new OnDataHandler(
connectionManager,
stellarMessageHandler,
logger
);
const data: DataPayload = {
publicKey: 'publicKey',
stellarMessageWork: {
stellarMessage: createDummyExternalizeMessage(),
done: jest.fn()
},
address: 'address'
};
const crawlState = mock<CrawlState>();

stellarMessageHandler.handleStellarMessage.mockReturnValue(ok(undefined));
onDataHandler.onData(data, crawlState);

expect(stellarMessageHandler.handleStellarMessage).toHaveBeenCalledWith(
data.publicKey,
data.stellarMessageWork.stellarMessage,
crawlState
);
expect(data.stellarMessageWork.done).toHaveBeenCalled();
});

it('should handle data error', () => {
const onDataHandler = new OnDataHandler(
connectionManager,
stellarMessageHandler,
logger
);
const data: DataPayload = {
publicKey: 'publicKey',
stellarMessageWork: {
stellarMessage: createDummyExternalizeMessage(),
done: jest.fn()
},
address: 'address'
};
const crawlState = mock<CrawlState>();

stellarMessageHandler.handleStellarMessage.mockReturnValue(
err(new Error('error'))
);
onDataHandler.onData(data, crawlState);

expect(stellarMessageHandler.handleStellarMessage).toHaveBeenCalledWith(
data.publicKey,
data.stellarMessageWork.stellarMessage,
crawlState
);
expect(data.stellarMessageWork.done).toHaveBeenCalled();
expect(logger.info).toHaveBeenCalledWith({ peer: data.publicKey }, 'error');
expect(connectionManager.disconnectByAddress).toHaveBeenCalledWith(
data.address,
new Error('error')
);
});
});
Loading

0 comments on commit 8c8497c

Please sign in to comment.