From 0afe3b481fe37c9bd74a59400f27038deeab8fff Mon Sep 17 00:00:00 2001 From: Adam Hamrick Date: Tue, 14 Nov 2023 20:25:39 -0500 Subject: [PATCH 1/2] Batch Keeper Benchmark Read Requests --- .../testsetups/keeper_benchmark.go | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/integration-tests/testsetups/keeper_benchmark.go b/integration-tests/testsetups/keeper_benchmark.go index bb6c582c137..adba6cabd56 100644 --- a/integration-tests/testsetups/keeper_benchmark.go +++ b/integration-tests/testsetups/keeper_benchmark.go @@ -293,32 +293,50 @@ func (k *KeeperBenchmarkTest) Run() { require.NoError(k.t, err, "Error waiting for keeper subscriptions") // Collect logs for each registry to calculate test metrics + // This test generates a LOT of logs, and we need to break up our reads, or risk getting rate-limited by the node + endBlock := big.NewInt(0).Add(k.startingBlock, big.NewInt(u.BlockRange)) registryLogs := make([][]types.Log, len(k.keeperRegistries)) for rIndex := range k.keeperRegistries { + // Variables for the full registry var ( - logs []types.Log - timeout = 5 * time.Second - addr = k.keeperRegistries[rIndex].Address() - filterQuery = geth.FilterQuery{ + logs []types.Log + timeout = 5 * time.Second + addr = k.keeperRegistries[rIndex].Address() + queryStartBlock = big.NewInt(0).Set(k.startingBlock) + ) + + // Gather logs from the registry in 100 block chunks to avoid read limits + for queryStartBlock.Cmp(endBlock) < 0 { + filterQuery := geth.FilterQuery{ Addresses: []common.Address{common.HexToAddress(addr)}, - FromBlock: k.startingBlock, + FromBlock: queryStartBlock, + ToBlock: big.NewInt(0).Add(queryStartBlock, big.NewInt(100)), } + + // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats. err = fmt.Errorf("initial error") // to ensure our for loop runs at least once - ) - for err != nil { // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats. - ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout) - logs, err = k.chainClient.FilterLogs(ctx, filterQuery) - cancel() - if err != nil { - k.log.Error().Err(err). - Interface("Filter Query", filterQuery). - Str("Timeout", timeout.String()). - Msg("Error getting logs from chain, trying again") - } else { - k.log.Info().Int("Log Count", len(logs)).Str("Registry Address", addr).Msg("Collected logs") + for err != nil { + ctx, cancel := context.WithTimeout(utils.TestContext(k.t), timeout) + logs, err = k.chainClient.FilterLogs(ctx, filterQuery) + cancel() + if err != nil { + k.log.Error(). + Err(err). + Interface("Filter Query", filterQuery). + Str("Timeout", timeout.String()). + Msg("Error getting logs from chain, trying again") + timeout = time.Duration(math.Min(float64(timeout)*2, float64(2*time.Minute))) + continue + } + k.log.Info(). + Uint64("From Block", queryStartBlock.Uint64()). + Uint64("To Block", filterQuery.ToBlock.Uint64()). + Int("Log Count", len(logs)). + Str("Registry Address", addr). + Msg("Collected logs") + registryLogs[rIndex] = append(registryLogs[rIndex], logs...) } } - registryLogs[rIndex] = logs } // Count reverts and stale upkeeps From 8ca004f6e4a49e27fc8e4b7d8d2087a804827e6a Mon Sep 17 00:00:00 2001 From: Adam Hamrick Date: Wed, 15 Nov 2023 16:24:51 -0500 Subject: [PATCH 2/2] Actually add number --- integration-tests/testsetups/keeper_benchmark.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/integration-tests/testsetups/keeper_benchmark.go b/integration-tests/testsetups/keeper_benchmark.go index adba6cabd56..d9b389a9650 100644 --- a/integration-tests/testsetups/keeper_benchmark.go +++ b/integration-tests/testsetups/keeper_benchmark.go @@ -294,8 +294,11 @@ func (k *KeeperBenchmarkTest) Run() { // Collect logs for each registry to calculate test metrics // This test generates a LOT of logs, and we need to break up our reads, or risk getting rate-limited by the node - endBlock := big.NewInt(0).Add(k.startingBlock, big.NewInt(u.BlockRange)) - registryLogs := make([][]types.Log, len(k.keeperRegistries)) + var ( + endBlock = big.NewInt(0).Add(k.startingBlock, big.NewInt(u.BlockRange)) + registryLogs = make([][]types.Log, len(k.keeperRegistries)) + blockBatchSize int64 = 100 + ) for rIndex := range k.keeperRegistries { // Variables for the full registry var ( @@ -310,7 +313,7 @@ func (k *KeeperBenchmarkTest) Run() { filterQuery := geth.FilterQuery{ Addresses: []common.Address{common.HexToAddress(addr)}, FromBlock: queryStartBlock, - ToBlock: big.NewInt(0).Add(queryStartBlock, big.NewInt(100)), + ToBlock: big.NewInt(0).Add(queryStartBlock, big.NewInt(blockBatchSize)), } // This RPC call can possibly time out or otherwise die. Failure is not an option, keep retrying to get our stats. @@ -334,6 +337,7 @@ func (k *KeeperBenchmarkTest) Run() { Int("Log Count", len(logs)). Str("Registry Address", addr). Msg("Collected logs") + queryStartBlock.Add(queryStartBlock, big.NewInt(blockBatchSize)) registryLogs[rIndex] = append(registryLogs[rIndex], logs...) } }