Skip to content

Commit

Permalink
concurrent processing of get blocks. batches by 50 default (#181)
Browse files Browse the repository at this point in the history
* concurrent processing of get blocks. batches by 50 default

* fix error

* lock on error value

* leo comments

* add lock

* block info

* Update cmd/monitor/monitor.go

leo changes

Co-authored-by: Léo Vincent <[email protected]>

* final changes to monitor

* fix: #181 (review)

---------

Co-authored-by: Léo Vincent <[email protected]>
Co-authored-by: John Hilliard <[email protected]>
  • Loading branch information
3 people authored Jan 10, 2024
1 parent b69dae7 commit 822d658
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 46 deletions.
106 changes: 84 additions & 22 deletions cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,34 @@ import (
var errBatchRequestsNotSupported = errors.New("batch requests are not supported")

var (
windowSize int
batchSize SafeBatchSize
interval time.Duration
one = big.NewInt(1)
zero = big.NewInt(0)
// windowSize determines the number of blocks to display in the monitor UI at one time.
windowSize int

// batchSize holds the number of blocks to fetch in one batch.
// It can be adjusted dynamically based on network conditions.
batchSize SafeBatchSize

// interval specifies the time duration to wait between each update cycle.
interval time.Duration

// one and zero are big.Int representations of 1 and 0, used for convenience in calculations.
one = big.NewInt(1)
zero = big.NewInt(0)

// observedPendingTxs holds a historical record of the number of pending transactions.
observedPendingTxs historicalRange
maxDataPoints = 1000

// maxDataPoints defines the maximum number of data points to keep in historical records.
maxDataPoints = 1000

// maxConcurrency defines the maximum number of goroutines that can fetch block data concurrently.
maxConcurrency = 10

// semaphore is a channel used to control the concurrency of block data fetch operations.
semaphore = make(chan struct{}, maxConcurrency)

// size of the sub batches to divide and conquer the total batch size with
subBatchSize = 50
)

type (
Expand Down Expand Up @@ -277,26 +298,64 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et
return nil
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 3 * time.Minute
retryable := func() error {
return rpc.BatchCallContext(ctx, blms)
}
if err := backoff.Retry(retryable, b); err != nil {
err := ms.processBatchesConcurrently(ctx, rpc, blms)
if err != nil {
log.Error().Err(err).Msg("Error processing batches concurrently")
return err
}

ms.BlocksLock.Lock()
defer ms.BlocksLock.Unlock()
for _, b := range blms {
if b.Error != nil {
continue
}
pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse))
ms.BlockCache.Add(pb.Number().String(), pb)
return nil
}

func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
var wg sync.WaitGroup
var errs []error = make([]error, 0)
var errorsMutex sync.Mutex

for i := 0; i < len(blms); i += subBatchSize {
semaphore <- struct{}{}
wg.Add(1)
go func(i int) {
defer func() {
<-semaphore
wg.Done()
}()
end := i + subBatchSize
if end > len(blms) {
end = len(blms)
}
subBatch := blms[i:end]

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 3 * time.Minute
retryable := func() error {
return rpc.BatchCallContext(ctx, subBatch)
}
if err := backoff.Retry(retryable, b); err != nil {
log.Error().Err(err).Msg("unable to retry")
errorsMutex.Lock()
errs = append(errs, err)
errorsMutex.Unlock()
return
}

for _, elem := range subBatch {
if elem.Error != nil {
log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element")
} else {
pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse))
ms.BlocksLock.Lock()
ms.BlockCache.Add(pb.Number().String(), pb)
ms.BlocksLock.Unlock()
}
}

}(i)
}

return nil
wg.Wait()

return errors.Join(errs...)
}

func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error {
Expand All @@ -315,6 +374,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
grid.SetRect(0, 0, termWidth, termHeight)
blockGrid.SetRect(0, 0, termWidth, termHeight)
transactionGrid.SetRect(0, 0, termWidth, termHeight)
// Initial render needed I assume to avoid the first bad redraw
termui.Render(grid)

var setBlock = false
var renderedBlocks rpctypes.SortableBlocks
Expand Down Expand Up @@ -401,7 +462,8 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
ms.BlocksLock.RUnlock()
renderedBlocks = renderedBlocksTemp

skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks)
log.Warn().Int("skeleton.Current.Inner.Dy()", skeleton.Current.Inner.Dy()).Int("skeleton.Current.Inner.Dx()", skeleton.Current.Inner.Dx()).Msg("the dimension of the current box")
skeleton.Current.Text = ui.GetCurrentBlockInfo(ms.HeadBlock, ms.GasPrice, ms.PeerCount, ms.PendingCount, ms.ChainID, renderedBlocks, skeleton.Current.Inner.Dx(), skeleton.Current.Inner.Dy())
skeleton.TxPerBlockChart.Data = metrics.GetTxsPerBlock(renderedBlocks)
skeleton.GasPriceChart.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks)
skeleton.BlockSizeChart.Data = metrics.GetSizePerBlock(renderedBlocks)
Expand Down
118 changes: 94 additions & 24 deletions cmd/monitor/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,64 @@ type UiSkeleton struct {
Receipts *widgets.List
}

func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock) string {
func GetCurrentBlockInfo(headBlock *big.Int, gasPrice *big.Int, peerCount uint64, pendingCount uint64, chainID *big.Int, blocks []rpctypes.PolyBlock, dx int, dy int) string {
// Return an appropriate message if dy is 0 or less.
if dy <= 0 {
return "Invalid display configuration."
}

height := fmt.Sprintf("Height: %s", headBlock.String())
time := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST"))
timeInfo := fmt.Sprintf("Time: %s", time.Now().Format("02 Jan 06 15:04:05 MST"))
gasPriceString := fmt.Sprintf("Gas Price: %s gwei", new(big.Int).Div(gasPrice, metrics.UnitShannon).String())
peers := fmt.Sprintf("Peers: %d", peerCount)
pendingTx := fmt.Sprintf("Pending Tx: %d", pendingCount)
chainIdString := fmt.Sprintf("Chain ID: %s", chainID.String())
return fmt.Sprintf("%s\n%s\n%s\n%s\n%s\n%s", height, time, gasPriceString, peers, pendingTx, chainIdString)

info := []string{height, timeInfo, gasPriceString, peers, pendingTx, chainIdString}
columns := len(info) / dy
if len(info)%dy != 0 {
columns += 1 // Add an extra column for the remaining items
}

// Calculate the width of each column based on the longest string in each column
columnWidths := make([]int, columns)
for i := 0; i < columns; i++ {
for j := 0; j < dy; j++ {
index := i*dy + j
if index < len(info) && len(info[index]) > columnWidths[i] {
columnWidths[i] = len(info[index])
}
}
// Add padding and ensure it doesn't exceed 'dx'
columnWidths[i] += 5 // Adjust padding as needed
if columnWidths[i] > dx {
columnWidths[i] = dx
}
}

var formattedInfo strings.Builder
for i := 0; i < dy; i++ {
for j := 0; j < columns; j++ {
index := j*dy + i
if index < len(info) {
formatString := fmt.Sprintf("%%-%ds", columnWidths[j])
formattedInfo.WriteString(fmt.Sprintf(formatString, info[index]))
}
}
formattedInfo.WriteString("\n")
}

return formattedInfo.String()
}

func max(nums ...int) int {
maxNum := nums[0]
for _, num := range nums[1:] {
if num > maxNum {
maxNum = num
}
}
return maxNum
}

func GetBlocksList(blocks []rpctypes.PolyBlock) ([]string, string) {
Expand Down Expand Up @@ -127,37 +176,57 @@ func GetSimpleBlockFields(block rpctypes.PolyBlock) []string {
ut := time.Unix(int64(ts), 0)

author := "Mined by"

authorAddress := block.Miner().String()
if authorAddress == "0x0000000000000000000000000000000000000000" {
author = "Signed by"
signer, err := metrics.Ecrecover(&block)
if err == nil {
authorAddress = hex.EncodeToString(signer)
}

}

return []string{
"",
fmt.Sprintf("Block Height: %s", block.Number()),
fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339)),
fmt.Sprintf("Transactions: %d", len(block.Transactions())),
fmt.Sprintf("%s: %s", author, authorAddress),
fmt.Sprintf("Difficulty: %s", block.Difficulty()),
fmt.Sprintf("Size: %d", block.Size()),
fmt.Sprintf("Uncles: %d", len(block.Uncles())),
fmt.Sprintf("Gas used: %d", block.GasUsed()),
fmt.Sprintf("Gas limit: %d", block.GasLimit()),
fmt.Sprintf("Base Fee per gas: %s", block.BaseFee()),
fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra())),
fmt.Sprintf("Hash: %s", block.Hash()),
fmt.Sprintf("Parent Hash: %s", block.ParentHash()),
fmt.Sprintf("Uncle Hash: %s", block.UncleHash()),
fmt.Sprintf("State Root: %s", block.Root()),
fmt.Sprintf("Tx Hash: %s", block.TxHash()),
fmt.Sprintf("Nonce: %d", block.Nonce()),
blockHeight := fmt.Sprintf("Block Height: %s", block.Number())
timestamp := fmt.Sprintf("Timestamp: %d (%s)", ts, ut.Format(time.RFC3339))
transactions := fmt.Sprintf("Transactions: %d", len(block.Transactions()))
authorInfo := fmt.Sprintf("%s: %s", author, authorAddress)
difficulty := fmt.Sprintf("Difficulty: %s", block.Difficulty())
size := fmt.Sprintf("Size: %d", block.Size())
uncles := fmt.Sprintf("Uncles: %d", len(block.Uncles()))
gasUsed := fmt.Sprintf("Gas used: %d", block.GasUsed())
gasLimit := fmt.Sprintf("Gas limit: %d", block.GasLimit())
baseFee := fmt.Sprintf("Base Fee per gas: %s", block.BaseFee())
extraData := fmt.Sprintf("Extra data: %s", metrics.RawDataToASCII(block.Extra()))
hash := fmt.Sprintf("Hash: %s", block.Hash())
parentHash := fmt.Sprintf("Parent Hash: %s", block.ParentHash())
uncleHash := fmt.Sprintf("Uncle Hash: %s", block.UncleHash())
stateRoot := fmt.Sprintf("State Root: %s", block.Root())
txHash := fmt.Sprintf("Tx Hash: %s", block.TxHash())
nonce := fmt.Sprintf("Nonce: %d", block.Nonce())

maxWidthCol1 := max(len(blockHeight), len(transactions), len(difficulty), len(size), len(gasUsed), len(baseFee), len(hash), len(stateRoot))

blockHeight = fmt.Sprintf("%-*s", maxWidthCol1, blockHeight)
transactions = fmt.Sprintf("%-*s", maxWidthCol1, transactions)
difficulty = fmt.Sprintf("%-*s", maxWidthCol1, difficulty)
size = fmt.Sprintf("%-*s", maxWidthCol1, size)
gasUsed = fmt.Sprintf("%-*s", maxWidthCol1, gasUsed)
baseFee = fmt.Sprintf("%-*s", maxWidthCol1, baseFee)
hash = fmt.Sprintf("%-*s", maxWidthCol1, hash)
stateRoot = fmt.Sprintf("%-*s", maxWidthCol1, stateRoot)

lines := []string{
fmt.Sprintf("%s %s", blockHeight, timestamp),
fmt.Sprintf("%s %s", transactions, authorInfo),
fmt.Sprintf("%s %s", difficulty, uncles),
fmt.Sprintf("%s %s", size, gasLimit),
fmt.Sprintf("%s %s", gasUsed, extraData),
fmt.Sprintf("%s %s", baseFee, parentHash),
fmt.Sprintf("%s %s", hash, uncleHash),
fmt.Sprintf("%s %s", stateRoot, txHash),
nonce,
}

return lines
}

func GetBlockTxTable(block rpctypes.PolyBlock, chainID *big.Int) [][]string {
Expand Down Expand Up @@ -323,6 +392,7 @@ func SetUISkeleton() (blockList *widgets.List, blockInfo *widgets.List, transact
blockInfo = widgets.NewList()
blockInfo.TextStyle = ui.NewStyle(ui.ColorWhite)
blockInfo.Title = "Block Information"
blockInfo.WrapText = true

transactionInfo = widgets.NewTable()
transactionInfo.TextStyle = ui.NewStyle(ui.ColorWhite)
Expand Down

0 comments on commit 822d658

Please sign in to comment.