Skip to content

Commit

Permalink
Merge pull request #14 from eco-stake/monitor-validator-uptime
Browse files Browse the repository at this point in the history
Block monitor
  • Loading branch information
tombeynon authored May 21, 2022
2 parents e859c97 + 327b103 commit b23529a
Show file tree
Hide file tree
Showing 15 changed files with 1,008 additions and 312 deletions.
83 changes: 83 additions & 0 deletions chains/blockMonitor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import PQueue from 'p-queue';
import _ from 'lodash'
import { debugLog, timeStamp } from '../utils.js';
import { Client } from 'rpc-websockets'
import { UniqueQueue } from '../uniqueQueue.js'

function BlockMonitor() {
const monitors = {}
const queue = new PQueue({ concurrency: 20, queueClass: UniqueQueue });

async function refreshChains(client, chains) {
timeStamp('Running block update');
[...chains].map((chain) => {
const readMessage = function ({ data: message }) {
message = JSON.parse(message);
if (message.result?.data?.type === 'tendermint/event/NewBlock') {
return setBlock(client, chain, message.result.data.value.block);
}
}

const request = async () => {
const apis = await chain.apis('rpc')
const rpcUrl = apis.bestAddress('rpc')
if (!rpcUrl) return timeStamp(chain.path, 'No API URL')

let monitor = monitors[chain.path]
if (monitor) {
const reconnecting = monitor.reconnect && (monitor.max_reconnects > monitor.current_reconnects)
if((monitor.ready && monitor.socket) || reconnecting){
return
}else{
monitor.socket?.removeEventListener("message", readMessage)
try {
monitor.close()
} catch { }
}
}

debugLog(chain.path, 'Websocket connecting')
let ws = new Client(rpcUrl.replace('http', 'ws') + 'websocket', { reconnect: true, max_reconnects: 3 })
monitors[chain.path] = ws
ws.on('open', function () {
ws.call('subscribe', { query: "tm.event='NewBlock'" })

ws.socket.addEventListener("message", readMessage)
})
};
return queue.add(request, { identifier: chain.path });
});
debugLog('Block update queued')
}

async function setBlock(client, chain, block) {
try {
const height = block.header.height
debugLog(chain.path, 'Caching height', height)
const processed = processBlock(block)
await client.json.set(`blocks:${chain.path}`, '$', processed)
await client.json.set(`blocks:${chain.path}#${height}`, '$', processed)
await client.expire(`blocks:${chain.path}#${height}`, 60 * 60)
} catch (error) {
timeStamp(chain.path, 'Block update failed', error.message)
}
}

function processBlock(block) {
const { height, time } = block.header;
const { signatures } = block.last_commit;
return {
height: parseInt(height),
time,
signatures: signatures.map(signature => {
return signature.validator_address
})
};
}

return {
refreshChains
}
}

export default BlockMonitor
Loading

0 comments on commit b23529a

Please sign in to comment.