From 7e77724bd815e2453a4044cbf3d32fea735ba734 Mon Sep 17 00:00:00 2001
From: Bartek Tofel
Date: Fri, 19 Apr 2024 13:01:53 +0200
Subject: [PATCH] subscribe to headers in benchmark test to observe upkeeps

---
 .../ethereum_contracts_automation_seth.go     | 184 ++++++++++++
 .../contracts/ethereum_keeper_contracts.go    |  20 +-
 .../testsetups/keeper_benchmark.go            | 268 +++++-------------
 3 files changed, 269 insertions(+), 203 deletions(-)

diff --git a/integration-tests/contracts/ethereum_contracts_automation_seth.go b/integration-tests/contracts/ethereum_contracts_automation_seth.go
index 09098938557..9ba33cb3b42 100644
--- a/integration-tests/contracts/ethereum_contracts_automation_seth.go
+++ b/integration-tests/contracts/ethereum_contracts_automation_seth.go
@@ -2,6 +2,7 @@ package contracts

 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/big"
 	"strconv"
@@ -17,6 +18,7 @@ import (
 	"github.com/smartcontractkit/chainlink-testing-framework/networks"
 	"github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
 	eth_contracts "github.com/smartcontractkit/chainlink/integration-tests/contracts/ethereum"
+	"github.com/smartcontractkit/chainlink/integration-tests/testreporters"
 	"github.com/smartcontractkit/chainlink/integration-tests/wrappers"
 	"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
 	"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/arbitrum_module"
@@ -2296,3 +2298,185 @@ func DeployKeeperConsumerBenchmark(client *seth.Client) (AutomationConsumerBench
 		address: &data.Address,
 	}, nil
 }
+
+// KeeperConsumerBenchmarkUpkeepObserver is a header subscription that waits for a round of upkeeps
+type KeeperConsumerBenchmarkUpkeepObserver struct {
+	instance AutomationConsumerBenchmark
+	registry KeeperRegistry
+	upkeepID *big.Int
+
+	firstBlockNum       uint64                                     // Records the number of the first block that came in
+	lastBlockNum        uint64                                     // Records the number of the last block that came in
+	blockRange          int64                                      // How many blocks to watch upkeeps for
+	upkeepSLA           int64                                      // SLA after which an upkeep is counted as 'missed'
+	metricsReporter     *testreporters.KeeperBenchmarkTestReporter // Test reporter to track results
+	upkeepIndex         int64
+	firstEligibleBuffer int64
+
+	// State variables, updated as blocks come in
+	blocksSinceSubscription int64   // How many blocks have passed since subscribing
+	blocksSinceEligible     int64   // How many blocks have come in since the upkeep became eligible for a check
+	countEligible           int64   // Number of times the upkeep became eligible
+	countMissed             int64   // Number of times we missed the SLA for performing an upkeep
+	upkeepCount             int64   // The count of upkeeps done so far
+	allCheckDelays          []int64 // Tracks the number of blocks an upkeep waited after becoming eligible
+	complete                bool
+	l                       zerolog.Logger
+}
+
+// NewKeeperConsumerBenchmarkUpkeepObserver provides a new instance of a KeeperConsumerBenchmarkUpkeepObserver
+// Used to track and log benchmark test results for keepers
+func NewKeeperConsumerBenchmarkUpkeepObserver(
+	contract AutomationConsumerBenchmark,
+	registry KeeperRegistry,
+	upkeepID *big.Int,
+	blockRange int64,
+	upkeepSLA int64,
+	metricsReporter *testreporters.KeeperBenchmarkTestReporter,
+	upkeepIndex int64,
+	firstEligibleBuffer int64,
+	logger zerolog.Logger,
+) *KeeperConsumerBenchmarkUpkeepObserver {
+	return &KeeperConsumerBenchmarkUpkeepObserver{
+		instance:                contract,
+		registry:                registry,
+		upkeepID:                upkeepID,
+		blockRange:              blockRange,
+		upkeepSLA:               upkeepSLA,
+		blocksSinceSubscription: 0,
+		blocksSinceEligible:     0,
+		upkeepCount:             0,
+		allCheckDelays:          []int64{},
+		metricsReporter:         metricsReporter,
+		complete:                false,
+		lastBlockNum:            0,
+		upkeepIndex:             upkeepIndex,
+		firstBlockNum:           0,
+		firstEligibleBuffer:     firstEligibleBuffer,
+		l:                       logger,
+	}
+}
+
+// ReceiveHeader queries the latest Keeper round and checks whether the upkeep was performed; it returns
+// true once the observation has finished.
+func (o *KeeperConsumerBenchmarkUpkeepObserver) ReceiveHeader(receivedHeader *types.Header) (bool, error) {
+	if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count
+		return false, nil
+	}
+	if o.firstBlockNum == 0 {
+		o.firstBlockNum = receivedHeader.Number.Uint64()
+	}
+	o.lastBlockNum = receivedHeader.Number.Uint64()
+	// Increment block counters
+	o.blocksSinceSubscription++
+
+	upkeepCount, err := o.instance.GetUpkeepCount(context.Background(), big.NewInt(o.upkeepIndex))
+	if err != nil {
+		return false, err
+	}
+
+	if upkeepCount.Int64() > o.upkeepCount { // A new upkeep was done
+		if upkeepCount.Int64() != o.upkeepCount+1 {
+			return false, errors.New("upkeep count increased by more than 1 in a single block")
+		}
+		o.l.Info().
+			Uint64("Block_Number", receivedHeader.Number.Uint64()).
+			Str("Upkeep_ID", o.upkeepID.String()).
+			Str("Contract_Address", o.instance.Address()).
+			Int64("Upkeep_Count", upkeepCount.Int64()).
+			Int64("Blocks_since_eligible", o.blocksSinceEligible).
+			Str("Registry_Address", o.registry.Address()).
+			Msg("Upkeep Performed")
+
+		if o.blocksSinceEligible > o.upkeepSLA {
+			o.l.Warn().
+				Uint64("Block_Number", receivedHeader.Number.Uint64()).
+				Str("Upkeep_ID", o.upkeepID.String()).
+				Str("Contract_Address", o.instance.Address()).
+				Int64("Blocks_since_eligible", o.blocksSinceEligible).
+				Str("Registry_Address", o.registry.Address()).
+				Msg("Upkeep Missed SLA")
+			o.countMissed++
+		}
+
+		o.allCheckDelays = append(o.allCheckDelays, o.blocksSinceEligible)
+		o.upkeepCount++
+		o.blocksSinceEligible = 0
+	}
+
+	isEligible, err := o.instance.CheckEligible(context.Background(), big.NewInt(o.upkeepIndex), big.NewInt(o.blockRange), big.NewInt(o.firstEligibleBuffer))
+	if err != nil {
+		return false, err
+	}
+	if isEligible {
+		if o.blocksSinceEligible == 0 {
+			// First time this upkeep became eligible
+			o.countEligible++
+			o.l.Info().
+				Uint64("Block_Number", receivedHeader.Number.Uint64()).
+				Str("Upkeep_ID", o.upkeepID.String()).
+				Str("Contract_Address", o.instance.Address()).
+				Str("Registry_Address", o.registry.Address()).
+				Msg("Upkeep Now Eligible")
+		}
+		o.blocksSinceEligible++
+	}
+
+	if o.blocksSinceSubscription >= o.blockRange || int64(o.lastBlockNum-o.firstBlockNum) >= o.blockRange {
+		if o.blocksSinceEligible > 0 {
+			if o.blocksSinceEligible > o.upkeepSLA {
+				o.l.Warn().
+					Uint64("Block_Number", receivedHeader.Number.Uint64()).
+					Str("Upkeep_ID", o.upkeepID.String()).
+					Str("Contract_Address", o.instance.Address()).
+					Int64("Blocks_since_eligible", o.blocksSinceEligible).
+					Str("Registry_Address", o.registry.Address()).
+					Msg("Upkeep remained eligible at end of test and missed SLA")
+				o.countMissed++
+			} else {
+				o.l.Info().
+					Uint64("Block_Number", receivedHeader.Number.Uint64()).
+					Str("Upkeep_ID", o.upkeepID.String()).
+					Str("Contract_Address", o.instance.Address()).
+					Int64("Upkeep_Count", upkeepCount.Int64()).
+					Int64("Blocks_since_eligible", o.blocksSinceEligible).
+					Str("Registry_Address", o.registry.Address()).
+ Msg("Upkeep remained eligible at end of test and was within SLA") + } + o.allCheckDelays = append(o.allCheckDelays, o.blocksSinceEligible) + } + + o.l.Info(). + Uint64("Block_Number", receivedHeader.Number.Uint64()). + Str("Upkeep_ID", o.upkeepID.String()). + Str("Contract_Address", o.instance.Address()). + Int64("Upkeeps_Performed", upkeepCount.Int64()). + Int64("Total_Blocks_Watched", o.blocksSinceSubscription). + Str("Registry_Address", o.registry.Address()). + Msg("Finished Watching for Upkeeps") + + o.complete = true + return true, nil + } + return false, nil +} + +// Complete returns whether watching for upkeeps has completed +func (o *KeeperConsumerBenchmarkUpkeepObserver) Complete() bool { + return o.complete +} + +// LogDetails logs the results of the benchmark test to testreporter +func (o *KeeperConsumerBenchmarkUpkeepObserver) LogDetails() { + report := testreporters.KeeperBenchmarkTestReport{ + ContractAddress: o.instance.Address(), + TotalEligibleCount: o.countEligible, + TotalSLAMissedUpkeeps: o.countMissed, + TotalPerformedUpkeeps: o.upkeepCount, + AllCheckDelays: o.allCheckDelays, + RegistryAddress: o.registry.Address(), + } + o.metricsReporter.ReportMutex.Lock() + o.metricsReporter.Reports = append(o.metricsReporter.Reports, report) + defer o.metricsReporter.ReportMutex.Unlock() +} diff --git a/integration-tests/contracts/ethereum_keeper_contracts.go b/integration-tests/contracts/ethereum_keeper_contracts.go index a4ae4348426..5da5444679b 100644 --- a/integration-tests/contracts/ethereum_keeper_contracts.go +++ b/integration-tests/contracts/ethereum_keeper_contracts.go @@ -1595,8 +1595,8 @@ func (o *KeeperConsumerPerformanceRoundConfirmer) logDetails() { defer o.metricsReporter.ReportMutex.Unlock() } -// KeeperConsumerBenchmarkRoundConfirmer is a header subscription that awaits for a round of upkeeps -type KeeperConsumerBenchmarkRoundConfirmer struct { +// LegacyKeeperConsumerBenchmarkRoundConfirmer is a header subscription that awaits for a round of upkeeps +type LegacyKeeperConsumerBenchmarkRoundConfirmer struct { instance AutomationConsumerBenchmark registry KeeperRegistry upkeepID *big.Int @@ -1623,9 +1623,9 @@ type KeeperConsumerBenchmarkRoundConfirmer struct { l zerolog.Logger } -// NewKeeperConsumerBenchmarkRoundConfirmer provides a new instance of a KeeperConsumerBenchmarkRoundConfirmer +// NewLegacyKeeperConsumerBenchmarkRoundConfirmer provides a new instance of a LegacyKeeperConsumerBenchmarkRoundConfirmer // Used to track and log benchmark test results for keepers -func NewKeeperConsumerBenchmarkRoundConfirmer( +func NewLegacyKeeperConsumerBenchmarkRoundConfirmer( contract AutomationConsumerBenchmark, registry KeeperRegistry, upkeepID *big.Int, @@ -1635,9 +1635,9 @@ func NewKeeperConsumerBenchmarkRoundConfirmer( upkeepIndex int64, firstEligibleBuffer int64, logger zerolog.Logger, -) *KeeperConsumerBenchmarkRoundConfirmer { +) *LegacyKeeperConsumerBenchmarkRoundConfirmer { ctx, cancelFunc := context.WithCancel(context.Background()) - return &KeeperConsumerBenchmarkRoundConfirmer{ + return &LegacyKeeperConsumerBenchmarkRoundConfirmer{ instance: contract, registry: registry, upkeepID: upkeepID, @@ -1661,7 +1661,7 @@ func NewKeeperConsumerBenchmarkRoundConfirmer( } // ReceiveHeader will query the latest Keeper round and check to see whether the round has confirmed -func (o *KeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader blockchain.NodeHeader) error { +func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader 
blockchain.NodeHeader) error { if receivedHeader.Number.Uint64() <= o.lastBlockNum { // Uncle / reorg we won't count return nil } @@ -1765,7 +1765,7 @@ func (o *KeeperConsumerBenchmarkRoundConfirmer) ReceiveHeader(receivedHeader blo } // Wait is a blocking function that will wait until the round has confirmed, and timeout if the deadline has passed -func (o *KeeperConsumerBenchmarkRoundConfirmer) Wait() error { +func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) Wait() error { defer func() { o.complete = true }() for { select { @@ -1779,11 +1779,11 @@ func (o *KeeperConsumerBenchmarkRoundConfirmer) Wait() error { } } -func (o *KeeperConsumerBenchmarkRoundConfirmer) Complete() bool { +func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) Complete() bool { return o.complete } -func (o *KeeperConsumerBenchmarkRoundConfirmer) logDetails() { +func (o *LegacyKeeperConsumerBenchmarkRoundConfirmer) logDetails() { report := testreporters.KeeperBenchmarkTestReport{ ContractAddress: o.instance.Address(), TotalEligibleCount: o.countEligible, diff --git a/integration-tests/testsetups/keeper_benchmark.go b/integration-tests/testsetups/keeper_benchmark.go index b228c665c36..fd45d1d6093 100644 --- a/integration-tests/testsetups/keeper_benchmark.go +++ b/integration-tests/testsetups/keeper_benchmark.go @@ -2,12 +2,12 @@ package testsetups import ( "context" - "errors" "fmt" "math" "math/big" "os" "os/signal" + "sync/atomic" "syscall" "testing" "time" @@ -17,7 +17,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - pkgerrors "github.com/pkg/errors" + "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/slack-go/slack" @@ -225,7 +225,6 @@ func (k *KeeperBenchmarkTest) Run() { nodesWithoutBootstrap := k.chainlinkNodes[1:] for rIndex := range k.keeperRegistries { - var txKeyId = rIndex if inputs.ForceSingleTxnKey { txKeyId = 0 @@ -253,218 +252,101 @@ func (k *KeeperBenchmarkTest) Run() { } } - type roundsObservationData struct { - instance contracts.AutomationConsumerBenchmark - registry contracts.KeeperRegistry - metricsReporter *testreporters.KeeperBenchmarkTestReporter - upkeepID *big.Int - upkeepIndex int64 - upkeepCount int - upkeepSLA int64 - countMissed uint64 - firstBlockNum uint64 - lastBlockNumber uint64 - countEligible uint64 - allCheckDelays []int64 - blocksSinceEligible int64 - blocksSinceSubscription uint64 - firstEligibleBuffer int64 - blockRange int64 - } - - // observeRunds queries the upkpeep contract for the upkeep count and checks if the upkeep is eligible - var observeRunds = func(r *roundsObservationData) (bool, error) { - lastBlock, err := k.chainClient.Client.BlockNumber(context.Background()) - if err != nil { - return false, err - } - - if r.firstBlockNum == 0 { - r.firstBlockNum = lastBlock - } - - blockDiff := lastBlock - r.lastBlockNumber - - if blockDiff <= 0 { // Uncle / reorg we won't count / no new block since last check - k.log.Debug().Int64("Upkeep index", r.upkeepIndex).Msg("No new block found") - return false, nil - } - - r.blocksSinceSubscription = lastBlock - r.firstBlockNum - r.lastBlockNumber = lastBlock - - upkeepCount, err := r.instance.GetUpkeepCount(context.Background(), big.NewInt(r.upkeepIndex)) - if err != nil { - return false, err - } - - if upkeepCount == nil { - return false, fmt.Errorf("upkeep count returned by upkeepID %s was nil", r.upkeepID.String()) - } - - if int(upkeepCount.Int64()) > r.upkeepCount { - if 
(int(upkeepCount.Int64()) - r.upkeepCount) > int(blockDiff) { - return false, errors.New("upkeep count increased by more than 1 in a single block") - } - - k.log.Info(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Int64("Upkeep_Count", upkeepCount.Int64()). - Int64("Blocks_since_eligible", r.blocksSinceEligible). - Str("Registry_Address", r.registry.Address()). - Msg("Upkeep Performed") - - if r.blocksSinceEligible > r.upkeepSLA { - k.log.Warn(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Int64("Blocks_since_eligible", r.blocksSinceEligible). - Str("Registry_Address", r.registry.Address()). - Msg("Upkeep Missed SLA") - r.countMissed = blockDiff - } - - r.allCheckDelays = append(r.allCheckDelays, r.blocksSinceEligible) - r.upkeepCount = int(upkeepCount.Int64()) - r.blocksSinceEligible = 0 - } - - isEligible, err := r.instance.CheckEligible(context.Background(), big.NewInt(r.upkeepIndex), big.NewInt(r.blockRange), big.NewInt(r.firstEligibleBuffer)) - if err != nil { - return false, pkgerrors.Wrapf(err, "failed to check upkeep eligibility for upkeepID %s", r.upkeepID.String()) - } - - if isEligible { - if r.blocksSinceEligible == 0 { - // First time this upkeep became eligible - r.countEligible = blockDiff - k.log.Info(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Str("Registry_Address", r.registry.Address()). - Msg("Upkeep Now Eligible") - } - r.blocksSinceEligible = int64(blockDiff) - } - - k.log.Info(). - Str("Upkeep_ID", r.upkeepID.String()). - Int64("Block range", r.blockRange). - Uint64("blocksSinceSubscription", r.blocksSinceSubscription). - Int64("Blocks left", r.blockRange-int64(r.blocksSinceSubscription)). - Int64("Block difference (first-current)", int64(r.lastBlockNumber-r.firstBlockNum)). - Msg("Checking upkeep") - - if (int64(r.blocksSinceSubscription) >= r.blockRange) || (int64(r.lastBlockNumber-r.firstBlockNum) >= r.blockRange) { - if r.blocksSinceEligible > 0 { - if r.blocksSinceEligible > r.upkeepSLA { - k.log.Warn(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Int64("Blocks_since_eligible", r.blocksSinceEligible). - Str("Registry_Address", r.registry.Address()). - Msg("Upkeep remained eligible at end of test and missed SLA") - r.countMissed = blockDiff - } else { - k.log.Info(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Int64("Upkeep_Count", upkeepCount.Int64()). - Int64("Blocks_since_eligible", r.blocksSinceEligible). - Str("Registry_Address", r.registry.Address()). - Msg("Upkeep remained eligible at end of test and was within SLA") - } - r.allCheckDelays = append(r.allCheckDelays, r.blocksSinceEligible) - } - - k.log.Info(). - Uint64("Block_Number", lastBlock). - Str("Upkeep_ID", r.upkeepID.String()). - Str("Contract_Address", r.instance.Address()). - Int64("Upkeeps_Performed", upkeepCount.Int64()). - Uint64("Total_Blocks_Watched", r.blocksSinceSubscription). - Str("Registry_Address", r.registry.Address()). 
- Msg("Finished Watching for Upkeeps") - - report := testreporters.KeeperBenchmarkTestReport{ - ContractAddress: r.instance.Address(), - TotalEligibleCount: int64(r.countEligible), - TotalSLAMissedUpkeeps: int64(r.countMissed), - TotalPerformedUpkeeps: int64(r.upkeepCount), - AllCheckDelays: r.allCheckDelays, - RegistryAddress: r.registry.Address(), - } - r.metricsReporter.ReportMutex.Lock() - defer r.metricsReporter.ReportMutex.Unlock() - r.metricsReporter.Reports = append(r.metricsReporter.Reports, report) - - return true, nil - } - - return false, nil - } - - errgroup, errCtx := errgroup.WithContext(context.Background()) - effectiveBlockRange := inputs.Upkeeps.BlockRange + inputs.UpkeepSLA k.log.Info().Msgf("Waiting for %d blocks for all upkeeps to be performed", inputs.Upkeeps.BlockRange+inputs.UpkeepSLA) - timeout := effectiveBlockRange * u.BlockInterval * int64(time.Second) + timeout := effectiveBlockRange * 3 * int64(time.Second) timeout = int64(float64(timeout) * 1.1) timeoutDuration := time.Duration(timeout) - // for each upkeep launch a gouroitine that will observe it every 1 second - // once first error is encountered all other goroutines are cancelled + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(timeoutDuration)) + defer cancel() + errgroup, errCtx := errgroup.WithContext(ctx) + + var startedObservations = atomic.Int32{} + var finishedObservations = atomic.Int32{} + for rIndex := range k.keeperRegistries { + // headerCh := make(chan *types.Header, len(k.upkeepIDs[rIndex])) for index, upkeepID := range k.upkeepIDs[rIndex] { + upkeepIDCopy := upkeepID registryIndex := rIndex upkeepIndex := int64(index) - upkeepIDtoUse := upkeepID - errgroup.Go(func() error { - roundObserver := roundsObservationData{ - instance: k.keeperConsumerContracts[registryIndex], - registry: k.keeperRegistries[registryIndex], - upkeepID: upkeepIDtoUse, - blockRange: inputs.Upkeeps.BlockRange + inputs.UpkeepSLA, - upkeepSLA: inputs.UpkeepSLA, - upkeepIndex: upkeepIndex, - firstEligibleBuffer: inputs.Upkeeps.FirstEligibleBuffer, - metricsReporter: &k.TestReporter, + startedObservations.Add(1) + k.log.Info().Str("UpkeepID", upkeepIDCopy.String()).Msg("Starting upkeep observation") + + // doneCh := make(chan struct{}) + confirmer := contracts.NewKeeperConsumerBenchmarkkUpkeepObserver( + k.keeperConsumerContracts[registryIndex], + k.keeperRegistries[registryIndex], + upkeepIDCopy, + inputs.Upkeeps.BlockRange+inputs.UpkeepSLA, + inputs.UpkeepSLA, + &k.TestReporter, + upkeepIndex, + inputs.Upkeeps.FirstEligibleBuffer, + k.log, + // doneCh, + ) + + k.log.Debug().Str("UpkeepID", upkeepIDCopy.String()).Msg("Subscribing to new headers for upkeep observation") + headerCh := make(chan *types.Header) + sub, err := k.chainClient.Client.SubscribeNewHead(context.Background(), headerCh) + if err != nil { + return err } - testContext, testCancel := context.WithTimeout(context.Background(), timeoutDuration) - defer testCancel() - - ticker := time.NewTicker(time.Second * 1) for { select { - case <-testContext.Done(): - ticker.Stop() - return fmt.Errorf("failed to observe desired block range for upkeep %s before timeout", upkeepIDtoUse.String()) - case <-ticker.C: - done, err := observeRunds(&roundObserver) - if err != nil { - return err + case subscriptionErr := <-sub.Err(): // header listening failed for the upkeep, exit + return errors.Wrapf(subscriptionErr, "listening for new headers for upkeep %s failed. 
Exiting", upkeepIDCopy.String()) + case <-errCtx.Done(): //one of goroutines errored, shut down gracefully + k.log.Error().Err(errCtx.Err()).Str("UpkeepID", upkeepIDCopy.String()).Msg("Stopping obervations due to error in one of the goroutines") + sub.Unsubscribe() + return nil + case <-ctx.Done(): // timeout, abandon ship! + k.log.Error().Str("UpkeepID", upkeepIDCopy.String()).Msg("Stopping obervations due to timeout") + sub.Unsubscribe() + return fmt.Errorf("failed to observe desired block range for upkeep %s before timeout", upkeepIDCopy.String()) + case header := <-headerCh: // new block, check if upkeep was performed + finished, headerErr := confirmer.ReceiveHeader(header) + if headerErr != nil { + return headerErr } - if done { - return nil + if finished { // observations should be completed as we are beyond block range + finishedObservations.Add(1) + k.log.Info().Str("Done/Total", fmt.Sprintf("%d/%d", finishedObservations.Load(), startedObservations.Load())).Str("UpkeepID", upkeepIDCopy.String()).Msg("Upkeep observation completed") + + sub.Unsubscribe() + if confirmer.Complete() { + confirmer.LogDetails() + return nil + } + return fmt.Errorf("confimer has finished, but without completing observation, this should never happen. UpkdeepID: %s", upkeepIDCopy.String()) } - case <-errCtx.Done(): - // cancel because of other goroutines errored - return nil } - } }) } } + // Progress log for visibility + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + k.log.Warn().Str("Done/Total", fmt.Sprintf("%d/%d", finishedObservations.Load(), startedObservations.Load())).Msg("Upkeep observation progress") + if finishedObservations.Load() == startedObservations.Load() { + return + } + case <-ctx.Done(): + return + } + } + }() + if err := errgroup.Wait(); err != nil { k.t.Fatalf("errored when waiting for upkeeps: %v", err) }