From 822d658030c5be8cced21538d8e14468d4ac2f65 Mon Sep 17 00:00:00 2001
From: Jesse Lee
Date: Wed, 10 Jan 2024 10:26:57 -0500
Subject: [PATCH] concurrent processing of get blocks. batches by 50 default (#181)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* concurrent processing of get blocks. batches by 50 default

* fix error

* lock on error value

* leo comments

* add lock

* block info

* Update cmd/monitor/monitor.go

leo changes

Co-authored-by: Léo Vincent <28714795+leovct@users.noreply.github.com>

* final changes to monitor

* fix: https://github.com/maticnetwork/polygon-cli/pull/181#pullrequestreview-1808576970

---------

Co-authored-by: Léo Vincent <28714795+leovct@users.noreply.github.com>
Co-authored-by: John Hilliard
---
 cmd/monitor/monitor.go | 106 ++++++++++++++++++++++++++++--------
 cmd/monitor/ui/ui.go   | 118 ++++++++++++++++++++++++++++++++---------
 2 files changed, 178 insertions(+), 46 deletions(-)

diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go
index 942687cb..3b8c700a 100644
--- a/cmd/monitor/monitor.go
+++ b/cmd/monitor/monitor.go
@@ -27,13 +27,34 @@ import (
 var errBatchRequestsNotSupported = errors.New("batch requests are not supported")
 
 var (
-	windowSize int
-	batchSize SafeBatchSize
-	interval time.Duration
-	one = big.NewInt(1)
-	zero = big.NewInt(0)
+	// windowSize determines the number of blocks to display in the monitor UI at one time.
+	windowSize int
+
+	// batchSize holds the number of blocks to fetch in one batch.
+	// It can be adjusted dynamically based on network conditions.
+	batchSize SafeBatchSize
+
+	// interval specifies the time duration to wait between each update cycle.
+	interval time.Duration
+
+	// one and zero are big.Int representations of 1 and 0, used for convenience in calculations.
+	one = big.NewInt(1)
+	zero = big.NewInt(0)
+
+	// observedPendingTxs holds a historical record of the number of pending transactions.
 	observedPendingTxs historicalRange
-	maxDataPoints = 1000
+
+	// maxDataPoints defines the maximum number of data points to keep in historical records.
+	maxDataPoints = 1000
+
+	// maxConcurrency defines the maximum number of goroutines that can fetch block data concurrently.
+	maxConcurrency = 10
+
+	// semaphore is a channel used to control the concurrency of block data fetch operations.
+	semaphore = make(chan struct{}, maxConcurrency)
+
+	// size of the sub batches to divide and conquer the total batch size with
+	subBatchSize = 50
 )
 
 type (
@@ -277,26 +298,64 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et
 		return nil
 	}
 
-	b := backoff.NewExponentialBackOff()
-	b.MaxElapsedTime = 3 * time.Minute
-	retryable := func() error {
-		return rpc.BatchCallContext(ctx, blms)
-	}
-	if err := backoff.Retry(retryable, b); err != nil {
+	err := ms.processBatchesConcurrently(ctx, rpc, blms)
+	if err != nil {
+		log.Error().Err(err).Msg("Error processing batches concurrently")
 		return err
 	}
 
-	ms.BlocksLock.Lock()
-	defer ms.BlocksLock.Unlock()
-	for _, b := range blms {
-		if b.Error != nil {
-			continue
-		}
-		pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse))
-		ms.BlockCache.Add(pb.Number().String(), pb)
+	return nil
+}
+
+func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
+	var wg sync.WaitGroup
+	var errs []error = make([]error, 0)
+	var errorsMutex sync.Mutex
+
+	for i := 0; i < len(blms); i += subBatchSize {
+		semaphore <- struct{}{}
+		wg.Add(1)
+		go func(i int) {
+			defer func() {
+				<-semaphore
+				wg.Done()
+			}()
+			end := i + subBatchSize
+			if end > len(blms) {
+				end = len(blms)
+			}
+			subBatch := blms[i:end]
+
+			b := backoff.NewExponentialBackOff()
+			b.MaxElapsedTime = 3 * time.Minute
+			retryable := func() error {
+				return rpc.BatchCallContext(ctx, subBatch)
+			}
+			if err := backoff.Retry(retryable, b); err != nil {
+				log.Error().Err(err).Msg("unable to retry")
+				errorsMutex.Lock()
+				errs = append(errs, err)
+				errorsMutex.Unlock()
+				return
+			}
+
+			for _, elem := range subBatch {
+				if elem.Error != nil {
+					log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element")
+				} else {
+					pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse))
+					ms.BlocksLock.Lock()
+					ms.BlockCache.Add(pb.Number().String(), pb)
+					ms.BlocksLock.Unlock()
+				}
+			}
+
+		}(i)
 	}
-	return nil
+	wg.Wait()
+
+	return errors.Join(errs...)
 }
 
 func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error {
@@ -315,6 +374,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
 	grid.SetRect(0, 0, termWidth, termHeight)
 	blockGrid.SetRect(0, 0, termWidth, termHeight)
 	transactionGrid.SetRect(0, 0, termWidth, termHeight)
+	// Initial render needed I assume to avoid the first bad redraw
+	termui.Render(grid)
 
 	var setBlock = false
 	var renderedBlocks rpctypes.SortableBlocks
@@ -401,7 +462,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
 		ms.BlocksLock.RUnlock()
 		renderedBlocks = renderedBlocksTemp
 
-		skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks)
+		log.Warn().Int("skeleton.Current.Inner.Dy()", skeleton.Current.Inner.Dy()).Int("skeleton.Current.Inner.Dx()", skeleton.Current.Inner.Dx()).Msg("the dimension of the current box")
+		skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks, skeleton.Current.Inner.Dx(), skeleton.Current.Inner.Dy())
 		skeleton.TxPerBlockChart.Data = metrics.GetTxsPerBlock(renderedBlocks)
 		skeleton.GasPriceChart.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks)
 		skeleton.BlockSizeChart.Data = metrics.GetSizePerBlock(renderedBlocks)
diff --git a/cmd/monitor/ui/ui.go b/cmd/monitor/ui/ui.go
index 499e5381..cbab0c6a 100644
--- a/cmd/monitor/ui/ui.go
+++ b/cmd/monitor/ui/ui.go
@@ -31,15 +31,64 @@ type UiSkeleton struct {
 	Receipts *widgets.List
 }
 
-func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock) string {
+func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock, dx int, dy int) string {
+	// Return an appropriate message if dy is 0 or less.
+	if dy <= 0 {
+		return "Invalid display configuration."
+	}
+
 	height := fmt.Sprintf("Height: %s", headBlock.String())
-	time := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST"))
+	timeInfo := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST"))
 	gasPriceString := fmt.Sprintf("Gas Price: %s gwei", new(big.Int).Div(gasPrice, metrics.UnitShannon).String())
 	peers := fmt.Sprintf("Peers: %d", peerCount)
 	pendingTx := fmt.Sprintf("Pending Tx: %d", pendingCount)
 	chainIdString := fmt.Sprintf("Chain ID: %s", chainID.String())
 
-	return fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", height, time, gasPriceString, peers, pendingTx, chainIdString)
+	info := []string{height, timeInfo, gasPriceString, peers, pendingTx, chainIdString}
+	columns := len(info) / dy
+	if len(info)%dy != 0 {
+		columns += 1 // Add an extra column for the remaining items
+	}
+
+	// Calculate the width of each column based on the longest string in each column
+	columnWidths := make([]int, columns)
+	for i := 0; i < columns; i++ {
+		for j := 0; j < dy; j++ {
+			index := i*dy + j
+			if index < len(info) && len(info[index]) > columnWidths[i] {
+				columnWidths[i] = len(info[index])
+			}
+		}
+		// Add padding and ensure it doesn't exceed 'dx'
+		columnWidths[i] += 5 // Adjust padding as needed
+		if columnWidths[i] > dx {
+			columnWidths[i] = dx
+		}
+	}
+
+	var formattedInfo strings.Builder
+	for i := 0; i < dy; i++ {
+		for j := 0; j < columns; j++ {
+			index := j*dy + i
+			if index < len(info) {
+				formatString := fmt.Sprintf("%%-%ds", columnWidths[j])
+				formattedInfo.WriteString(fmt.Sprintf(formatString, info[index]))
+			}
+		}
+		formattedInfo.WriteString("\n")
+	}
+
+	return formattedInfo.String()
+}
+
+func max(nums ...int) int {
+	maxNum := nums[0]
+	for _, num := range nums[1:] {
+		if num > maxNum {
+			maxNum = num
+		}
+	}
+	return maxNum
 }
 
 func GetBlocksList(blocks []rpctypes.PolyBlock) ([]string, string) {
@@ -127,7 +176,6 @@ func GetSimpleBlockFields(block rpctypes.PolyBlock) []string {
 	ut := time.Unix(int64(ts), 0)
 
 	author := "Mined by"
-
 	authorAddress := block.Miner().String()
 	if authorAddress == "0x0000000000000000000000000000000000000000" {
 		author = "Signed by"
@@ -135,29 +183,50 @@ func GetSimpleBlockFields(block rpctypes.PolyBlock) []string {
 		if err == nil {
 			authorAddress = hex.EncodeToString(signer)
 		}
-	}
-	return []string{
-		"",
-		fmt.Sprintf("Block Height: %s", block.Number()),
-		fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339)),
-		fmt.Sprintf("Transactions: %d", len(block.Transactions())),
-		fmt.Sprintf("%s: %s", author, authorAddress),
-		fmt.Sprintf("Difficulty: %s", block.Difficulty()),
-		fmt.Sprintf("Size: %d", block.Size()),
-		fmt.Sprintf("Uncles: %d", len(block.Uncles())),
-		fmt.Sprintf("Gas used: %d", block.GasUsed()),
-		fmt.Sprintf("Gas limit: %d", block.GasLimit()),
-		fmt.Sprintf("Base Fee per gas: %s", block.BaseFee()),
-		fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra())),
-		fmt.Sprintf("Hash: %s", block.Hash()),
-		fmt.Sprintf("Parent Hash: %s", block.ParentHash()),
-		fmt.Sprintf("Uncle Hash: %s", block.UncleHash()),
-		fmt.Sprintf("State Root: %s", block.Root()),
-		fmt.Sprintf("Tx Hash: %s", block.TxHash()),
-		fmt.Sprintf("Nonce: %d", block.Nonce()),
+	blockHeight := fmt.Sprintf("Block Height: %s", block.Number())
+	timestamp := fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339))
+	transactions := fmt.Sprintf("Transactions: %d", len(block.Transactions()))
+	authorInfo := fmt.Sprintf("%s: %s", author, authorAddress)
+	difficulty := fmt.Sprintf("Difficulty: %s", block.Difficulty())
+	size := fmt.Sprintf("Size: %d", block.Size())
+	uncles := fmt.Sprintf("Uncles: %d", len(block.Uncles()))
+	gasUsed := fmt.Sprintf("Gas used: %d", block.GasUsed())
+	gasLimit := fmt.Sprintf("Gas limit: %d", block.GasLimit())
+	baseFee := fmt.Sprintf("Base Fee per gas: %s", block.BaseFee())
+	extraData := fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra()))
+	hash := fmt.Sprintf("Hash: %s", block.Hash())
+	parentHash := fmt.Sprintf("Parent Hash: %s", block.ParentHash())
+	uncleHash := fmt.Sprintf("Uncle Hash: %s", block.UncleHash())
+	stateRoot := fmt.Sprintf("State Root: %s", block.Root())
+	txHash := fmt.Sprintf("Tx Hash: %s", block.TxHash())
+	nonce := fmt.Sprintf("Nonce: %d", block.Nonce())
+
+	maxWidthCol1 := max(len(blockHeight), len(transactions), len(difficulty), len(size), len(gasUsed), len(baseFee), len(hash), len(stateRoot))
+
+	blockHeight = fmt.Sprintf("%-*s", maxWidthCol1, blockHeight)
+	transactions = fmt.Sprintf("%-*s", maxWidthCol1, transactions)
+	difficulty = fmt.Sprintf("%-*s", maxWidthCol1, difficulty)
+	size = fmt.Sprintf("%-*s", maxWidthCol1, size)
+	gasUsed = fmt.Sprintf("%-*s", maxWidthCol1, gasUsed)
+	baseFee = fmt.Sprintf("%-*s", maxWidthCol1, baseFee)
+	hash = fmt.Sprintf("%-*s", maxWidthCol1, hash)
+	stateRoot = fmt.Sprintf("%-*s", maxWidthCol1, stateRoot)
+
+	lines := []string{
+		fmt.Sprintf("%s %s", blockHeight, timestamp),
+		fmt.Sprintf("%s %s", transactions, authorInfo),
+		fmt.Sprintf("%s %s", difficulty, uncles),
+		fmt.Sprintf("%s %s", size, gasLimit),
+		fmt.Sprintf("%s %s", gasUsed, extraData),
+		fmt.Sprintf("%s %s", baseFee, parentHash),
+		fmt.Sprintf("%s %s", hash, uncleHash),
+		fmt.Sprintf("%s %s", stateRoot, txHash),
+		nonce,
 	}
+
+	return lines
 }
 
 func GetBlockTxTable(block rpctypes.PolyBlock, chainID *big.Int) [][]string {
@@ -323,6 +392,7 @@ func SetUISkeleton() (blockList *widgets.List, blockInfo *widgets.List, transact
 	blockInfo = widgets.NewList()
 	blockInfo.TextStyle = ui.NewStyle(ui.ColorWhite)
 	blockInfo.Title = "Block Information"
+	blockInfo.WrapText = true
 
 	transactionInfo = widgets.NewTable()
 	transactionInfo.TextStyle = ui.NewStyle(ui.ColorWhite)
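
For reference, a minimal standalone sketch of the bounded-concurrency sub-batching pattern that processBatchesConcurrently uses above. The item type and the processSubBatch helper here are hypothetical stand-ins for the real batched RPC call with backoff retries; only the semaphore, WaitGroup, and error-joining structure mirrors the patch.

package main

import (
	"errors"
	"fmt"
	"sync"
)

const (
	subBatchSize   = 50 // items per sub-batch, mirroring the patch default
	maxConcurrency = 10 // upper bound on in-flight workers, mirroring maxConcurrency above
)

// processSubBatch is a hypothetical stand-in for the batched RPC call plus retries.
func processSubBatch(items []int) error {
	if len(items) == 0 {
		return errors.New("empty sub-batch")
	}
	return nil
}

// processConcurrently splits items into sub-batches and processes them with at most
// maxConcurrency goroutines running at once, collecting any errors under a mutex.
func processConcurrently(items []int) error {
	var (
		wg        sync.WaitGroup
		errsMutex sync.Mutex
		errs      []error
	)
	semaphore := make(chan struct{}, maxConcurrency)

	for i := 0; i < len(items); i += subBatchSize {
		semaphore <- struct{}{} // blocks once maxConcurrency workers are in flight
		wg.Add(1)
		go func(start int) {
			defer func() {
				<-semaphore
				wg.Done()
			}()
			end := start + subBatchSize
			if end > len(items) {
				end = len(items)
			}
			if err := processSubBatch(items[start:end]); err != nil {
				errsMutex.Lock()
				errs = append(errs, err)
				errsMutex.Unlock()
			}
		}(i)
	}

	wg.Wait()
	// errors.Join returns nil when no errors were collected (Go 1.20+).
	return errors.Join(errs...)
}

func main() {
	items := make([]int, 173) // 173 items -> sub-batches of 50, 50, 50, 23
	fmt.Println(processConcurrently(items))
}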