From f06209ebfdabd6d697832f0eb029420e5b40ec34 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Mon, 13 Nov 2023 23:10:18 -0500 Subject: [PATCH 1/3] Revert "Revert "DVT-1057 replace block data structure with LRU cache to fix memory leak (#148)"" This reverts commit f174e6399abd8930b28e7728bee9d9575f706c40. --- README.md | 3 + cmd/monitor/cmd.go | 13 +- cmd/monitor/monitor.go | 434 ++++++++++++++++++++++++----------------- doc/polycli_monitor.md | 1 + go.mod | 1 + go.sum | 4 + 6 files changed, 270 insertions(+), 186 deletions(-) diff --git a/README.md b/README.md index 886128d8..639f8a65 100644 --- a/README.md +++ b/README.md @@ -148,6 +148,9 @@ You can then generate some load to make sure that blocks with transactions are b $ polycli loadtest --verbosity 700 --chain-id 1337 --concurrency 1 --requests 1000 --rate-limit 5 --mode c --rpc-url http://127.0.0.1:8545 ``` +## Monitor Debug +`polycli monitor --rpc-url http://34.117.145.249:80 -v 700 &> log.txt` + # Contributing - If you add a new loadtest mode, don't forget to update the loadtest mode string by running the following command: `cd cmd/loadtest && stringer -type=loadTestMode`. You can install [stringer](https://pkg.go.dev/golang.org/x/tools/cmd/stringer) with `go install golang.org/x/tools/cmd/stringer@latest`. diff --git a/cmd/monitor/cmd.go b/cmd/monitor/cmd.go index 2f57cd04..990f5223 100644 --- a/cmd/monitor/cmd.go +++ b/cmd/monitor/cmd.go @@ -15,9 +15,10 @@ var ( usage string // flags - rpcUrl string - batchSizeValue string - intervalStr string + rpcUrl string + batchSizeValue string + blockCacheLimit int + intervalStr string ) // MonitorCmd represents the monitor command @@ -37,6 +38,7 @@ var MonitorCmd = &cobra.Command{ func init() { MonitorCmd.PersistentFlags().StringVarP(&rpcUrl, "rpc-url", "r", "http://localhost:8545", "The RPC endpoint url") MonitorCmd.PersistentFlags().StringVarP(&batchSizeValue, "batch-size", "b", "auto", "Number of requests per batch") + MonitorCmd.PersistentFlags().IntVarP(&blockCacheLimit, "cache-limit", "c", 100, "Number of cached blocks for the LRU block data structure (Min 100)") MonitorCmd.PersistentFlags().StringVarP(&intervalStr, "interval", "i", "5s", "Amount of time between batch block rpc calls") } @@ -66,5 +68,10 @@ func checkFlags() (err error) { } } + // Check batch-size flag. + if blockCacheLimit < 100 { + return fmt.Errorf("block-cache can't be less than 100") + } + return nil } diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index 67cf06d8..bc0f9b7d 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -4,10 +4,9 @@ import ( "context" "fmt" "math/big" - "sort" - "sync" "time" + lru "github.com/hashicorp/golang-lru" "github.com/maticnetwork/polygon-cli/util" _ "embed" @@ -24,28 +23,24 @@ import ( ) var ( - windowSize int - batchSize int - interval time.Duration - one = big.NewInt(1) - zero = big.NewInt(0) - selectedBlock rpctypes.PolyBlock - currentlyFetchingHistoryLock sync.RWMutex - observedPendingTxs historicalRange + windowSize int + batchSize int + interval time.Duration + one = big.NewInt(1) + zero = big.NewInt(0) + selectedBlock rpctypes.PolyBlock + observedPendingTxs historicalRange ) type ( monitorStatus struct { - ChainID *big.Int - HeadBlock *big.Int - PeerCount uint64 - GasPrice *big.Int - PendingCount uint64 - - Blocks map[string]rpctypes.PolyBlock `json:"-"` - BlocksLock sync.RWMutex `json:"-"` - MaxBlockRetrieved *big.Int - MinBlockRetrieved *big.Int + TopDisplayedBlock *big.Int + ChainID *big.Int + HeadBlock *big.Int + PeerCount uint64 + GasPrice *big.Int + PendingCount uint64 + BlockCache *lru.Cache } chainState struct { HeadBlock uint64 @@ -91,10 +86,8 @@ func monitor(ctx context.Context) error { ec := ethclient.NewClient(rpc) ms := new(monitorStatus) - ms.MaxBlockRetrieved = big.NewInt(0) - ms.BlocksLock.Lock() - ms.Blocks = make(map[string]rpctypes.PolyBlock, 0) - ms.BlocksLock.Unlock() + ms.BlockCache, _ = lru.New(blockCacheLimit) + ms.ChainID = big.NewInt(0) ms.PendingCount = 0 observedPendingTxs = make(historicalRange, 0) @@ -102,20 +95,27 @@ func monitor(ctx context.Context) error { isUiRendered := false errChan := make(chan error) go func() { - for { - err = fetchBlocks(ctx, ec, ms, rpc, isUiRendered) - if err != nil { - continue - } + select { + case <-ctx.Done(): // listens for a cancellation signal + return // exit the goroutine when the context is done + default: + for { + err = fetchBlocks(ctx, ec, ms, rpc, isUiRendered) + if err != nil { + continue + } + if !isUiRendered { + go func() { + if ms.TopDisplayedBlock == nil { + ms.TopDisplayedBlock = ms.HeadBlock + } + errChan <- renderMonitorUI(ctx, ec, ms, rpc) + }() + isUiRendered = true + } - if !isUiRendered { - go func() { - errChan <- renderMonitorUI(ctx, ec, ms, rpc) - }() - isUiRendered = true + time.Sleep(interval) } - - time.Sleep(interval) } }() @@ -167,12 +167,9 @@ func (h historicalRange) getValues(limit int) []float64 { } return values } + func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) { from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1))) - // Prevent getBlockRange from fetching duplicate blocks. - if ms.MaxBlockRetrieved.Cmp(from) == 1 { - from.Add(ms.MaxBlockRetrieved, big.NewInt(1)) - } if from.Cmp(zero) < 0 { from.SetInt64(0) @@ -181,7 +178,6 @@ func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Cli log.Debug(). Int64("from", from.Int64()). Int64("to", ms.HeadBlock.Int64()). - Int64("max", ms.MaxBlockRetrieved.Int64()). Msg("Fetching latest blocks") err := ms.getBlockRange(ctx, from, ms.HeadBlock, rpc) @@ -190,36 +186,6 @@ func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Cli } } -func appendOlderBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) error { - if ms.MinBlockRetrieved == nil { - log.Warn().Msg("Nil min block") - return fmt.Errorf("the min block is nil") - } - if !currentlyFetchingHistoryLock.TryLock() { - return fmt.Errorf("the function is currently locked") - } - defer currentlyFetchingHistoryLock.Unlock() - - to := new(big.Int).Sub(ms.MinBlockRetrieved, one) - from := new(big.Int).Sub(to, big.NewInt(int64(batchSize-1))) - if from.Cmp(zero) < 0 { - from.SetInt64(0) - } - - log.Debug(). - Int64("from", from.Int64()). - Int64("to", to.Int64()). - Int64("min", ms.MinBlockRetrieved.Int64()). - Msg("Fetching older blocks") - - err := ms.getBlockRange(ctx, from, to, rpc) - if err != nil { - log.Error().Err(err).Msg("There was an issue fetching the block range") - return err - } - return nil -} - func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) { var cs *chainState cs, err = getChainState(ctx, ec) @@ -246,36 +212,10 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r ms.PendingCount = cs.PendingCount prependLatestBlocks(ctx, ms, rpc) - if shouldLoadMoreHistory(ctx, ms) { - err = appendOlderBlocks(ctx, ms, rpc) - if err != nil { - log.Warn().Err(err).Msg("Unable to append more history") - } - } return } -// shouldLoadMoreHistory is meant to decide if we should keep fetching more block history. The idea is that if the user -// hasn't scrolled within a batch size of the minimum of the page, we won't keep loading more history -func shouldLoadMoreHistory(ctx context.Context, ms *monitorStatus) bool { - if ms.MinBlockRetrieved == nil { - return false - } - if selectedBlock == nil { - return false - } - minBlockNumber := ms.MinBlockRetrieved.Int64() - selectedBlockNumber := selectedBlock.Number().Int64() - if minBlockNumber == 0 { - return false - } - if minBlockNumber < selectedBlockNumber-(5*int64(batchSize)) { - return false - } - return true -} - func (ms *monitorStatus) getBlockRange(ctx context.Context, from, to *big.Int, rpc *ethrpc.Client) error { blms := make([]ethrpc.BatchElem, 0) for i := from; i.Cmp(to) != 1; i.Add(i, one) { @@ -307,16 +247,7 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, from, to *big.Int, r } pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse)) - ms.BlocksLock.Lock() - ms.Blocks[pb.Number().String()] = pb - ms.BlocksLock.Unlock() - - if ms.MaxBlockRetrieved.Cmp(pb.Number()) == -1 { - ms.MaxBlockRetrieved = pb.Number() - } - if ms.MinBlockRetrieved == nil || (ms.MinBlockRetrieved.Cmp(pb.Number()) == 1 && pb.Number().Cmp(zero) == 1) { - ms.MinBlockRetrieved = pb.Number() - } + ms.BlockCache.Add(pb.Number().String(), pb) } return nil @@ -415,21 +346,6 @@ func setUISkeleton() (blockTable *widgets.List, grid *ui.Grid, blockGrid *ui.Gri return } -func updateAllBlocks(ms *monitorStatus) []rpctypes.PolyBlock { - // default - blocks := make([]rpctypes.PolyBlock, 0) - - ms.BlocksLock.RLock() - for _, b := range ms.Blocks { - blocks = append(blocks, b) - } - ms.BlocksLock.RUnlock() - - allBlocks := metrics.SortableBlocks(blocks) - - return allBlocks -} - func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error { if err := ui.Init(); err != nil { return err @@ -446,9 +362,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu blockGrid.SetRect(0, 0, termWidth, termHeight) var setBlock = false - var allBlocks metrics.SortableBlocks var renderedBlocks metrics.SortableBlocks - windowOffset := 0 redraw := func(ms *monitorStatus, force ...bool) { log.Debug().Interface("ms", ms).Msg("Redrawing") @@ -465,16 +379,36 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu return } - if blockTable.SelectedRow == 0 || len(force) > 0 && force[0] { - allBlocks = updateAllBlocks(ms) - sort.Sort(allBlocks) + if blockTable.SelectedRow == 0 { + ms.TopDisplayedBlock = ms.HeadBlock + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) + } + + // Fetch the blocks in the new range if they are missing + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, ms.TopDisplayedBlock) + if err != nil { + log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + } } - start := len(allBlocks) - windowSize - windowOffset - if start < 0 { - start = 0 + toBlockNumber := ms.TopDisplayedBlock + fromBlockNumber := new(big.Int).Sub(toBlockNumber, big.NewInt(int64(windowSize-1))) + if fromBlockNumber.Cmp(zero) < 0 { + fromBlockNumber.SetInt64(0) // We cannot have block numbers less than 0. } - end := len(allBlocks) - windowOffset - renderedBlocks = allBlocks[start:end] + renderedBlocksTemp := make([]rpctypes.PolyBlock, 0, windowSize) + for i := new(big.Int).Set(fromBlockNumber); i.Cmp(toBlockNumber) <= 0; i.Add(i, big.NewInt(1)) { + if block, ok := ms.BlockCache.Get(i.String()); ok { + renderedBlocksTemp = append(renderedBlocksTemp, block.(rpctypes.PolyBlock)) + } else { + // If for some reason the block is not in the cache after fetching, handle this case. + log.Warn().Str("blockNumber", i.String()).Msg("Block should be in cache but is not") + } + } + renderedBlocks = renderedBlocksTemp termUi.h0.Text = fmt.Sprintf("Height: %s\nTime: %s", ms.HeadBlock.String(), time.Now().Format("02 Jan 06 15:04:05 MST")) gasGwei := new(big.Int).Div(ms.GasPrice, metrics.UnitShannon) @@ -501,6 +435,11 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu // Only changed the selected block when the user presses the up down keys. // Otherwise this will adjust when the table is updated automatically. if setBlock { + log.Debug(). + Int("blockTable.SelectedRow", blockTable.SelectedRow). + Int("renderedBlocks", len(renderedBlocks)). + Msg("setBlock") + selectedBlock = renderedBlocks[len(renderedBlocks)-blockTable.SelectedRow] setBlock = false log.Debug().Uint64("blockNumber", selectedBlock.Number().Uint64()).Msg("Selected block changed") @@ -512,11 +451,11 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu currentBn := ms.HeadBlock uiEvents := ui.PollEvents() - ticker := time.NewTicker(time.Second).C + ticker := time.NewTicker(time.Second) + defer ticker.Stop() redraw(ms) - currIdx := 0 previousKey := "" for { forceRedraw := false @@ -526,9 +465,22 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu case "q", "": return nil case "": + ms.TopDisplayedBlock = ms.HeadBlock blockTable.SelectedRow = 0 currentMode = monitorModeExplorer - windowOffset = 0 + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) + } + + // Fetch the blocks in the new range if they are missing + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, ms.TopDisplayedBlock) + if err != nil { + log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + break + } case "": if blockTable.SelectedRow > 0 { currentMode = monitorModeBlock @@ -551,95 +503,182 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu } if blockTable.SelectedRow == 0 { - currIdx = 1 - blockTable.SelectedRow = currIdx + blockTable.SelectedRow = 1 setBlock = true break } - currIdx = blockTable.SelectedRow if e.ID == "" { log.Debug(). - Int("currIdx", currIdx). + Int("blockTable.SelectedRow", blockTable.SelectedRow). Int("windowSize", windowSize). Int("renderedBlocks", len(renderedBlocks)). Int("dy", blockTable.Dy()). - Int("windowOffset", windowOffset). - Int("allBlocks", len(allBlocks)). Msg("Down") // the last row of current window size - if currIdx > windowSize-1 { - if windowOffset+windowSize < len(allBlocks) { - windowOffset += 1 - } else { - err := appendOlderBlocks(ctx, ms, rpc) + if blockTable.SelectedRow > windowSize-1 { + // Calculate the range of block numbers we are trying to page down to + nextTopBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, one) + if nextTopBlockNumber.Cmp(zero) < 0 { + // If we've gone past the earliest block, set it to the earliest block number + nextTopBlockNumber.SetInt64(0) + } + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) + } + + // Fetch the blocks if the next block is not in cache + if !isBlockInCache(ms.BlockCache, toBlockNumber) { + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize))), toBlockNumber) if err != nil { - log.Warn().Err(err).Msg("Unable to append more history") + log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + break } - forceRedraw = true - redraw(ms, true) - break } + + // Update the top displayed block number + ms.TopDisplayedBlock = nextTopBlockNumber + + blockTable.SelectedRow = len(renderedBlocks) + setBlock = true + + // Force redraw to update the UI with the new page of blocks + forceRedraw = true + redraw(ms, true) + break } - currIdx += 1 + blockTable.SelectedRow += 1 setBlock = true } else if e.ID == "" { - log.Debug().Int("currIdx", currIdx).Int("windowSize", windowSize).Msg("Up") - if currIdx <= 1 && windowOffset > 0 { - windowOffset -= 1 + log.Debug().Int("blockTable.SelectedRow", blockTable.SelectedRow).Int("windowSize", windowSize).Msg("Up") + + // the last row of current window size + if blockTable.SelectedRow == 1 { + // Calculate the range of block numbers we are trying to page down to + nextTopBlockNumber := new(big.Int).Add(ms.TopDisplayedBlock, one) + if nextTopBlockNumber.Cmp(ms.HeadBlock) > 0 { + nextTopBlockNumber.SetInt64(ms.HeadBlock.Int64()) + } + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) + } + + // Fetch the blocks in the new range if they are missing + if !isBlockInCache(ms.BlockCache, nextTopBlockNumber) { + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, new(big.Int).Add(nextTopBlockNumber, big.NewInt(int64(windowSize)))) + if err != nil { + log.Warn().Err(err).Msg("Failed to fetch blocks on page up") + break + } + } + + // Update the top displayed block number + ms.TopDisplayedBlock = nextTopBlockNumber + + blockTable.SelectedRow = 1 + setBlock = true + + // Force redraw to update the UI with the new page of blocks + forceRedraw = true + redraw(ms, true) break } - currIdx -= 1 + blockTable.SelectedRow -= 1 setBlock = true } - // need a better way to understand how many rows are visible - if currIdx > 0 && currIdx <= windowSize && currIdx <= len(renderedBlocks) { - blockTable.SelectedRow = currIdx - } case "": - windowOffset = 0 + ms.TopDisplayedBlock = ms.HeadBlock blockTable.SelectedRow = 1 setBlock = true case "g": if previousKey == "g" { - windowOffset = 0 + ms.TopDisplayedBlock = ms.HeadBlock blockTable.SelectedRow = 1 setBlock = true } case "G", "": if len(renderedBlocks) < windowSize { - windowOffset = 0 + ms.TopDisplayedBlock = ms.HeadBlock blockTable.SelectedRow = len(renderedBlocks) } else { - windowOffset = len(allBlocks) - windowSize + // windowOffset = len(allBlocks) - windowSize blockTable.SelectedRow = max(windowSize, len(renderedBlocks)) } setBlock = true case "", "": - if len(renderedBlocks) < windowSize { - windowOffset = 0 - blockTable.SelectedRow = len(renderedBlocks) - break + // Calculate the range of block numbers we are trying to page down to + nextTopBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize))) + if nextTopBlockNumber.Cmp(zero) < 0 { + // If we've gone past the earliest block, set it to the earliest block number + nextTopBlockNumber.SetInt64(0) } - windowOffset += windowSize - // good to go to next page but not enough blocks to fill page - if windowOffset > len(allBlocks)-windowSize { - err := appendOlderBlocks(ctx, ms, rpc) - if err != nil { - log.Warn().Err(err).Msg("Unable to append more history") - } - forceRedraw = true - redraw(ms, true) + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) } - blockTable.SelectedRow = len(renderedBlocks) - setBlock = true + + // Fetch the blocks in the new range if they are missing + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, nextTopBlockNumber) + if err != nil { + log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + break + } + + // Update the top displayed block number + ms.TopDisplayedBlock = nextTopBlockNumber + + blockTable.SelectedRow = 1 + + log.Debug(). + Int("TopDisplayedBlock", int(ms.TopDisplayedBlock.Int64())). + Int("toBlockNumber", int(toBlockNumber.Int64())). + Msg("PageDown") + + // Force redraw to update the UI with the new page of blocks + forceRedraw = true + redraw(ms, true) case "", "": - windowOffset -= windowSize - if windowOffset < 0 { - windowOffset = 0 - blockTable.SelectedRow = 1 + // Calculate the range of block numbers we are trying to page down to + nextTopBlockNumber := new(big.Int).Add(ms.TopDisplayedBlock, big.NewInt(int64(windowSize))) + if nextTopBlockNumber.Cmp(ms.HeadBlock) > 0 { + nextTopBlockNumber.SetInt64(ms.HeadBlock.Int64()) } + + // Calculate the 'to' block number based on the next top block number + toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) + if toBlockNumber.Cmp(zero) < 0 { + toBlockNumber.SetInt64(0) + } + + // Fetch the blocks in the new range if they are missing + _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, nextTopBlockNumber) + if err != nil { + log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + break + } + + // Update the top displayed block number + ms.TopDisplayedBlock = nextTopBlockNumber + + blockTable.SelectedRow = 1 + + log.Debug(). + Int("TopDisplayedBlock", int(ms.TopDisplayedBlock.Int64())). + Int("toBlockNumber", int(toBlockNumber.Int64())). + Msg("PageDown") + + // Force redraw to update the UI with the new page of blocks + forceRedraw = true + redraw(ms, true) default: log.Trace().Str("id", e.ID).Msg("Unknown ui event") } @@ -653,7 +692,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu if !forceRedraw { redraw(ms) } - case <-ticker: + case <-ticker.C: if currentBn != ms.HeadBlock { currentBn = ms.HeadBlock redraw(ms) @@ -662,6 +701,35 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu } } +func isBlockInCache(cache *lru.Cache, blockNumber *big.Int) bool { + _, exists := cache.Get(blockNumber.String()) + return exists +} + +func checkAndFetchMissingBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client, fromBlockNum, toBlockNum *big.Int) ([]*big.Int, error) { + var missingBlocks []*big.Int + + // Iterate over the range and check if each block is in the cache. + for i := new(big.Int).Set(fromBlockNum); i.Cmp(toBlockNum) <= 0; i.Add(i, one) { + if _, ok := ms.BlockCache.Get(i.String()); !ok { + // Block is not in cache, so mark it as missing. + missingBlocks = append(missingBlocks, new(big.Int).Set(i)) + } + } + + // If there are missing blocks, fetch them using getBlockRange. + if len(missingBlocks) > 0 { + err := ms.getBlockRange(ctx, missingBlocks[0], missingBlocks[len(missingBlocks)-1], rpc) + if err != nil { + // Handle the error, such as logging or returning it. + return nil, err + } + } + + // Return the list of block numbers that were missing and are now fetched. + return missingBlocks, nil +} + func max(nums ...int) int { m := nums[0] for _, n := range nums { diff --git a/doc/polycli_monitor.md b/doc/polycli_monitor.md index 5c31844f..94f0d89b 100644 --- a/doc/polycli_monitor.md +++ b/doc/polycli_monitor.md @@ -29,6 +29,7 @@ If you're experiencing missing blocks, try adjusting the `--batch-size` and `--i ```bash -b, --batch-size string Number of requests per batch (default "auto") + -c, --cache-limit int Number of cached blocks for the LRU block data structure (Min 100) (default 100) -h, --help help for monitor -i, --interval string Amount of time between batch block rpc calls (default "5s") -r, --rpc-url string The RPC endpoint url (default "http://localhost:8545") diff --git a/go.mod b/go.mod index 7f0d53b5..753880b3 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.13.2 github.com/gizak/termui/v3 v3.1.0 github.com/google/gofuzz v1.2.0 + github.com/hashicorp/golang-lru v1.0.2 github.com/jedib0t/go-pretty/v6 v6.4.8 github.com/libp2p/go-libp2p v0.31.0 github.com/oasisprotocol/curve25519-voi v0.0.0-20230904125328-1f23a7beb09a diff --git a/go.sum b/go.sum index a5a40a35..01df85a5 100644 --- a/go.sum +++ b/go.sum @@ -249,6 +249,10 @@ github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpx github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= +github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/holiman/billy v0.0.0-20230718173358-1c7e68d277a7 h1:3JQNjnMRil1yD0IfZKHF9GxxWKDJGj8I0IqOUol//sw= From 90a3cb390b3e71bcb265526e9e0d57877496f6a1 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Mon, 13 Nov 2023 23:10:56 -0500 Subject: [PATCH 2/3] DVT-1057 replace block data structure with LRU cache to fix memory leak --- cmd/monitor/monitor.go | 123 +++++++++++++++-------------------------- 1 file changed, 44 insertions(+), 79 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index bc0f9b7d..dad65f5b 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "sync" "time" lru "github.com/hashicorp/golang-lru" @@ -40,7 +41,8 @@ type ( PeerCount uint64 GasPrice *big.Int PendingCount uint64 - BlockCache *lru.Cache + BlockCache *lru.Cache `json:"-"` + BlocksLock sync.RWMutex `json:"-"` } chainState struct { HeadBlock uint64 @@ -86,10 +88,13 @@ func monitor(ctx context.Context) error { ec := ethclient.NewClient(rpc) ms := new(monitorStatus) + ms.BlocksLock.Lock() ms.BlockCache, _ = lru.New(blockCacheLimit) + ms.BlocksLock.Unlock() ms.ChainID = big.NewInt(0) ms.PendingCount = 0 + observedPendingTxs = make(historicalRange, 0) isUiRendered := false @@ -168,23 +173,7 @@ func (h historicalRange) getValues(limit int) []float64 { return values } -func prependLatestBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client) { - from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1))) - - if from.Cmp(zero) < 0 { - from.SetInt64(0) - } - - log.Debug(). - Int64("from", from.Int64()). - Int64("to", ms.HeadBlock.Int64()). - Msg("Fetching latest blocks") - - err := ms.getBlockRange(ctx, from, ms.HeadBlock, rpc) - if err != nil { - log.Error().Err(err).Msg("There was an issue fetching the block range") - } -} +const maxDataPoints = 1000 func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) { var cs *chainState @@ -195,6 +184,9 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r return err } observedPendingTxs = append(observedPendingTxs, historicalDataPoint{SampleTime: time.Now(), SampleValue: float64(cs.PendingCount)}) + if len(observedPendingTxs) > maxDataPoints { + observedPendingTxs = observedPendingTxs[1:] + } log.Debug().Uint64("PeerCount", cs.PeerCount).Uint64("ChainID", cs.ChainID.Uint64()).Uint64("HeadBlock", cs.HeadBlock).Uint64("GasPrice", cs.GasPrice.Uint64()).Msg("Fetching blocks") @@ -211,26 +203,41 @@ func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, r ms.GasPrice = cs.GasPrice ms.PendingCount = cs.PendingCount - prependLatestBlocks(ctx, ms, rpc) + from := new(big.Int).Sub(ms.HeadBlock, big.NewInt(int64(batchSize-1))) + + if from.Cmp(zero) < 0 { + from.SetInt64(0) + } + + err = ms.getBlockRange(ctx, from, ms.HeadBlock, rpc) + if err != nil { + return err + } return } func (ms *monitorStatus) getBlockRange(ctx context.Context, from, to *big.Int, rpc *ethrpc.Client) error { + ms.BlocksLock.Lock() blms := make([]ethrpc.BatchElem, 0) - for i := from; i.Cmp(to) != 1; i.Add(i, one) { + for i := new(big.Int).Set(from); i.Cmp(to) <= 0; i.Add(i, big.NewInt(1)) { + if _, found := ms.BlockCache.Get(i.String()); found { + continue + } r := new(rpctypes.RawBlockResponse) - var err error blms = append(blms, ethrpc.BatchElem{ Method: "eth_getBlockByNumber", Args: []interface{}{"0x" + i.Text(16), true}, Result: r, - Error: err, + Error: nil, }) } + ms.BlocksLock.Unlock() + if len(blms) == 0 { return nil } + b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 3 * time.Minute retryable := func() error { @@ -241,12 +248,14 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, from, to *big.Int, r if err != nil { return err } + + ms.BlocksLock.Lock() + defer ms.BlocksLock.Unlock() for _, b := range blms { if b.Error != nil { - return b.Error + continue } pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse)) - ms.BlockCache.Add(pb.Number().String(), pb) } @@ -382,16 +391,14 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu if blockTable.SelectedRow == 0 { ms.TopDisplayedBlock = ms.HeadBlock - // Calculate the 'to' block number based on the next top block number toBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize-1))) if toBlockNumber.Cmp(zero) < 0 { toBlockNumber.SetInt64(0) } - // Fetch the blocks in the new range if they are missing - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, ms.TopDisplayedBlock) + err := ms.getBlockRange(ctx, toBlockNumber, ms.TopDisplayedBlock, rpc) if err != nil { - log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + log.Error().Err(err).Msg("There was an issue fetching the block range") } } toBlockNumber := ms.TopDisplayedBlock @@ -400,6 +407,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu fromBlockNumber.SetInt64(0) // We cannot have block numbers less than 0. } renderedBlocksTemp := make([]rpctypes.PolyBlock, 0, windowSize) + ms.BlocksLock.Lock() for i := new(big.Int).Set(fromBlockNumber); i.Cmp(toBlockNumber) <= 0; i.Add(i, big.NewInt(1)) { if block, ok := ms.BlockCache.Get(i.String()); ok { renderedBlocksTemp = append(renderedBlocksTemp, block.(rpctypes.PolyBlock)) @@ -408,6 +416,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu log.Warn().Str("blockNumber", i.String()).Msg("Block should be in cache but is not") } } + ms.BlocksLock.Unlock() renderedBlocks = renderedBlocksTemp termUi.h0.Text = fmt.Sprintf("Height: %s\nTime: %s", ms.HeadBlock.String(), time.Now().Format("02 Jan 06 15:04:05 MST")) @@ -469,16 +478,14 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu blockTable.SelectedRow = 0 currentMode = monitorModeExplorer - // Calculate the 'to' block number based on the next top block number toBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize-1))) if toBlockNumber.Cmp(zero) < 0 { toBlockNumber.SetInt64(0) } - // Fetch the blocks in the new range if they are missing - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, ms.TopDisplayedBlock) + err := ms.getBlockRange(ctx, toBlockNumber, ms.TopDisplayedBlock, rpc) if err != nil { - log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + log.Error().Err(err).Msg("There was an issue fetching the block range") break } case "": @@ -516,37 +523,30 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu Int("dy", blockTable.Dy()). Msg("Down") - // the last row of current window size if blockTable.SelectedRow > windowSize-1 { - // Calculate the range of block numbers we are trying to page down to nextTopBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, one) if nextTopBlockNumber.Cmp(zero) < 0 { - // If we've gone past the earliest block, set it to the earliest block number nextTopBlockNumber.SetInt64(0) } - // Calculate the 'to' block number based on the next top block number toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) if toBlockNumber.Cmp(zero) < 0 { toBlockNumber.SetInt64(0) } - // Fetch the blocks if the next block is not in cache if !isBlockInCache(ms.BlockCache, toBlockNumber) { - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize))), toBlockNumber) + err := ms.getBlockRange(ctx, new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize))), toBlockNumber, rpc) if err != nil { log.Warn().Err(err).Msg("Failed to fetch blocks on page down") break } } - // Update the top displayed block number ms.TopDisplayedBlock = nextTopBlockNumber blockTable.SelectedRow = len(renderedBlocks) setBlock = true - // Force redraw to update the UI with the new page of blocks forceRedraw = true redraw(ms, true) break @@ -572,7 +572,7 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu // Fetch the blocks in the new range if they are missing if !isBlockInCache(ms.BlockCache, nextTopBlockNumber) { - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, new(big.Int).Add(nextTopBlockNumber, big.NewInt(int64(windowSize)))) + err := ms.getBlockRange(ctx, toBlockNumber, new(big.Int).Add(nextTopBlockNumber, big.NewInt(int64(windowSize))), rpc) if err != nil { log.Warn().Err(err).Msg("Failed to fetch blocks on page up") break @@ -613,27 +613,22 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu } setBlock = true case "", "": - // Calculate the range of block numbers we are trying to page down to nextTopBlockNumber := new(big.Int).Sub(ms.TopDisplayedBlock, big.NewInt(int64(windowSize))) if nextTopBlockNumber.Cmp(zero) < 0 { - // If we've gone past the earliest block, set it to the earliest block number nextTopBlockNumber.SetInt64(0) } - // Calculate the 'to' block number based on the next top block number toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) if toBlockNumber.Cmp(zero) < 0 { toBlockNumber.SetInt64(0) } - // Fetch the blocks in the new range if they are missing - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, nextTopBlockNumber) + err := ms.getBlockRange(ctx, toBlockNumber, nextTopBlockNumber, rpc) if err != nil { log.Warn().Err(err).Msg("Failed to fetch blocks on page down") break } - // Update the top displayed block number ms.TopDisplayedBlock = nextTopBlockNumber blockTable.SelectedRow = 1 @@ -643,30 +638,25 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu Int("toBlockNumber", int(toBlockNumber.Int64())). Msg("PageDown") - // Force redraw to update the UI with the new page of blocks forceRedraw = true redraw(ms, true) case "", "": - // Calculate the range of block numbers we are trying to page down to nextTopBlockNumber := new(big.Int).Add(ms.TopDisplayedBlock, big.NewInt(int64(windowSize))) if nextTopBlockNumber.Cmp(ms.HeadBlock) > 0 { nextTopBlockNumber.SetInt64(ms.HeadBlock.Int64()) } - // Calculate the 'to' block number based on the next top block number toBlockNumber := new(big.Int).Sub(nextTopBlockNumber, big.NewInt(int64(windowSize-1))) if toBlockNumber.Cmp(zero) < 0 { toBlockNumber.SetInt64(0) } - // Fetch the blocks in the new range if they are missing - _, err := checkAndFetchMissingBlocks(ctx, ms, rpc, toBlockNumber, nextTopBlockNumber) + err := ms.getBlockRange(ctx, toBlockNumber, nextTopBlockNumber, rpc) if err != nil { - log.Warn().Err(err).Msg("Failed to fetch blocks on page down") + log.Warn().Err(err).Msg("Failed to fetch blocks on page up") break } - // Update the top displayed block number ms.TopDisplayedBlock = nextTopBlockNumber blockTable.SelectedRow = 1 @@ -676,7 +666,6 @@ func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatu Int("toBlockNumber", int(toBlockNumber.Int64())). Msg("PageDown") - // Force redraw to update the UI with the new page of blocks forceRedraw = true redraw(ms, true) default: @@ -706,30 +695,6 @@ func isBlockInCache(cache *lru.Cache, blockNumber *big.Int) bool { return exists } -func checkAndFetchMissingBlocks(ctx context.Context, ms *monitorStatus, rpc *ethrpc.Client, fromBlockNum, toBlockNum *big.Int) ([]*big.Int, error) { - var missingBlocks []*big.Int - - // Iterate over the range and check if each block is in the cache. - for i := new(big.Int).Set(fromBlockNum); i.Cmp(toBlockNum) <= 0; i.Add(i, one) { - if _, ok := ms.BlockCache.Get(i.String()); !ok { - // Block is not in cache, so mark it as missing. - missingBlocks = append(missingBlocks, new(big.Int).Set(i)) - } - } - - // If there are missing blocks, fetch them using getBlockRange. - if len(missingBlocks) > 0 { - err := ms.getBlockRange(ctx, missingBlocks[0], missingBlocks[len(missingBlocks)-1], rpc) - if err != nil { - // Handle the error, such as logging or returning it. - return nil, err - } - } - - // Return the list of block numbers that were missing and are now fetched. - return missingBlocks, nil -} - func max(nums ...int) int { m := nums[0] for _, n := range nums { From fa9656f5ed915f67dae2456f5da5ff16abc92e71 Mon Sep 17 00:00:00 2001 From: Jesse Lee Date: Tue, 14 Nov 2023 10:40:11 -0500 Subject: [PATCH 3/3] leo fix: put variable up top --- cmd/monitor/monitor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/monitor/monitor.go b/cmd/monitor/monitor.go index dad65f5b..ff3d79d2 100644 --- a/cmd/monitor/monitor.go +++ b/cmd/monitor/monitor.go @@ -31,6 +31,7 @@ var ( zero = big.NewInt(0) selectedBlock rpctypes.PolyBlock observedPendingTxs historicalRange + maxDataPoints = 1000 ) type ( @@ -173,8 +174,6 @@ func (h historicalRange) getValues(limit int) []float64 { return values } -const maxDataPoints = 1000 - func fetchBlocks(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client, isUiRendered bool) (err error) { var cs *chainState cs, err = getChainState(ctx, ec)