Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT - Split big payments #1327

Merged
merged 5 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 93 additions & 56 deletions protocol/rpcprovider/rewardserver/reward_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lavanet/lava/protocol/lavasession"
"github.com/lavanet/lava/protocol/metrics"
"github.com/lavanet/lava/utils"
"github.com/lavanet/lava/utils/lavaslices"
"github.com/lavanet/lava/utils/rand"
"github.com/lavanet/lava/utils/sigs"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
Expand All @@ -32,6 +33,8 @@ const (
DefaultRewardsSnapshotTimeoutSec = 30
MaxPaymentRequestsRetiresForSession = 3
RewardServerMaxRelayRetires = 3
splitRewardsIntoChunksSize = 500 // if the reward array is larger than this it will split it into chunks and send multiple requests instead of a huge one
debug = false
)

type PaymentRequest struct {
Expand All @@ -58,8 +61,8 @@ type ConsumerRewards struct {
}

func (csrw *ConsumerRewards) PrepareRewardsForClaim() (retProofs []*pairingtypes.RelaySession, errRet error) {
utils.LavaFormatDebug("Adding reward ids for claim", utils.LogAttr("number_of_proofs", len(csrw.proofs)))
for _, proof := range csrw.proofs {
utils.LavaFormatDebug("Adding reward id for claim", utils.LogAttr("Id", proof.SessionId))
retProofs = append(retProofs, proof)
}
return
Expand All @@ -75,6 +78,11 @@ type RelaySessionsToRetryAttempts struct {
paymentRequestRetryAttempts uint64
}

type PaymentConfiguration struct {
relaySessionChunks [][]*pairingtypes.RelaySession // small chunks of relay session to request payments for
shouldAddExpectedPayment bool
}

type RewardServer struct {
rewardsTxSender RewardsTxSender
lock sync.RWMutex
Expand Down Expand Up @@ -223,62 +231,100 @@ func (rws *RewardServer) sendRewardsClaim(ctx context.Context, epoch uint64) err
return utils.LavaFormatError("sendRewardsClaim failed to get earliest block in memory", err)
}

// Handle Failed rewards claim with retry
failedRewardRequestsToRetry := rws.gatherFailedRequestPaymentsToRetry(earliestSavedEpoch)
if len(failedRewardRequestsToRetry) > 0 {
utils.LavaFormatDebug("Found failed reward claims, retrying", utils.LogAttr("number_of_rewards", len((failedRewardRequestsToRetry))))
specs := map[string]struct{}{}
for _, relay := range failedRewardRequestsToRetry {
utils.LavaFormatDebug("[sendRewardsClaim] retrying failed id", utils.LogAttr("id", relay.SessionId))
specs[relay.SpecId] = struct{}{}
}

err = rws.rewardsTxSender.TxRelayPayment(ctx, failedRewardRequestsToRetry, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs))
if err != nil {
rws.updatePaymentRequestAttempt(failedRewardRequestsToRetry, false)
utils.LavaFormatError("failed sending previously failed payment requests", err)
} else {
rws.updatePaymentRequestAttempt(failedRewardRequestsToRetry, true)
}
}
failedRewardsToClaimChunks := lavaslices.SplitGenericSliceIntoChunks(failedRewardRequestsToRetry, splitRewardsIntoChunksSize)
failedRewardsLength := len(failedRewardsToClaimChunks)
if failedRewardsLength > 1 {
utils.LavaFormatDebug("Splitting Failed Reward claims into chunks", utils.LogAttr("chunk_size", splitRewardsIntoChunksSize), utils.LogAttr("number_of_chunks", failedRewardsLength))
}

rewardsToClaim, err := rws.gatherRewardsForClaim(ctx, epoch, earliestSavedEpoch)
// Handle new claims
gatheredRewardsToClaim, err := rws.gatherRewardsForClaim(ctx, epoch, earliestSavedEpoch)
if err != nil {
return err
}

specs := map[string]struct{}{}
for _, relay := range rewardsToClaim {
consumerAddr, err := sigs.ExtractSignerAddress(relay)
if err != nil {
utils.LavaFormatError("invalid consumer address extraction from relay", err, utils.Attribute{Key: "relay", Value: relay})
continue
}
expectedPay := PaymentRequest{
ChainID: relay.SpecId,
CU: relay.CuSum,
BlockHeightDeadline: relay.Epoch,
Amount: sdk.Coin{},
Client: consumerAddr,
UniqueIdentifier: relay.SessionId,
Description: strconv.FormatUint(rws.serverID, 10),
ConsumerRewardsKey: getKeyForConsumerRewards(relay.SpecId, consumerAddr.String()),
}
rws.addExpectedPayment(expectedPay)
rws.updateCUServiced(relay.CuSum)
specs[relay.SpecId] = struct{}{}
}
if len(rewardsToClaim) > 0 {
err = rws.rewardsTxSender.TxRelayPayment(ctx, rewardsToClaim, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs))
if err != nil {
rws.updatePaymentRequestAttempt(rewardsToClaim, false)
return utils.LavaFormatError("failed sending rewards claim", err)
rewardsToClaimChunks := lavaslices.SplitGenericSliceIntoChunks(gatheredRewardsToClaim, splitRewardsIntoChunksSize)
newRewardsLength := len(rewardsToClaimChunks)
if newRewardsLength > 1 {
utils.LavaFormatDebug("Splitting Reward claims into chunks", utils.LogAttr("chunk_size", splitRewardsIntoChunksSize), utils.LogAttr("number_of_chunks", newRewardsLength))
}

// payment chunk configurations
paymentConfiguration := []*PaymentConfiguration{
{ // adding the new rewards.
relaySessionChunks: rewardsToClaimChunks,
shouldAddExpectedPayment: true,
},
{ // adding the failed rewards.
relaySessionChunks: failedRewardsToClaimChunks,
shouldAddExpectedPayment: false,
},
}

paymentWaitGroup := sync.WaitGroup{}
paymentWaitGroup.Add(newRewardsLength + failedRewardsLength)

// add expected pay and ask for rewards
for _, paymentConfig := range paymentConfiguration {
for _, rewardsToClaim := range paymentConfig.relaySessionChunks {
if len(rewardsToClaim) == 0 {
if paymentConfig.shouldAddExpectedPayment {
utils.LavaFormatDebug("no new rewards to claim")
} else {
utils.LavaFormatDebug("no failed rewards to claim")
}
continue
}
go func(rewards []*pairingtypes.RelaySession, payment *PaymentConfiguration) { // send rewards asynchronously
defer paymentWaitGroup.Done()
specs := map[string]struct{}{}
if payment.shouldAddExpectedPayment {
for _, relay := range rewards {
consumerAddr, err := sigs.ExtractSignerAddress(relay)
if err != nil {
utils.LavaFormatError("invalid consumer address extraction from relay", err, utils.Attribute{Key: "relay", Value: relay})
continue
}
expectedPay := PaymentRequest{
ChainID: relay.SpecId,
CU: relay.CuSum,
BlockHeightDeadline: relay.Epoch,
Amount: sdk.Coin{},
Client: consumerAddr,
UniqueIdentifier: relay.SessionId,
Description: strconv.FormatUint(rws.serverID, 10),
ConsumerRewardsKey: getKeyForConsumerRewards(relay.SpecId, consumerAddr.String()),
}
rws.addExpectedPayment(expectedPay)
rws.updateCUServiced(relay.CuSum)
specs[relay.SpecId] = struct{}{}
if debug {
utils.LavaFormatDebug("Adding Payment for Spec", utils.LogAttr("spec", relay.SpecId), utils.LogAttr("Cu Sum", relay.CuSum), utils.LogAttr("epoch", relay.Epoch), utils.LogAttr("consumerAddr", consumerAddr), utils.LogAttr("number_of_relays_served", relay.RelayNum), utils.LogAttr("sessionId", relay.SessionId))
}
}
} else { // just add the specs
for _, relay := range failedRewardRequestsToRetry {
utils.LavaFormatDebug("[sendRewardsClaim] retrying failed id", utils.LogAttr("id", relay.SessionId))
specs[relay.SpecId] = struct{}{}
}
}
err = rws.rewardsTxSender.TxRelayPayment(ctx, rewards, strconv.FormatUint(rws.serverID, 10), rws.latestBlockReports(specs))
if err != nil {
rws.updatePaymentRequestAttempt(rewards, false)
utils.LavaFormatError("failed sending rewards claim", err)
return
}
rws.updatePaymentRequestAttempt(rewards, true)
utils.LavaFormatDebug("Sent rewards claim", utils.Attribute{Key: "number_of_relay_sessions_sent", Value: len(rewards)})
}(rewardsToClaim, paymentConfig)
}
rws.updatePaymentRequestAttempt(rewardsToClaim, true)

utils.LavaFormatDebug("Sent rewards claim", utils.Attribute{Key: "number_of_relay_sessions_sent", Value: len(rewardsToClaim)})
} else {
utils.LavaFormatDebug("no rewards to claim")
}
utils.LavaFormatDebug("Waiting for all Payment groups to finish", utils.LogAttr("wait_group_size", newRewardsLength+failedRewardsLength))
paymentWaitGroup.Wait()
return nil
}

Expand Down Expand Up @@ -444,14 +490,9 @@ func (rws *RewardServer) PaymentHandler(payment *PaymentRequest) {
if !removedPayment {
utils.LavaFormatWarning("tried removing payment that wasn't expected", nil, utils.Attribute{Key: "payment", Value: payment})
}

utils.LavaFormatDebug("Reward Server detected successful payment request, deleting claimed rewards", utils.Attribute{Key: "payment-uid", Value: payment.UniqueIdentifier})

err = rws.rewardDB.DeleteClaimedRewards(payment.PaymentEpoch, payment.Client.String(), payment.UniqueIdentifier, payment.ConsumerRewardsKey)
if err != nil {
utils.LavaFormatWarning("failed deleting claimed rewards", err)
} else {
utils.LavaFormatDebug("deleted claimed rewards successfully", utils.Attribute{Key: "payment-uid", Value: payment.UniqueIdentifier})
}
}
}
Expand Down Expand Up @@ -621,13 +662,11 @@ func (rws *RewardServer) gatherFailedRequestPaymentsToRetry(earliestSavedEpoch u
func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingtypes.RelaySession, success bool) {
rws.lock.Lock()
defer rws.lock.Unlock()

for _, relaySession := range paymentRequests {
sessionId := relaySession.SessionId
sessionWithAttempts, found := rws.failedRewardsPaymentRequests[sessionId]
if !found {
if !success {
utils.LavaFormatDebug("Adding new session to failedRewardsPaymentRequests", utils.Attribute{Key: "sessionId", Value: sessionId})
rws.failedRewardsPaymentRequests[sessionId] = &RelaySessionsToRetryAttempts{
relaySession: relaySession,
paymentRequestRetryAttempts: 1,
Expand All @@ -637,7 +676,6 @@ func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingt
}

if success {
utils.LavaFormatDebug("Removing session from failedRewardsPaymentRequests", utils.Attribute{Key: "sessionId", Value: sessionId})
delete(rws.failedRewardsPaymentRequests, sessionId)
continue
}
Expand All @@ -648,7 +686,6 @@ func (rws *RewardServer) updatePaymentRequestAttempt(paymentRequests []*pairingt
utils.Attribute{Key: "sessionIds", Value: sessionId},
utils.Attribute{Key: "maxRetriesAllowed", Value: MaxPaymentRequestsRetiresForSession},
)

delete(rws.failedRewardsPaymentRequests, sessionId)
rws.deleteRelaySessionFromRewardDB(relaySession)
continue
Expand Down
5 changes: 3 additions & 2 deletions protocol/statetracker/tx_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (ts *TxSender) SendTxAndVerifyCommit(txfactory tx.Factory, msg sdk.Msg) (pa
utils.LavaFormatDebug("transaction results", utils.Attribute{Key: "jsonParsedResult", Value: jsonParsedResult})
}
resultData, err := common.ParseTransactionResult(jsonParsedResult)
utils.LavaFormatDebug("Sent Transaction", utils.LogAttr("Hash", hex.EncodeToString(resultData.Txhash)))
utils.LavaFormatInfo("Sent Transaction", utils.LogAttr("Hash", hex.EncodeToString(resultData.Txhash)))
if err != nil {
return common.TxResultData{}, err
}
Expand All @@ -218,6 +218,8 @@ func (ts *TxSender) waitForTxCommit(resultData common.TxResultData) (common.TxRe
timeOutReached := false
go func() {
for {
// we will never catch the tx hash in the first attempt as not enough time have passed, so we sleep at the beginning of the loop
time.Sleep(5 * time.Second)
if timeOutReached {
utils.LavaFormatWarning("Timeout waiting for transaction", nil, utils.LogAttr("hash", resultData.Txhash))
return
Expand All @@ -234,7 +236,6 @@ func (ts *TxSender) waitForTxCommit(resultData common.TxResultData) (common.TxRe
if debug {
utils.LavaFormatWarning("Tx query got error", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "resultData", Value: resultData})
}
time.Sleep(5 * time.Second)
}
}()
select {
Expand Down
6 changes: 6 additions & 0 deletions protocol/statetracker/updaters/payment_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"github.com/lavanet/lava/protocol/rpcprovider/rewardserver"
"github.com/lavanet/lava/utils"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -43,12 +44,17 @@ func (pu *PaymentUpdater) updateInner() {
if err != nil {
return
}
relevantPayments := 0
for _, payment := range payments {
updatable, foundUpdatable := pu.paymentUpdatable[payment.Description]
if foundUpdatable {
relevantPayments += 1
(*updatable).PaymentHandler(payment)
}
}
if len(payments) > 0 {
utils.LavaFormatDebug("relevant payment events", utils.Attribute{Key: "number_of_payment_events_detected", Value: len(payments)}, utils.Attribute{Key: "number_of_relevant_payments_detected", Value: relevantPayments})
}
}

func (pu *PaymentUpdater) Reset(latestBlock int64) {
Expand Down
3 changes: 3 additions & 0 deletions scripts/pre_setups/init_lava_only_with_node.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@ wait_next_block
lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --delegate-limit 1000ulava --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE

sleep_until_next_epoch
wait_next_block

screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25

wait_next_block

screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \
127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \
$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25
Expand Down
35 changes: 35 additions & 0 deletions utils/lavaslices/slices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package lavaslices

import (
"math"

"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -261,3 +263,36 @@ func UnorderedEqual[T comparable](slices ...[]T) bool {

return true
}

// splitSliceGeneric splits a slice into smaller slices of at most chunkSize length.
// for example len(arr) == 1400 and chunk size 500 will return [500, 500, 400]
func SplitGenericSliceIntoChunks[T any](arr []T, chunkSize int) [][]T {
var result [][]T

// Calculate the number of chunks needed
numChunks := int(math.Ceil(float64(len(arr)) / float64(chunkSize)))

// Iterate over the original slice and slice it into chunks
for i := 0; i < numChunks; i++ {
start := i * chunkSize
end := start + chunkSize

// Ensure end doesn't exceed the length of the slice
if end > len(arr) {
end = len(arr)
}

// Create a chunk with preallocated capacity
chunk := make([]T, 0, chunkSize)

// Append elements to the chunk
for j := start; j < end; j++ {
chunk = append(chunk, arr[j])
}

// Append the chunk to the result slice
result = append(result, chunk)
}

return result
}
22 changes: 22 additions & 0 deletions utils/lavaslices/slices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,25 @@ func TestFilter(t *testing.T) {
require.Equal(t, []int{}, Filter([]int{1}, filter))
require.Equal(t, []int{2, 4}, Filter([]int{1, 2, 3, 4}, filter))
}

func TestSliceSplitter(t *testing.T) {
// Sample usage
originalSliceSize := 1400
testSlice := make([]int, originalSliceSize) // Assuming this is your original array
for i := 0; i < originalSliceSize; i++ {
testSlice[i] = i
}
testSizes := []int{500, 333, 200, 20, 1}
for _, i := range testSizes {
originalSizeCopy := originalSliceSize
retSlice := SplitGenericSliceIntoChunks(testSlice, i)
for _, k := range retSlice {
if originalSizeCopy < i {
require.Len(t, k, originalSizeCopy)
} else {
require.Len(t, k, i)
}
originalSizeCopy -= i
}
}
}
Loading