From f2d5ff5e043e5fa2ee8de11e5b1a11815fb61525 Mon Sep 17 00:00:00 2001 From: John Hilliard Date: Wed, 9 Aug 2023 18:50:48 -0400 Subject: [PATCH] feat: adding pending tx graph --- cmd/monitor/monitor.go | 103 ++++++++++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 22 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 33be4ab3..f3ad0bd7 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -33,16 +33,22 @@ var ( intervalStr string interval time.Duration - one = big.NewInt(1) - zero = big.NewInt(0) + one = big.NewInt(1) + zero = big.NewInt(0) + selectedBlock rpctypes.PolyBlock + + currentlyFetchingHistoryLock sync.RWMutex + + observedPendingTxs historicalRange ) type ( monitorStatus struct { - ChainID *big.Int - HeadBlock *big.Int - PeerCount uint64 - GasPrice *big.Int + ChainID *big.Int + HeadBlock *big.Int + PeerCount uint64 + GasPrice *big.Int + PendingCount uint Blocks map[string]rpctypes.PolyBlock `json:"-"` BlocksLock sync.RWMutex `json:"-"` @@ -50,12 +56,18 @@ type ( MinBlockRetrieved *big.Int } chainState struct { - HeadBlock uint64 - ChainID *big.Int - PeerCount uint64 - GasPrice *big.Int + HeadBlock uint64 + ChainID *big.Int + PeerCount uint64 + GasPrice *big.Int + PendingCount uint + } + historicalDataPoint struct { + SampleTime time.Time + SampleValue float64 } - uiSkeleton struct { + historicalRange []historicalDataPoint + uiSkeleton struct { h0 *widgets.Paragraph h1 *widgets.Paragraph h2 *widgets.Paragraph @@ -101,11 +113,23 @@ func getChainState(ctx context.Context, ec *ethclient.Client) (*chainState, erro if err != nil { return nil, fmt.Errorf("couldn't estimate gas: %s", err.Error()) } + cs.PendingCount, err = ec.PendingTransactionCount(ctx) + if err != nil { + log.Debug().Err(err).Msg("Unable to get pending transaction count") + cs.PendingCount = 0 + } return cs, nil } +func (h historicalRange) getValues() []float64 { + values := make([]float64, len(h)) + for idx, v := range h { + values[idx] = v.SampleValue + } + return values +} func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) { from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1))) // Prevent getBlockRange from fetching duplicate blocks. @@ -129,11 +153,16 @@ func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Cli } } -func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) { +func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) error { if ms.MinBlockRetrieved == nil { log.Warn().Msg("Nil min block") - return + return fmt.Errorf("the min block is nil") + } + if !currentlyFetchingHistoryLock.TryLock() { + return fmt.Errorf("the function is currently locked") } + defer currentlyFetchingHistoryLock.Unlock() + to := new(big.Int).Sub(ms.MinBlockRetrieved, one) from := new(big.Int).Sub(to, big.NewInt(int64(batchSize-1))) if from.Cmp(zero) < 0 { @@ -149,7 +178,9 @@ func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Clien err := ms.getBlockRange(ctx, from, to, rpc) if err != nil { log.Error().Err(err).Msg("There was an issue fetching the block range") + return err } + return nil } func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) { @@ -160,6 +191,7 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r time.Sleep(interval) return err } + observedPendingTxs = append(observedPendingTxs, historicalDataPoint{SampleTime: time.Now(), SampleValue: float64(cs.PendingCount)}) log.Debug().Uint64("PeerCount", cs.PeerCount).Uint64("ChainID", cs.ChainID.Uint64()).Uint64("HeadBlock", cs.HeadBlock).Uint64("GasPrice", cs.GasPrice.Uint64()).Msg("fetching blocks") @@ -174,13 +206,36 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r ms.ChainID = cs.ChainID ms.PeerCount = cs.PeerCount ms.GasPrice = cs.GasPrice + ms.PendingCount = cs.PendingCount prependLatestBlocks(ctx, ms, rpc) - appendOlderBlocks(ctx, ms, rpc) + if shouldLoadMoreHistory(ctx, ms) { + appendOlderBlocks(ctx, ms, rpc) + } return } +// shouldLoadMoreHistory is meant to decide if we should keep fetching more block history. The idea is that if the user +// hasn't scrolled within a batch size of the minimum of the page, we won't keep loading more history +func shouldLoadMoreHistory(ctx context.Context, ms *monitorStatus) bool { + if ms.MinBlockRetrieved == nil { + return false + } + if selectedBlock == nil { + return false + } + minBlockNumber := ms.MinBlockRetrieved.Int64() + selectedBlockNumber := selectedBlock.Number().Int64() + if minBlockNumber == 0 { + return false + } + if minBlockNumber < selectedBlockNumber-(5*int64(batchSize)) { + return false + } + return true +} + // monitorCmd represents the monitor command var MonitorCmd = &cobra.Command{ Use: "monitor url", @@ -232,6 +287,8 @@ var MonitorCmd = &cobra.Command{ ms.Blocks = make(map[string]rpctypes.PolyBlock, 0) ms.BlocksLock.Unlock() ms.ChainID = big.NewInt(0) + ms.PendingCount = 0 + observedPendingTxs = make(historicalRange, 0) isUiRendered := false errChan := make(chan error) @@ -318,7 +375,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri termUi.h1.Title = "Gas Price" termUi.h2 = widgets.NewParagraph() - termUi.h2.Title = "Current Peers" + termUi.h2.Title = "Current" termUi.h3 = widgets.NewParagraph() termUi.h3.Title = "Chain ID" @@ -344,7 +401,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri termUi.sl3 = widgets.NewSparkline() termUi.sl3.LineColor = ui.ColorBlue slg3 := widgets.NewSparklineGroup(termUi.sl3) - slg3.Title = "Uncles" + slg3.Title = "Pending Tx" termUi.sl4 = widgets.NewSparkline() termUi.sl4.LineColor = ui.ColorMagenta @@ -429,7 +486,6 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu grid.SetRect(0, 0, termWidth, termHeight) blockGrid.SetRect(0, 0, termWidth, termHeight) - var selectedBlock rpctypes.PolyBlock var setBlock = false var allBlocks metrics.SortableBlocks var renderedBlocks metrics.SortableBlocks @@ -464,14 +520,15 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu termUi.h0.Text = fmt.Sprintf("Height: %s\nTime: %s", ms.HeadBlock.String(), time.Now().Format("02 Jan 06 15:04:05 MST")) gasGwei := new(big.Int).Div(ms.GasPrice, metrics.UnitShannon) termUi.h1.Text = fmt.Sprintf("%s gwei", gasGwei.String()) - termUi.h2.Text = fmt.Sprintf("%d", ms.PeerCount) + termUi.h2.Text = fmt.Sprintf("%d Peers\n%d Pending Tx", ms.PeerCount, ms.PendingCount) termUi.h3.Text = ms.ChainID.String() termUi.h4.Text = fmt.Sprintf("%0.2f", metrics.GetMeanBlockTime(renderedBlocks)) termUi.sl0.Data = metrics.GetTxsPerBlock(renderedBlocks) termUi.sl1.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks) termUi.sl2.Data = metrics.GetSizePerBlock(renderedBlocks) - termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks) + // termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks) + termUi.sl3.Data = observedPendingTxs.getValues() termUi.sl4.Data = metrics.GetGasPerBlock(renderedBlocks) // If a row has not been selected, continue to update the list with new blocks. @@ -487,6 +544,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu if setBlock { selectedBlock = renderedBlocks[len(renderedBlocks)-blockTable.SelectedRow] setBlock = false + log.Info().Uint64("blockNumber", selectedBlock.Number().Uint64()).Msg("Selected block changed") } } @@ -555,10 +613,11 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu if currIdx > windowSize-1 { if windowOffset+windowSize < len(allBlocks) { windowOffset += 1 - } else if windowOffset+len(renderedBlocks) == len(allBlocks) { - break } else { - appendOlderBlocks(ctx, ms, rpc) + err := appendOlderBlocks(ctx, ms, rpc) + if err != nil { + log.Warn().Err(err).Msg("unable to append more history") + } forceRedraw = true redraw(ms, true) break