Skip to content

Commit

Permalink
feat: adding pending tx graph
Browse files Browse the repository at this point in the history
  • Loading branch information
praetoriansentry committed Aug 9, 2023
1 parent fcca917 commit f2d5ff5
Showing 1 changed file with 81 additions and 22 deletions.
103 changes: 81 additions & 22 deletions cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,41 @@ var (
intervalStr string
interval time.Duration

one = big.NewInt(1)
zero = big.NewInt(0)
one = big.NewInt(1)
zero = big.NewInt(0)
selectedBlock rpctypes.PolyBlock

currentlyFetchingHistoryLock sync.RWMutex

observedPendingTxs historicalRange
)

type (
monitorStatus struct {
ChainID *big.Int
HeadBlock *big.Int
PeerCount uint64
GasPrice *big.Int
ChainID *big.Int
HeadBlock *big.Int
PeerCount uint64
GasPrice *big.Int
PendingCount uint

Blocks map[string]rpctypes.PolyBlock `json:"-"`
BlocksLock sync.RWMutex `json:"-"`
MaxBlockRetrieved *big.Int
MinBlockRetrieved *big.Int
}
chainState struct {
HeadBlock uint64
ChainID *big.Int
PeerCount uint64
GasPrice *big.Int
HeadBlock uint64
ChainID *big.Int
PeerCount uint64
GasPrice *big.Int
PendingCount uint
}
historicalDataPoint struct {
SampleTime time.Time
SampleValue float64
}
uiSkeleton struct {
historicalRange []historicalDataPoint
uiSkeleton struct {
h0 *widgets.Paragraph
h1 *widgets.Paragraph
h2 *widgets.Paragraph
Expand Down Expand Up @@ -101,11 +113,23 @@ func getChainState(ctx context.Context, ec *ethclient.Client) (*chainState, erro
if err != nil {
return nil, fmt.Errorf("couldn't estimate gas: %s", err.Error())
}
cs.PendingCount, err = ec.PendingTransactionCount(ctx)
if err != nil {
log.Debug().Err(err).Msg("Unable to get pending transaction count")
cs.PendingCount = 0
}

return cs, nil

}

func (h historicalRange) getValues() []float64 {
values := make([]float64, len(h))
for idx, v := range h {
values[idx] = v.SampleValue
}
return values
}
func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) {
from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1)))
// Prevent getBlockRange from fetching duplicate blocks.
Expand All @@ -129,11 +153,16 @@ func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Cli
}
}

func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) {
func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) error {
if ms.MinBlockRetrieved == nil {
log.Warn().Msg("Nil min block")
return
return fmt.Errorf("the min block is nil")
}
if !currentlyFetchingHistoryLock.TryLock() {
return fmt.Errorf("the function is currently locked")
}
defer currentlyFetchingHistoryLock.Unlock()

to := new(big.Int).Sub(ms.MinBlockRetrieved, one)
from := new(big.Int).Sub(to, big.NewInt(int64(batchSize-1)))
if from.Cmp(zero) < 0 {
Expand All @@ -149,7 +178,9 @@ func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Clien
err := ms.getBlockRange(ctx, from, to, rpc)
if err != nil {
log.Error().Err(err).Msg("There was an issue fetching the block range")
return err
}
return nil
}

func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) {
Expand All @@ -160,6 +191,7 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r
time.Sleep(interval)
return err
}
observedPendingTxs = append(observedPendingTxs, historicalDataPoint{SampleTime: time.Now(), SampleValue: float64(cs.PendingCount)})

log.Debug().Uint64("PeerCount", cs.PeerCount).Uint64("ChainID", cs.ChainID.Uint64()).Uint64("HeadBlock", cs.HeadBlock).Uint64("GasPrice", cs.GasPrice.Uint64()).Msg("fetching blocks")

Expand All @@ -174,13 +206,36 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r
ms.ChainID = cs.ChainID
ms.PeerCount = cs.PeerCount
ms.GasPrice = cs.GasPrice
ms.PendingCount = cs.PendingCount

prependLatestBlocks(ctx, ms, rpc)
appendOlderBlocks(ctx, ms, rpc)
if shouldLoadMoreHistory(ctx, ms) {
appendOlderBlocks(ctx, ms, rpc)
}

return
}

// shouldLoadMoreHistory is meant to decide if we should keep fetching more block history. The idea is that if the user
// hasn't scrolled within a batch size of the minimum of the page, we won't keep loading more history
func shouldLoadMoreHistory(ctx context.Context, ms *monitorStatus) bool {
if ms.MinBlockRetrieved == nil {
return false
}
if selectedBlock == nil {
return false
}
minBlockNumber := ms.MinBlockRetrieved.Int64()
selectedBlockNumber := selectedBlock.Number().Int64()
if minBlockNumber == 0 {
return false
}
if minBlockNumber < selectedBlockNumber-(5*int64(batchSize)) {
return false
}
return true
}

// monitorCmd represents the monitor command
var MonitorCmd = &cobra.Command{
Use: "monitor url",
Expand Down Expand Up @@ -232,6 +287,8 @@ var MonitorCmd = &cobra.Command{
ms.Blocks = make(map[string]rpctypes.PolyBlock, 0)
ms.BlocksLock.Unlock()
ms.ChainID = big.NewInt(0)
ms.PendingCount = 0
observedPendingTxs = make(historicalRange, 0)

isUiRendered := false
errChan := make(chan error)
Expand Down Expand Up @@ -318,7 +375,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri
termUi.h1.Title = "Gas Price"

termUi.h2 = widgets.NewParagraph()
termUi.h2.Title = "Current Peers"
termUi.h2.Title = "Current"

termUi.h3 = widgets.NewParagraph()
termUi.h3.Title = "Chain ID"
Expand All @@ -344,7 +401,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri
termUi.sl3 = widgets.NewSparkline()
termUi.sl3.LineColor = ui.ColorBlue
slg3 := widgets.NewSparklineGroup(termUi.sl3)
slg3.Title = "Uncles"
slg3.Title = "Pending Tx"

termUi.sl4 = widgets.NewSparkline()
termUi.sl4.LineColor = ui.ColorMagenta
Expand Down Expand Up @@ -429,7 +486,6 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
grid.SetRect(0, 0, termWidth, termHeight)
blockGrid.SetRect(0, 0, termWidth, termHeight)

var selectedBlock rpctypes.PolyBlock
var setBlock = false
var allBlocks metrics.SortableBlocks
var renderedBlocks metrics.SortableBlocks
Expand Down Expand Up @@ -464,14 +520,15 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
termUi.h0.Text = fmt.Sprintf("Height: %s\nTime: %s", ms.HeadBlock.String(), time.Now().Format("02 Jan 06 15:04:05 MST"))
gasGwei := new(big.Int).Div(ms.GasPrice, metrics.UnitShannon)
termUi.h1.Text = fmt.Sprintf("%s gwei", gasGwei.String())
termUi.h2.Text = fmt.Sprintf("%d", ms.PeerCount)
termUi.h2.Text = fmt.Sprintf("%d Peers\n%d Pending Tx", ms.PeerCount, ms.PendingCount)
termUi.h3.Text = ms.ChainID.String()
termUi.h4.Text = fmt.Sprintf("%0.2f", metrics.GetMeanBlockTime(renderedBlocks))

termUi.sl0.Data = metrics.GetTxsPerBlock(renderedBlocks)
termUi.sl1.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks)
termUi.sl2.Data = metrics.GetSizePerBlock(renderedBlocks)
termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks)
// termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks)
termUi.sl3.Data = observedPendingTxs.getValues()
termUi.sl4.Data = metrics.GetGasPerBlock(renderedBlocks)

// If a row has not been selected, continue to update the list with new blocks.
Expand All @@ -487,6 +544,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
if setBlock {
selectedBlock = renderedBlocks[len(renderedBlocks)-blockTable.SelectedRow]
setBlock = false
log.Info().Uint64("blockNumber", selectedBlock.Number().Uint64()).Msg("Selected block changed")
}
}

Expand Down Expand Up @@ -555,10 +613,11 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
if currIdx > windowSize-1 {
if windowOffset+windowSize < len(allBlocks) {
windowOffset += 1
} else if windowOffset+len(renderedBlocks) == len(allBlocks) {
break
} else {
appendOlderBlocks(ctx, ms, rpc)
err := appendOlderBlocks(ctx, ms, rpc)
if err != nil {
log.Warn().Err(err).Msg("unable to append more history")
}
forceRedraw = true
redraw(ms, true)
break
Expand Down

0 comments on commit f2d5ff5

Please sign in to comment.