Skip to content

Commit

Permalink
Merge pull request #111 from maticnetwork/jhilliard/monitor-panic-fix
Browse files Browse the repository at this point in the history
ENR Parser and Monitor Fixes
  • Loading branch information
praetoriansentry authored Aug 9, 2023
2 parents 067ac04 + 40074a5 commit 3dfe6bd
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 25 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge

- [polycli dumpblocks](doc/polycli_dumpblocks.md) - Export a range of blocks from a JSON-RPC endpoint.

- [polycli enr](doc/polycli_enr.md) - Convert between ENR and Enode format

- [polycli forge](doc/polycli_forge.md) - Forge dumped blocks on top of a genesis file.

- [polycli fork](doc/polycli_fork.md) - Take a forked block and walk up the chain to do analysis.
Expand Down
94 changes: 94 additions & 0 deletions cmd/enr/enr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package enr

import (
_ "embed"
"encoding/json"
"fmt"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"io"
"os"
"strings"

"github.com/ethereum/go-ethereum/p2p/enode"
)

var (
//go:embed usage.md
usage string
inputFileName *string
)

var ENRCmd = &cobra.Command{
Use: "enr [flags]",
Short: "Convert between ENR and Enode format",
Long: usage,
RunE: func(cmd *cobra.Command, args []string) error {
rawData, err := getInputData(cmd, args)
if err != nil {
log.Error().Err(err).Msg("Unable to read input")
return err
}
lines := strings.Split(string(rawData), "\n")

for _, l := range lines {
var node *enode.Node
var err error
l = strings.TrimSpace(l)
if l == "" {
continue
}
isENR := false
if strings.HasPrefix(l, "enr:") {
isENR = true
node, err = enode.Parse(enode.V4ID{}, l)
if err != nil {
log.Error().Err(err).Str("line", l).Msg("Unable to parse enr record")
continue
}
} else {
node, err = enode.ParseV4(l)
if err != nil {
log.Error().Err(err).Str("line", l).Msg("Unable to parse node record")
continue
}
}
genericNode := make(map[string]string, 0)
if isENR {
genericNode["enr"] = node.String()
}
genericNode["enode"] = node.URLv4()
genericNode["id"] = node.ID().String()
genericNode["ip"] = node.IP().String()
genericNode["tcp"] = fmt.Sprintf("%d", node.TCP())
genericNode["udp"] = fmt.Sprintf("%d", node.UDP())
jsonOut, err := json.Marshal(genericNode)
if err != nil {
log.Error().Err(err).Msg("unable to convert node to json")
continue
}
fmt.Println(string(jsonOut))
}
return nil
},
Args: func(cmd *cobra.Command, args []string) error {
return nil
},
}

func init() {
flagSet := ENRCmd.PersistentFlags()
inputFileName = flagSet.String("file", "", "Provide a file that's holding ENRs")
}
func getInputData(cmd *cobra.Command, args []string) ([]byte, error) {
if inputFileName != nil && *inputFileName != "" {
return os.ReadFile(*inputFileName)
}

if len(args) >= 1 {
concat := strings.Join(args, "\n")
return []byte(concat), nil
}

return io.ReadAll(os.Stdin)
}
37 changes: 37 additions & 0 deletions cmd/enr/usage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
This function is meant to help handle ENR data. Given an input ENR it will output the parsed enode and other values that are part of the payload.

The command below will take an ENR and process it:
```bash
echo 'enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8' | \
polycli enr | jq '.'
```

This is the output:
```json
{
"enode": "enode://ca634cae0d49acb401d8a4c6b6fe8c55b70d115bf400769cc1400f3258cd31387574077f301b421bc84df7266c44e9e6d569fc56be00812904767bf5ccd1fc7f@127.0.0.1:0?discport=30303",
"enr": "enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8",
"id": "a448f24c6d18e575453db13171562b71999873db5b286df957af199ec94617f7",
"ip": "127.0.0.1",
"tcp": "0",
"udp": "30303"
}
```

This command can be used a few different ways
```bash
enr_data="enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8"

# First form - reading from stdin
echo "$enr_data" | polycli enr

# Second form - reading from file
tmp_file="$(mktemp)"
echo "$enr_data" > "$tmp_file"
polycli enr --file "$tmp_file"

# Third form - command line args
polycli enr "$enr_data"
```

All three forms support multiple lines. Each line will be convert into a JSON object and printed.
115 changes: 93 additions & 22 deletions cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,29 +33,41 @@ var (
intervalStr string
interval time.Duration

one = big.NewInt(1)
zero = big.NewInt(0)
one = big.NewInt(1)
zero = big.NewInt(0)
selectedBlock rpctypes.PolyBlock

currentlyFetchingHistoryLock sync.RWMutex

observedPendingTxs historicalRange
)

type (
monitorStatus struct {
ChainID *big.Int
HeadBlock *big.Int
PeerCount uint64
GasPrice *big.Int
ChainID *big.Int
HeadBlock *big.Int
PeerCount uint64
GasPrice *big.Int
PendingCount uint

Blocks map[string]rpctypes.PolyBlock `json:"-"`
BlocksLock sync.RWMutex `json:"-"`
MaxBlockRetrieved *big.Int
MinBlockRetrieved *big.Int
}
chainState struct {
HeadBlock uint64
ChainID *big.Int
PeerCount uint64
GasPrice *big.Int
HeadBlock uint64
ChainID *big.Int
PeerCount uint64
GasPrice *big.Int
PendingCount uint
}
historicalDataPoint struct {
SampleTime time.Time
SampleValue float64
}
uiSkeleton struct {
historicalRange []historicalDataPoint
uiSkeleton struct {
h0 *widgets.Paragraph
h1 *widgets.Paragraph
h2 *widgets.Paragraph
Expand Down Expand Up @@ -101,11 +113,23 @@ func getChainState(ctx context.Context, ec *ethclient.Client) (*chainState, erro
if err != nil {
return nil, fmt.Errorf("couldn't estimate gas: %s", err.Error())
}
cs.PendingCount, err = ec.PendingTransactionCount(ctx)
if err != nil {
log.Debug().Err(err).Msg("Unable to get pending transaction count")
cs.PendingCount = 0
}

return cs, nil

}

func (h historicalRange) getValues() []float64 {
values := make([]float64, len(h))
for idx, v := range h {
values[idx] = v.SampleValue
}
return values
}
func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) {
from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1)))
// Prevent getBlockRange from fetching duplicate blocks.
Expand All @@ -129,7 +153,16 @@ func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Cli
}
}

func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) {
func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) error {
if ms.MinBlockRetrieved == nil {
log.Warn().Msg("Nil min block")
return fmt.Errorf("the min block is nil")
}
if !currentlyFetchingHistoryLock.TryLock() {
return fmt.Errorf("the function is currently locked")
}
defer currentlyFetchingHistoryLock.Unlock()

to := new(big.Int).Sub(ms.MinBlockRetrieved, one)
from := new(big.Int).Sub(to, big.NewInt(int64(batchSize-1)))
if from.Cmp(zero) < 0 {
Expand All @@ -145,7 +178,9 @@ func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Clien
err := ms.getBlockRange(ctx, from, to, rpc)
if err != nil {
log.Error().Err(err).Msg("There was an issue fetching the block range")
return err
}
return nil
}

func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) {
Expand All @@ -156,6 +191,9 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r
time.Sleep(interval)
return err
}
observedPendingTxs = append(observedPendingTxs, historicalDataPoint{SampleTime: time.Now(), SampleValue: float64(cs.PendingCount)})

log.Debug().Uint64("PeerCount", cs.PeerCount).Uint64("ChainID", cs.ChainID.Uint64()).Uint64("HeadBlock", cs.HeadBlock).Uint64("GasPrice", cs.GasPrice.Uint64()).Msg("fetching blocks")

if isUiRendered && batchSize < 0 {
_, termHeight := ui.TerminalDimensions()
Expand All @@ -168,13 +206,39 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r
ms.ChainID = cs.ChainID
ms.PeerCount = cs.PeerCount
ms.GasPrice = cs.GasPrice
ms.PendingCount = cs.PendingCount

prependLatestBlocks(ctx, ms, rpc)
appendOlderBlocks(ctx, ms, rpc)
if shouldLoadMoreHistory(ctx, ms) {
err = appendOlderBlocks(ctx, ms, rpc)
if err != nil {
log.Warn().Err(err).Msg("unable to append more history")
}
}

return
}

// shouldLoadMoreHistory is meant to decide if we should keep fetching more block history. The idea is that if the user
// hasn't scrolled within a batch size of the minimum of the page, we won't keep loading more history
func shouldLoadMoreHistory(ctx context.Context, ms *monitorStatus) bool {
if ms.MinBlockRetrieved == nil {
return false
}
if selectedBlock == nil {
return false
}
minBlockNumber := ms.MinBlockRetrieved.Int64()
selectedBlockNumber := selectedBlock.Number().Int64()
if minBlockNumber == 0 {
return false
}
if minBlockNumber < selectedBlockNumber-(5*int64(batchSize)) {
return false
}
return true
}

// monitorCmd represents the monitor command
var MonitorCmd = &cobra.Command{
Use: "monitor url",
Expand Down Expand Up @@ -226,6 +290,8 @@ var MonitorCmd = &cobra.Command{
ms.Blocks = make(map[string]rpctypes.PolyBlock, 0)
ms.BlocksLock.Unlock()
ms.ChainID = big.NewInt(0)
ms.PendingCount = 0
observedPendingTxs = make(historicalRange, 0)

isUiRendered := false
errChan := make(chan error)
Expand Down Expand Up @@ -312,7 +378,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri
termUi.h1.Title = "Gas Price"

termUi.h2 = widgets.NewParagraph()
termUi.h2.Title = "Current Peers"
termUi.h2.Title = "Current"

termUi.h3 = widgets.NewParagraph()
termUi.h3.Title = "Chain ID"
Expand All @@ -338,7 +404,7 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri
termUi.sl3 = widgets.NewSparkline()
termUi.sl3.LineColor = ui.ColorBlue
slg3 := widgets.NewSparklineGroup(termUi.sl3)
slg3.Title = "Uncles"
slg3.Title = "Pending Tx"

termUi.sl4 = widgets.NewSparkline()
termUi.sl4.LineColor = ui.ColorMagenta
Expand Down Expand Up @@ -423,7 +489,6 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
grid.SetRect(0, 0, termWidth, termHeight)
blockGrid.SetRect(0, 0, termWidth, termHeight)

var selectedBlock rpctypes.PolyBlock
var setBlock = false
var allBlocks metrics.SortableBlocks
var renderedBlocks metrics.SortableBlocks
Expand Down Expand Up @@ -458,14 +523,15 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
termUi.h0.Text = fmt.Sprintf("Height: %s\nTime: %s", ms.HeadBlock.String(), time.Now().Format("02 Jan 06 15:04:05 MST"))
gasGwei := new(big.Int).Div(ms.GasPrice, metrics.UnitShannon)
termUi.h1.Text = fmt.Sprintf("%s gwei", gasGwei.String())
termUi.h2.Text = fmt.Sprintf("%d", ms.PeerCount)
termUi.h2.Text = fmt.Sprintf("%d Peers\n%d Pending Tx", ms.PeerCount, ms.PendingCount)
termUi.h3.Text = ms.ChainID.String()
termUi.h4.Text = fmt.Sprintf("%0.2f", metrics.GetMeanBlockTime(renderedBlocks))

termUi.sl0.Data = metrics.GetTxsPerBlock(renderedBlocks)
termUi.sl1.Data = metrics.GetMeanGasPricePerBlock(renderedBlocks)
termUi.sl2.Data = metrics.GetSizePerBlock(renderedBlocks)
termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks)
// termUi.sl3.Data = metrics.GetUnclesPerBlock(renderedBlocks)
termUi.sl3.Data = observedPendingTxs.getValues()
termUi.sl4.Data = metrics.GetGasPerBlock(renderedBlocks)

// If a row has not been selected, continue to update the list with new blocks.
Expand All @@ -481,6 +547,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
if setBlock {
selectedBlock = renderedBlocks[len(renderedBlocks)-blockTable.SelectedRow]
setBlock = false
log.Info().Uint64("blockNumber", selectedBlock.Number().Uint64()).Msg("Selected block changed")
}
}

Expand Down Expand Up @@ -549,10 +616,11 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
if currIdx > windowSize-1 {
if windowOffset+windowSize < len(allBlocks) {
windowOffset += 1
} else if windowOffset+len(renderedBlocks) == len(allBlocks) {
break
} else {
appendOlderBlocks(ctx, ms, rpc)
err := appendOlderBlocks(ctx, ms, rpc)
if err != nil {
log.Warn().Err(err).Msg("unable to append more history")
}
forceRedraw = true
redraw(ms, true)
break
Expand Down Expand Up @@ -601,7 +669,10 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu
windowOffset += windowSize
// good to go to next page but not enough blocks to fill page
if windowOffset > len(allBlocks)-windowSize {
appendOlderBlocks(ctx, ms, rpc)
err := appendOlderBlocks(ctx, ms, rpc)
if err != nil {
log.Warn().Err(err).Msg("unable to append more history")
}
forceRedraw = true
redraw(ms, true)
}
Expand Down
Loading

0 comments on commit 3dfe6bd

Please sign in to comment.