diff --git a/protocol/rpcprovider/rewardserver/reward_server.go b/protocol/rpcprovider/rewardserver/reward_server.go index 7fdc0f24d1..2f56f93ac2 100644 --- a/protocol/rpcprovider/rewardserver/reward_server.go +++ b/protocol/rpcprovider/rewardserver/reward_server.go @@ -15,6 +15,7 @@ import ( "github.com/lavanet/lava/protocol/lavasession" "github.com/lavanet/lava/protocol/metrics" "github.com/lavanet/lava/utils" + "github.com/lavanet/lava/utils/lavaslices" "github.com/lavanet/lava/utils/rand" "github.com/lavanet/lava/utils/sigs" pairingtypes "github.com/lavanet/lava/x/pairing/types" @@ -32,6 +33,8 @@ const ( DefaultRewardsSnapshotTimeoutSec = 30 MaxPaymentRequestsRetiresForSession = 3 RewardServerMaxRelayRetires = 3 + splitRewardsIntoChunksSize = 500 // if the reward array is larger than this it will split it into chunks and send multiple requests instead of a huge one + debug = false ) type PaymentRequest struct { @@ -58,8 +61,8 @@ type ConsumerRewards struct { } func (csrw *ConsumerRewards) PrepareRewardsForClaim() (retProofs []*pairingtypes.RelaySession, errRet error) { + utils.LavaFormatDebug("Adding reward ids for claim", utils.LogAttr("number_of_proofs", len(csrw.proofs))) for _, proof := range csrw.proofs { - utils.LavaFormatDebug("Adding reward id for claim", utils.LogAttr("Id", proof.SessionId)) retProofs = append(retProofs, proof) } return @@ -75,6 +78,11 @@ type RelaySessionsToRetryAttempts struct { paymentRequestRetryAttempts uint64 } +type PaymentConfiguration struct { + relaySessionChunks [][]*pairingtypes.RelaySession // small chunks of relay session to request payments for + shouldAddExpectedPayment bool +} + type RewardServer struct { rewardsTxSender RewardsTxSender lock sync.RWMutex @@ -223,62 +231,100 @@ func (rws *RewardServer) sendRewardsClaim(ctx context.Context, epoch uint64) err return utils.LavaFormatError("sendRewardsClaim failed to get earliest block in memory", err) } + // Handle Failed rewards claim with retry failedRewardRequestsToRetry := rws.gatherFailedRequestPaymentsToRetry(earliestSavedEpoch) if len(failedRewardRequestsToRetry) > 0 { utils.LavaFormatDebug("Found failed reward claims, retrying", utils.LogAttr("number_of_rewards", len((failedRewardRequestsToRetry)))) - specs := map[string]struct{}{} - for _, relay := range failedRewardRequestsToRetry { - utils.LavaFormatDebug("[sendRewardsClaim] retrying failed id", utils.LogAttr("id", relay.SessionId)) - specs[relay.SpecId] = struct{}{} - } - - err = rws.rewardsTxSender.TxRelayPayment(ctx, failedRewardRequestsToRetry, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs)) - if err != nil { - rws.updatePaymentRequestAttempt(failedRewardRequestsToRetry, false) - utils.LavaFormatError("failed sending previously failed payment requests", err) - } else { - rws.updatePaymentRequestAttempt(failedRewardRequestsToRetry, true) - } + } + failedRewardsToClaimChunks := lavaslices.SplitGenericSliceIntoChunks(failedRewardRequestsToRetry, splitRewardsIntoChunksSize) + failedRewardsLength := len(failedRewardsToClaimChunks) + if failedRewardsLength > 1 { + utils.LavaFormatDebug("Splitting Failed Reward claims into chunks", utils.LogAttr("chunk_size", splitRewardsIntoChunksSize), utils.LogAttr("number_of_chunks", failedRewardsLength)) } - rewardsToClaim, err := rws.gatherRewardsForClaim(ctx, epoch, earliestSavedEpoch) + // Handle new claims + gatheredRewardsToClaim, err := rws.gatherRewardsForClaim(ctx, epoch, earliestSavedEpoch) if err != nil { return err } - - specs := map[string]struct{}{} - for _, relay := range rewardsToClaim { - consumerAddr, err := sigs.ExtractSignerAddress(relay) - if err != nil { - utils.LavaFormatError("invalid consumer address extraction from relay", err, utils.Attribute{Key: "relay", Value: relay}) - continue - } - expectedPay := PaymentRequest{ - ChainID: relay.SpecId, - CU: relay.CuSum, - BlockHeightDeadline: relay.Epoch, - Amount: sdk.Coin{}, - Client: consumerAddr, - UniqueIdentifier: relay.SessionId, - Description: strconv.FormatUint(rws.serverID, 10), - ConsumerRewardsKey: getKeyForConsumerRewards(relay.SpecId, consumerAddr.String()), - } - rws.addExpectedPayment(expectedPay) - rws.updateCUServiced(relay.CuSum) - specs[relay.SpecId] = struct{}{} - } - if len(rewardsToClaim) > 0 { - err = rws.rewardsTxSender.TxRelayPayment(ctx, rewardsToClaim, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs)) - if err != nil { - rws.updatePaymentRequestAttempt(rewardsToClaim, false) - return utils.LavaFormatError("failed sending rewards claim", err) + rewardsToClaimChunks := lavaslices.SplitGenericSliceIntoChunks(gatheredRewardsToClaim, splitRewardsIntoChunksSize) + newRewardsLength := len(rewardsToClaimChunks) + if newRewardsLength > 1 { + utils.LavaFormatDebug("Splitting Reward claims into chunks", utils.LogAttr("chunk_size", splitRewardsIntoChunksSize), utils.LogAttr("number_of_chunks", newRewardsLength)) + } + + // payment chunk configurations + paymentConfiguration := []*PaymentConfiguration{ + { // adding the new rewards. + relaySessionChunks: rewardsToClaimChunks, + shouldAddExpectedPayment: true, + }, + { // adding the failed rewards. + relaySessionChunks: failedRewardsToClaimChunks, + shouldAddExpectedPayment: false, + }, + } + + paymentWaitGroup := sync.WaitGroup{} + paymentWaitGroup.Add(newRewardsLength + failedRewardsLength) + + // add expected pay and ask for rewards + for _, paymentConfig := range paymentConfiguration { + for _, rewardsToClaim := range paymentConfig.relaySessionChunks { + if len(rewardsToClaim) == 0 { + if paymentConfig.shouldAddExpectedPayment { + utils.LavaFormatDebug("no new rewards to claim") + } else { + utils.LavaFormatDebug("no failed rewards to claim") + } + continue + } + go func(rewards []*pairingtypes.RelaySession, payment *PaymentConfiguration) { // send rewards asynchronously + defer paymentWaitGroup.Done() + specs := map[string]struct{}{} + if payment.shouldAddExpectedPayment { + for _, relay := range rewards { + consumerAddr, err := sigs.ExtractSignerAddress(relay) + if err != nil { + utils.LavaFormatError("invalid consumer address extraction from relay", err, utils.Attribute{Key: "relay", Value: relay}) + continue + } + expectedPay := PaymentRequest{ + ChainID: relay.SpecId, + CU: relay.CuSum, + BlockHeightDeadline: relay.Epoch, + Amount: sdk.Coin{}, + Client: consumerAddr, + UniqueIdentifier: relay.SessionId, + Description: strconv.FormatUint(rws.serverID, 10), + ConsumerRewardsKey: getKeyForConsumerRewards(relay.SpecId, consumerAddr.String()), + } + rws.addExpectedPayment(expectedPay) + rws.updateCUServiced(relay.CuSum) + specs[relay.SpecId] = struct{}{} + if debug { + utils.LavaFormatDebug("Adding Payment for Spec", utils.LogAttr("spec", relay.SpecId), utils.LogAttr("Cu Sum", relay.CuSum), utils.LogAttr("epoch", relay.Epoch), utils.LogAttr("consumerAddr", consumerAddr), utils.LogAttr("number_of_relays_served", relay.RelayNum), utils.LogAttr("sessionId", relay.SessionId)) + } + } + } else { // just add the specs + for _, relay := range failedRewardRequestsToRetry { + utils.LavaFormatDebug("[sendRewardsClaim] retrying failed id", utils.LogAttr("id", relay.SessionId)) + specs[relay.SpecId] = struct{}{} + } + } + err = rws.rewardsTxSender.TxRelayPayment(ctx, rewards, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs)) + if err != nil { + rws.updatePaymentRequestAttempt(rewards, false) + utils.LavaFormatError("failed sending rewards claim", err) + return + } + rws.updatePaymentRequestAttempt(rewards, true) + utils.LavaFormatDebug("Sent rewards claim", utils.Attribute{Key: "number_of_relay_sessions_sent", Value: len(rewards)}) + }(rewardsToClaim, paymentConfig) } - rws.updatePaymentRequestAttempt(rewardsToClaim, true) - - utils.LavaFormatDebug("Sent rewards claim", utils.Attribute{Key: "number_of_relay_sessions_sent", Value: len(rewardsToClaim)}) - } else { - utils.LavaFormatDebug("no rewards to claim") } + utils.LavaFormatDebug("Waiting for all Payment groups to finish", utils.LogAttr("wait_group_size", newRewardsLength+failedRewardsLength)) + paymentWaitGroup.Wait() return nil } @@ -444,14 +490,9 @@ func (rws *RewardServer) PaymentHandler(payment *PaymentRequest) { if !removedPayment { utils.LavaFormatWarning("tried removing payment that wasn't expected", nil, utils.Attribute{Key: "payment", Value: payment}) } - - utils.LavaFormatDebug("Reward Server detected successful payment request, deleting claimed rewards", utils.Attribute{Key: "payment-uid", Value: payment.UniqueIdentifier}) - err = rws.rewardDB.DeleteClaimedRewards(payment.PaymentEpoch, payment.Client.String(), payment.UniqueIdentifier, payment.ConsumerRewardsKey) if err != nil { utils.LavaFormatWarning("failed deleting claimed rewards", err) - } else { - utils.LavaFormatDebug("deleted claimed rewards successfully", utils.Attribute{Key: "payment-uid", Value: payment.UniqueIdentifier}) } } } @@ -621,13 +662,11 @@ func (rws *RewardServer) gatherFailedRequestPaymentsToRetry(earliestSavedEpoch u func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingtypes.RelaySession, success bool) { rws.lock.Lock() defer rws.lock.Unlock() - for _, relaySession := range paymentRequests { sessionId := relaySession.SessionId sessionWithAttempts, found := rws.failedRewardsPaymentRequests[sessionId] if !found { if !success { - utils.LavaFormatDebug("Adding new session to failedRewardsPaymentRequests", utils.Attribute{Key: "sessionId", Value: sessionId}) rws.failedRewardsPaymentRequests[sessionId] = &RelaySessionsToRetryAttempts{ relaySession: relaySession, paymentRequestRetryAttempts: 1, @@ -637,7 +676,6 @@ func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingt } if success { - utils.LavaFormatDebug("Removing session from failedRewardsPaymentRequests", utils.Attribute{Key: "sessionId", Value: sessionId}) delete(rws.failedRewardsPaymentRequests, sessionId) continue } @@ -648,7 +686,6 @@ func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingt utils.Attribute{Key: "sessionIds", Value: sessionId}, utils.Attribute{Key: "maxRetriesAllowed", Value: MaxPaymentRequestsRetiresForSession}, ) - delete(rws.failedRewardsPaymentRequests, sessionId) rws.deleteRelaySessionFromRewardDB(relaySession) continue diff --git a/protocol/statetracker/tx_sender.go b/protocol/statetracker/tx_sender.go index be08f0ebd2..16791703bb 100644 --- a/protocol/statetracker/tx_sender.go +++ b/protocol/statetracker/tx_sender.go @@ -198,7 +198,7 @@ func (ts *TxSender) SendTxAndVerifyCommit(txfactory tx.Factory, msg sdk.Msg) (pa utils.LavaFormatDebug("transaction results", utils.Attribute{Key: "jsonParsedResult", Value: jsonParsedResult}) } resultData, err := common.ParseTransactionResult(jsonParsedResult) - utils.LavaFormatDebug("Sent Transaction", utils.LogAttr("Hash", hex.EncodeToString(resultData.Txhash))) + utils.LavaFormatInfo("Sent Transaction", utils.LogAttr("Hash", hex.EncodeToString(resultData.Txhash))) if err != nil { return common.TxResultData{}, err } @@ -218,6 +218,8 @@ func (ts *TxSender) waitForTxCommit(resultData common.TxResultData) (common.TxRe timeOutReached := false go func() { for { + // we will never catch the tx hash in the first attempt as not enough time have passed, so we sleep at the beginning of the loop + time.Sleep(5 * time.Second) if timeOutReached { utils.LavaFormatWarning("Timeout waiting for transaction", nil, utils.LogAttr("hash", resultData.Txhash)) return @@ -234,7 +236,6 @@ func (ts *TxSender) waitForTxCommit(resultData common.TxResultData) (common.TxRe if debug { utils.LavaFormatWarning("Tx query got error", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "resultData", Value: resultData}) } - time.Sleep(5 * time.Second) } }() select { diff --git a/protocol/statetracker/updaters/payment_updater.go b/protocol/statetracker/updaters/payment_updater.go index 5d25e78e70..db14a8afec 100644 --- a/protocol/statetracker/updaters/payment_updater.go +++ b/protocol/statetracker/updaters/payment_updater.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/lavanet/lava/protocol/rpcprovider/rewardserver" + "github.com/lavanet/lava/utils" "golang.org/x/net/context" ) @@ -43,12 +44,17 @@ func (pu *PaymentUpdater) updateInner() { if err != nil { return } + relevantPayments := 0 for _, payment := range payments { updatable, foundUpdatable := pu.paymentUpdatable[payment.Description] if foundUpdatable { + relevantPayments += 1 (*updatable).PaymentHandler(payment) } } + if len(payments) > 0 { + utils.LavaFormatDebug("relevant payment events", utils.Attribute{Key: "number_of_payment_events_detected", Value: len(payments)}, utils.Attribute{Key: "number_of_relevant_payments_detected", Value: relevantPayments}) + } } func (pu *PaymentUpdater) Reset(latestBlock int64) { diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index fb1e65cc25..ee1ceb2099 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -45,6 +45,7 @@ wait_next_block lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --delegate-limit 1000ulava --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE sleep_until_next_epoch +wait_next_block screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ @@ -52,6 +53,8 @@ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +wait_next_block + screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ $EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 diff --git a/utils/lavaslices/slices.go b/utils/lavaslices/slices.go index 10dd9b2604..d93c41ee03 100644 --- a/utils/lavaslices/slices.go +++ b/utils/lavaslices/slices.go @@ -1,6 +1,8 @@ package lavaslices import ( + "math" + "golang.org/x/exp/constraints" "golang.org/x/exp/slices" ) @@ -261,3 +263,36 @@ func UnorderedEqual[T comparable](slices ...[]T) bool { return true } + +// splitSliceGeneric splits a slice into smaller slices of at most chunkSize length. +// for example len(arr) == 1400 and chunk size 500 will return [500, 500, 400] +func SplitGenericSliceIntoChunks[T any](arr []T, chunkSize int) [][]T { + var result [][]T + + // Calculate the number of chunks needed + numChunks := int(math.Ceil(float64(len(arr)) / float64(chunkSize))) + + // Iterate over the original slice and slice it into chunks + for i := 0; i < numChunks; i++ { + start := i * chunkSize + end := start + chunkSize + + // Ensure end doesn't exceed the length of the slice + if end > len(arr) { + end = len(arr) + } + + // Create a chunk with preallocated capacity + chunk := make([]T, 0, chunkSize) + + // Append elements to the chunk + for j := start; j < end; j++ { + chunk = append(chunk, arr[j]) + } + + // Append the chunk to the result slice + result = append(result, chunk) + } + + return result +} diff --git a/utils/lavaslices/slices_test.go b/utils/lavaslices/slices_test.go index 9d32c4aac4..5101457b08 100644 --- a/utils/lavaslices/slices_test.go +++ b/utils/lavaslices/slices_test.go @@ -419,3 +419,25 @@ func TestFilter(t *testing.T) { require.Equal(t, []int{}, Filter([]int{1}, filter)) require.Equal(t, []int{2, 4}, Filter([]int{1, 2, 3, 4}, filter)) } + +func TestSliceSplitter(t *testing.T) { + // Sample usage + originalSliceSize := 1400 + testSlice := make([]int, originalSliceSize) // Assuming this is your original array + for i := 0; i < originalSliceSize; i++ { + testSlice[i] = i + } + testSizes := []int{500, 333, 200, 20, 1} + for _, i := range testSizes { + originalSizeCopy := originalSliceSize + retSlice := SplitGenericSliceIntoChunks(testSlice, i) + for _, k := range retSlice { + if originalSizeCopy < i { + require.Len(t, k, originalSizeCopy) + } else { + require.Len(t, k, i) + } + originalSizeCopy -= i + } + } +}