Skip to content

Commit

Permalink
fix: chunking logic refactored and extracted to utils, chainHeight ta…
Browse files Browse the repository at this point in the history
…sk refactored to process chunks
  • Loading branch information
Baha committed Dec 27, 2021
1 parent 0e7f106 commit 7ff0257
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"REQUEST_TIMEOUT": 5000,
"NUMBER_OF_NODE_REQUEST_CHUNK": 10,
"NODE_PEERS_REQUEST_CHUNK_SIZE": 50,
"CHAIN_HEIGHT_REQUEST_CHUNK_SIZE": 10,
"PREFERRED_NODES": ["*.symboldev.network"],
"MIN_PARTNER_NODE_VERSION": 16777728
}
5 changes: 5 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ interface Monitor {
NODE_MONITOR_SCHEDULE_INTERVAL: number;
NUMBER_OF_NODE_REQUEST_CHUNK: number;
NODE_PEERS_REQUEST_CHUNK_SIZE: number;
CHAIN_HEIGHT_REQUEST_CHUNK_SIZE: number;
CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL: number;
GEOLOCATION_MONITOR_SCHEDULE_INTERVAL: number;
API_NODE_PORT: number;
Expand Down Expand Up @@ -51,6 +52,7 @@ export const monitor: Monitor = {
NODE_MONITOR_SCHEDULE_INTERVAL: Number(process.env.NODE_MONITOR_SCHEDULE_INTERVAL) || config.NODE_MONITOR_SCHEDULE_INTERVAL,
NUMBER_OF_NODE_REQUEST_CHUNK: Number(process.env.NUMBER_OF_NODE_REQUEST_CHUNK) || config.NUMBER_OF_NODE_REQUEST_CHUNK,
NODE_PEERS_REQUEST_CHUNK_SIZE: Number(process.env.NODE_PEERS_REQUEST_CHUNK_SIZE) || config.NODE_PEERS_REQUEST_CHUNK_SIZE,
CHAIN_HEIGHT_REQUEST_CHUNK_SIZE: Number(process.env.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE) || config.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE,
CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL:
Number(process.env.CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL) || config.CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL,
GEOLOCATION_MONITOR_SCHEDULE_INTERVAL:
Expand Down Expand Up @@ -90,6 +92,9 @@ export const verifyConfig = (cfg: Config): boolean => {
if (isNaN(cfg.monitor.NODE_PEERS_REQUEST_CHUNK_SIZE) || cfg.monitor.NODE_PEERS_REQUEST_CHUNK_SIZE < 0)
error = 'Invalid "NODE_PEERS_REQUEST_CHUNK_SIZE"';

if (isNaN(cfg.monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE) || cfg.monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE < 0)
error = 'Invalid "CHAIN_HEIGHT_REQUEST_CHUNK_SIZE"';

if (isNaN(cfg.monitor.API_NODE_PORT) || cfg.monitor.API_NODE_PORT <= 0 || cfg.monitor.API_NODE_PORT >= 10000)
error = 'Invalid "API_NODE_PORT"';

Expand Down
50 changes: 25 additions & 25 deletions src/services/ChainHeightMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Logger } from '@src/infrastructure';

import { INode } from '@src/models/Node';
import { INodeHeightStats } from '@src/models/NodeHeightStats';
import { symbol } from '@src/config';
import { isAPIRole, basename, sleep } from '@src/utils';
import { symbol, monitor } from '@src/config';
import { isAPIRole, basename, sleep, runTaskInChunks } from '@src/utils';

const logger: winston.Logger = Logger.getLogger(basename(__filename));

Expand Down Expand Up @@ -80,32 +80,32 @@ export class ChainHeightMonitor {

private getNodeChainHeight = async () => {
logger.info(`Getting height stats for ${this.nodeList.length} nodes`);
const nodes: INode[] = this.nodeList;
const nodeChainInfoPromises = nodes.map((node) => {
const isHttps = node.apiStatus?.isHttpsEnabled;
const protocol = isHttps ? 'https:' : 'http:';
const port = isHttps ? 3001 : 3000;

const hostUrl = `${protocol}//${node.host}:${port}`;

return ApiNodeService.getNodeChainInfo(hostUrl);
});
const nodeChainInfoList = await Promise.all(nodeChainInfoPromises);

for (let chainInfo of nodeChainInfoList) {
try {
if (chainInfo) {
if (this.heights[chainInfo.height]) this.heights[chainInfo.height]++;
else this.heights[chainInfo.height] = 1;

if (this.finalizedHeights[chainInfo.latestFinalizedBlock.height])
this.finalizedHeights[chainInfo.latestFinalizedBlock.height]++;
else this.finalizedHeights[chainInfo.latestFinalizedBlock.height] = 1;
await runTaskInChunks(this.nodeList, monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE, logger, 'getNodeChainHeight', async (nodes) => {
const nodeChainInfoPromises = nodes.map((node) => {
const isHttps = node.apiStatus?.isHttpsEnabled;
const protocol = isHttps ? 'https:' : 'http:';
const port = isHttps ? 3001 : 3000;

const hostUrl = `${protocol}//${node.host}:${port}`;

return ApiNodeService.getNodeChainInfo(hostUrl);
});
const nodeChainInfoList = await Promise.all(nodeChainInfoPromises);

for (let chainInfo of nodeChainInfoList) {
try {
if (chainInfo) {
this.heights[chainInfo.height] = (this.heights[chainInfo.height] || 0) + 1;
this.finalizedHeights[chainInfo.latestFinalizedBlock.height] =
(this.finalizedHeights[chainInfo.latestFinalizedBlock.height] || 0) + 1;
}
} catch (e) {
logger.error(`Node chain height monitor failed. ${e.message}`);
}
} catch (e) {
logger.error(`Node chain height monitor failed. ${e.message}`);
}
}
return nodeChainInfoList;
});
};

private clear = () => {
Expand Down
43 changes: 10 additions & 33 deletions src/services/NodeMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { Logger } from '@src/infrastructure';

import { INode, validateNodeModel } from '@src/models/Node';
import { symbol, monitor } from '@src/config';
import { isAPIRole, isPeerRole, basename, splitArray, showDuration } from '@src/utils';
import { isAPIRole, isPeerRole, basename, showDuration, runTaskInChunks } from '@src/utils';

const logger: winston.Logger = Logger.getLogger(basename(__filename));

Expand Down Expand Up @@ -139,53 +139,32 @@ export class NodeMonitor {
logger.info(
`[fetchAndAddNodeListPeers] Getting peers from nodes, total nodes: ${this.nodeList.length}, api nodes: ${apiNodeList.length}`,
);
const nodeListChunks = splitArray(apiNodeList, this.nodePeersChunkSize);

let numOfNodesProcessed = 0;

for (const nodes of nodeListChunks) {
logger.info(
`[fetchAndAddNodeListPeers] Getting peer list for chunk of ${nodes.length} nodes, progress: ${
numOfNodesProcessed + '/' + apiNodeList.length
}`,
);
const nodePeersPromises = [...nodes].map(async (node) =>
this.fetchNodePeersByURL(await ApiNodeService.buildHostUrl(node.host)),
await runTaskInChunks(this.nodeList, this.nodePeersChunkSize, logger, 'fetchAndAddNodeListPeers', async (nodes) => {
const arrayOfPeerList = await Promise.all(
[...nodes].map(async (node) => this.fetchNodePeersByURL(await ApiNodeService.buildHostUrl(node.host))),
);
const arrayOfPeerList = await Promise.all(nodePeersPromises);
const peers: INode[] = arrayOfPeerList.reduce((accumulator, value) => accumulator.concat(value), []);

this.addNodesToList(peers);
numOfNodesProcessed += nodes.length;
}
return peers;
});
};

private getNodeListInfo = async () => {
const startTime = new Date().getTime();
const nodeCount = this.nodeList.length;

logger.info(`[getNodeListInfo] Getting node from peers, total nodes: ${nodeCount}`);
const nodeListChunks = splitArray(this.nodeList, this.nodeInfoChunks);

this.nodeList = [];

let numOfNodesProcessed = 0;

for (const nodes of nodeListChunks) {
logger.info(
`[getNodeListInfo] Getting node info for chunk of ${nodes.length} nodes, progress: ${
numOfNodesProcessed + '/' + nodeCount
}`,
);
await runTaskInChunks(this.nodeList, this.nodeInfoChunks, logger, 'getNodeListInfo', async (nodes) => {
const nodeInfoPromises = [...nodes].map((node) => this.getNodeInfo(node));
const arrayOfNodeInfo = await Promise.all(nodeInfoPromises);

logger.info(`[getNodeListInfo] Number of nodeInfo:${arrayOfNodeInfo.length} in the chunk of ${nodes.length}`);
this.addNodesToList(arrayOfNodeInfo);
numOfNodesProcessed += nodes.length;
return arrayOfNodeInfo;
});

//await sleep(this.nodeInfoDelay);
}
this.nodeList.forEach((node) => this.nodesStats.addToStats(node));
logger.info(
`[getNodeListInfo] Total node count(after nodeInfo): ${this.nodeList.length}, time elapsed: [${showDuration(
Expand Down Expand Up @@ -233,9 +212,7 @@ export class NodeMonitor {
} catch (e) {
logger.error(`[getNodeInfo] Failed to fetch info for "${nodeWithInfo.host}". ${e.message}`);
}
logger.info(
`[getNodeInfo] NodeHost values before:${nodeHost} and after:${nodeWithInfo.host} and hostDetail.host:${nodeWithInfo.hostDetail?.host}`,
);

return nodeWithInfo;
}

Expand Down
32 changes: 32 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { INode } from '@src/models/Node';
import * as path from 'path';
import * as humanizeDuration from 'humanize-duration';
import winston = require('winston');

export const stringToArray = (str: string | undefined): Array<any> => {
let result = null;
Expand Down Expand Up @@ -63,3 +64,34 @@ export const splitArray = (array: Array<any>, chunks: number): Array<any> =>
export const showDuration = (durationMs: number): string => {
return humanizeDuration(durationMs);
};

export const runTaskInChunks = async <T>(
list: T[],
chunkSize: number,
logger: winston.Logger,
loggingMethod: string,
asyncTask: (subList: T[]) => Promise<any[]>,
) => {
const chunks: T[][] = splitArray(list, chunkSize);

logger.info(
`[${loggingMethod}] Running the task for chunks, Total Size: ${list.length}, Chunk size: ${chunkSize}, Chunk count: ${Math.ceil(
list.length / chunkSize,
)}`,
);

let numOfNodesProcessed = 0,
i = 0;

for (const chunk of chunks) {
logger.info(
`[${loggingMethod}] Working on chunk #${++i}/${chunks.length}, size: ${chunk.length}, progress: ${numOfNodesProcessed}/${
list.length
}`,
);
const arrayOfTaskResults = await asyncTask(chunk);

logger.info(`[${loggingMethod}] Number of results:${arrayOfTaskResults.length} in the chunk of ${chunk.length}`);
numOfNodesProcessed += chunk.length;
}
};

0 comments on commit 7ff0257

Please sign in to comment.