From 7ff025737d14a173ec4835fc61c4c01ef62c8ee1 Mon Sep 17 00:00:00 2001 From: Baha Date: Mon, 27 Dec 2021 09:49:32 +0000 Subject: [PATCH] fix: chunking logic refactored and extracted to utils, chainHeight task refactored to process chunks --- src/config/config.json | 1 + src/config/index.ts | 5 +++ src/services/ChainHeightMonitor.ts | 50 +++++++++++++++--------------- src/services/NodeMonitor.ts | 43 ++++++------------------- src/utils.ts | 32 +++++++++++++++++++ 5 files changed, 73 insertions(+), 58 deletions(-) diff --git a/src/config/config.json b/src/config/config.json index be448ae..06cf14f 100644 --- a/src/config/config.json +++ b/src/config/config.json @@ -18,6 +18,7 @@ "REQUEST_TIMEOUT": 5000, "NUMBER_OF_NODE_REQUEST_CHUNK": 10, "NODE_PEERS_REQUEST_CHUNK_SIZE": 50, + "CHAIN_HEIGHT_REQUEST_CHUNK_SIZE": 10, "PREFERRED_NODES": ["*.symboldev.network"], "MIN_PARTNER_NODE_VERSION": 16777728 } diff --git a/src/config/index.ts b/src/config/index.ts index 233456e..013dd4d 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -19,6 +19,7 @@ interface Monitor { NODE_MONITOR_SCHEDULE_INTERVAL: number; NUMBER_OF_NODE_REQUEST_CHUNK: number; NODE_PEERS_REQUEST_CHUNK_SIZE: number; + CHAIN_HEIGHT_REQUEST_CHUNK_SIZE: number; CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL: number; GEOLOCATION_MONITOR_SCHEDULE_INTERVAL: number; API_NODE_PORT: number; @@ -51,6 +52,7 @@ export const monitor: Monitor = { NODE_MONITOR_SCHEDULE_INTERVAL: Number(process.env.NODE_MONITOR_SCHEDULE_INTERVAL) || config.NODE_MONITOR_SCHEDULE_INTERVAL, NUMBER_OF_NODE_REQUEST_CHUNK: Number(process.env.NUMBER_OF_NODE_REQUEST_CHUNK) || config.NUMBER_OF_NODE_REQUEST_CHUNK, NODE_PEERS_REQUEST_CHUNK_SIZE: Number(process.env.NODE_PEERS_REQUEST_CHUNK_SIZE) || config.NODE_PEERS_REQUEST_CHUNK_SIZE, + CHAIN_HEIGHT_REQUEST_CHUNK_SIZE: Number(process.env.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE) || config.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE, CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL: Number(process.env.CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL) || config.CHAIN_HEIGHT_MONITOR_SCHEDULE_INTERVAL, GEOLOCATION_MONITOR_SCHEDULE_INTERVAL: @@ -90,6 +92,9 @@ export const verifyConfig = (cfg: Config): boolean => { if (isNaN(cfg.monitor.NODE_PEERS_REQUEST_CHUNK_SIZE) || cfg.monitor.NODE_PEERS_REQUEST_CHUNK_SIZE < 0) error = 'Invalid "NODE_PEERS_REQUEST_CHUNK_SIZE"'; + if (isNaN(cfg.monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE) || cfg.monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE < 0) + error = 'Invalid "CHAIN_HEIGHT_REQUEST_CHUNK_SIZE"'; + if (isNaN(cfg.monitor.API_NODE_PORT) || cfg.monitor.API_NODE_PORT <= 0 || cfg.monitor.API_NODE_PORT >= 10000) error = 'Invalid "API_NODE_PORT"'; diff --git a/src/services/ChainHeightMonitor.ts b/src/services/ChainHeightMonitor.ts index 57ac996..19ba5db 100644 --- a/src/services/ChainHeightMonitor.ts +++ b/src/services/ChainHeightMonitor.ts @@ -5,8 +5,8 @@ import { Logger } from '@src/infrastructure'; import { INode } from '@src/models/Node'; import { INodeHeightStats } from '@src/models/NodeHeightStats'; -import { symbol } from '@src/config'; -import { isAPIRole, basename, sleep } from '@src/utils'; +import { symbol, monitor } from '@src/config'; +import { isAPIRole, basename, sleep, runTaskInChunks } from '@src/utils'; const logger: winston.Logger = Logger.getLogger(basename(__filename)); @@ -80,32 +80,32 @@ export class ChainHeightMonitor { private getNodeChainHeight = async () => { logger.info(`Getting height stats for ${this.nodeList.length} nodes`); - const nodes: INode[] = this.nodeList; - const nodeChainInfoPromises = nodes.map((node) => { - const isHttps = node.apiStatus?.isHttpsEnabled; - const protocol = isHttps ? 'https:' : 'http:'; - const port = isHttps ? 3001 : 3000; - const hostUrl = `${protocol}//${node.host}:${port}`; - - return ApiNodeService.getNodeChainInfo(hostUrl); - }); - const nodeChainInfoList = await Promise.all(nodeChainInfoPromises); - - for (let chainInfo of nodeChainInfoList) { - try { - if (chainInfo) { - if (this.heights[chainInfo.height]) this.heights[chainInfo.height]++; - else this.heights[chainInfo.height] = 1; - - if (this.finalizedHeights[chainInfo.latestFinalizedBlock.height]) - this.finalizedHeights[chainInfo.latestFinalizedBlock.height]++; - else this.finalizedHeights[chainInfo.latestFinalizedBlock.height] = 1; + await runTaskInChunks(this.nodeList, monitor.CHAIN_HEIGHT_REQUEST_CHUNK_SIZE, logger, 'getNodeChainHeight', async (nodes) => { + const nodeChainInfoPromises = nodes.map((node) => { + const isHttps = node.apiStatus?.isHttpsEnabled; + const protocol = isHttps ? 'https:' : 'http:'; + const port = isHttps ? 3001 : 3000; + + const hostUrl = `${protocol}//${node.host}:${port}`; + + return ApiNodeService.getNodeChainInfo(hostUrl); + }); + const nodeChainInfoList = await Promise.all(nodeChainInfoPromises); + + for (let chainInfo of nodeChainInfoList) { + try { + if (chainInfo) { + this.heights[chainInfo.height] = (this.heights[chainInfo.height] || 0) + 1; + this.finalizedHeights[chainInfo.latestFinalizedBlock.height] = + (this.finalizedHeights[chainInfo.latestFinalizedBlock.height] || 0) + 1; + } + } catch (e) { + logger.error(`Node chain height monitor failed. ${e.message}`); } - } catch (e) { - logger.error(`Node chain height monitor failed. ${e.message}`); } - } + return nodeChainInfoList; + }); }; private clear = () => { diff --git a/src/services/NodeMonitor.ts b/src/services/NodeMonitor.ts index e8cb3f8..3858e64 100644 --- a/src/services/NodeMonitor.ts +++ b/src/services/NodeMonitor.ts @@ -13,7 +13,7 @@ import { Logger } from '@src/infrastructure'; import { INode, validateNodeModel } from '@src/models/Node'; import { symbol, monitor } from '@src/config'; -import { isAPIRole, isPeerRole, basename, splitArray, showDuration } from '@src/utils'; +import { isAPIRole, isPeerRole, basename, showDuration, runTaskInChunks } from '@src/utils'; const logger: winston.Logger = Logger.getLogger(basename(__filename)); @@ -139,25 +139,16 @@ export class NodeMonitor { logger.info( `[fetchAndAddNodeListPeers] Getting peers from nodes, total nodes: ${this.nodeList.length}, api nodes: ${apiNodeList.length}`, ); - const nodeListChunks = splitArray(apiNodeList, this.nodePeersChunkSize); - let numOfNodesProcessed = 0; - - for (const nodes of nodeListChunks) { - logger.info( - `[fetchAndAddNodeListPeers] Getting peer list for chunk of ${nodes.length} nodes, progress: ${ - numOfNodesProcessed + '/' + apiNodeList.length - }`, - ); - const nodePeersPromises = [...nodes].map(async (node) => - this.fetchNodePeersByURL(await ApiNodeService.buildHostUrl(node.host)), + await runTaskInChunks(this.nodeList, this.nodePeersChunkSize, logger, 'fetchAndAddNodeListPeers', async (nodes) => { + const arrayOfPeerList = await Promise.all( + [...nodes].map(async (node) => this.fetchNodePeersByURL(await ApiNodeService.buildHostUrl(node.host))), ); - const arrayOfPeerList = await Promise.all(nodePeersPromises); const peers: INode[] = arrayOfPeerList.reduce((accumulator, value) => accumulator.concat(value), []); this.addNodesToList(peers); - numOfNodesProcessed += nodes.length; - } + return peers; + }); }; private getNodeListInfo = async () => { @@ -165,27 +156,15 @@ export class NodeMonitor { const nodeCount = this.nodeList.length; logger.info(`[getNodeListInfo] Getting node from peers, total nodes: ${nodeCount}`); - const nodeListChunks = splitArray(this.nodeList, this.nodeInfoChunks); - - this.nodeList = []; - - let numOfNodesProcessed = 0; - for (const nodes of nodeListChunks) { - logger.info( - `[getNodeListInfo] Getting node info for chunk of ${nodes.length} nodes, progress: ${ - numOfNodesProcessed + '/' + nodeCount - }`, - ); + await runTaskInChunks(this.nodeList, this.nodeInfoChunks, logger, 'getNodeListInfo', async (nodes) => { const nodeInfoPromises = [...nodes].map((node) => this.getNodeInfo(node)); const arrayOfNodeInfo = await Promise.all(nodeInfoPromises); - logger.info(`[getNodeListInfo] Number of nodeInfo:${arrayOfNodeInfo.length} in the chunk of ${nodes.length}`); this.addNodesToList(arrayOfNodeInfo); - numOfNodesProcessed += nodes.length; + return arrayOfNodeInfo; + }); - //await sleep(this.nodeInfoDelay); - } this.nodeList.forEach((node) => this.nodesStats.addToStats(node)); logger.info( `[getNodeListInfo] Total node count(after nodeInfo): ${this.nodeList.length}, time elapsed: [${showDuration( @@ -233,9 +212,7 @@ export class NodeMonitor { } catch (e) { logger.error(`[getNodeInfo] Failed to fetch info for "${nodeWithInfo.host}". ${e.message}`); } - logger.info( - `[getNodeInfo] NodeHost values before:${nodeHost} and after:${nodeWithInfo.host} and hostDetail.host:${nodeWithInfo.hostDetail?.host}`, - ); + return nodeWithInfo; } diff --git a/src/utils.ts b/src/utils.ts index 63ec6c9..c8a0eb8 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,6 +1,7 @@ import { INode } from '@src/models/Node'; import * as path from 'path'; import * as humanizeDuration from 'humanize-duration'; +import winston = require('winston'); export const stringToArray = (str: string | undefined): Array => { let result = null; @@ -63,3 +64,34 @@ export const splitArray = (array: Array, chunks: number): Array => export const showDuration = (durationMs: number): string => { return humanizeDuration(durationMs); }; + +export const runTaskInChunks = async ( + list: T[], + chunkSize: number, + logger: winston.Logger, + loggingMethod: string, + asyncTask: (subList: T[]) => Promise, +) => { + const chunks: T[][] = splitArray(list, chunkSize); + + logger.info( + `[${loggingMethod}] Running the task for chunks, Total Size: ${list.length}, Chunk size: ${chunkSize}, Chunk count: ${Math.ceil( + list.length / chunkSize, + )}`, + ); + + let numOfNodesProcessed = 0, + i = 0; + + for (const chunk of chunks) { + logger.info( + `[${loggingMethod}] Working on chunk #${++i}/${chunks.length}, size: ${chunk.length}, progress: ${numOfNodesProcessed}/${ + list.length + }`, + ); + const arrayOfTaskResults = await asyncTask(chunk); + + logger.info(`[${loggingMethod}] Number of results:${arrayOfTaskResults.length} in the chunk of ${chunk.length}`); + numOfNodesProcessed += chunk.length; + } +};