Skip to content

Commit

Permalink
using the new pairingQuerier.Provider api to test if a provider is st…
Browse files Browse the repository at this point in the history
…aked
  • Loading branch information
mikecot committed Oct 9, 2024
1 parent 20b8552 commit 6f64d79
Showing 1 changed file with 92 additions and 33 deletions.
125 changes: 92 additions & 33 deletions protocol/monitoring/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package monitoring

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/goccy/go-json"

"github.com/cosmos/cosmos-sdk/client"
"github.com/gogo/status"
lvutil "github.com/lavanet/lava/v2/ecosystem/lavavisor/pkg/util"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/lavanet/lava/v2/protocol/rpcprovider"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/rand"
dualstakingtypes "github.com/lavanet/lava/v2/x/dualstaking/types"
epochstoragetypes "github.com/lavanet/lava/v2/x/epochstorage/types"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
protocoltypes "github.com/lavanet/lava/v2/x/protocol/types"
Expand Down Expand Up @@ -149,9 +147,12 @@ func RunHealth(ctx context.Context,

errCh := make(chan error, 1)

stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{}

// get a list of all necessary specs for the test
dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx)
// dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx)
if getAllProviders {
// fmt.Println("line 155 getAllProviders")
// var specResp *spectypes.QueryGetSpecResponse
var specsResp *spectypes.QueryShowAllChainsResponse
for i := 0; i < BasicQueryRetries; i++ {
Expand All @@ -173,8 +174,13 @@ func RunHealth(ctx context.Context,
healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID})
}
} else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 {
// fmt.Println("line 176 singleProviderSpecsInterfacesData", singleProviderSpecsInterfacesData)
// fmt.Println("line 176 providerAddresses", providerAddresses)
for _, providerAddress := range providerAddresses {
// fmt.Println("line 180 providerAddress", providerAddress)
for spec, apiInterfaces := range singleProviderSpecsInterfacesData {
// fmt.Println("line 182 spec", spec)
// fmt.Println("line 182 apiInterfaces", apiInterfaces)
healthResults.setSpec(&spectypes.Spec{Index: spec})
for _, apiInterface := range apiInterfaces {
healthResults.SetProviderData(LavaEntity{
Expand All @@ -185,47 +191,88 @@ func RunHealth(ctx context.Context,
}
}
}

} else {
// fmt.Println("line 195 else")
var wgproviders sync.WaitGroup
wgproviders.Add(len(providerAddresses))
processProvider := func(providerAddress string) {
defer wgproviders.Done()
var err error
// var err error
for i := 0; i < BasicQueryRetries; i++ {
var response *dualstakingtypes.QueryDelegatorProvidersResponse
// var response *dualstakingtypes.QueryDelegatorProvidersResponse
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{
Delegator: providerAddress,
WithPending: false,
// response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{
// Delegator: providerAddress,
// WithPending: false,
// })
// fmt.Println("!!line 208 response", response)

// responseJSON1, err := json.MarshalIndent(response, "", " ")
// if err != nil {
// fmt.Println("!!Error marshaling response to JSON:", err)
// } else {
// fmt.Println("!!line 214 response (JSON):")
// fmt.Println(string(responseJSON1))
// }

pairingQuerier := pairingtypes.NewQueryClient(clientCtx)

response, err := pairingQuerier.Provider(queryCtx, &pairingtypes.QueryProviderRequest{
Address: providerAddress,
// ShowFrozen: true, ?
})

cancel()
if err != nil || response == nil {
time.Sleep(QuerySleepTime)
continue
}
// Print response2 in JSON format
// responseJSON, err := json.MarshalIndent(response2, "", " ")
// if err != nil {
// fmt.Println("!!Error marshaling response2 to JSON:", err)
// } else {
// fmt.Println("!!line 214 response2 (JSON):")
// fmt.Println(string(responseJSON))
// }

delegations := response.GetDelegations()
for _, delegation := range delegations {
if delegation.Provider == providerAddress {
healthResults.setSpec(&spectypes.Spec{Index: delegation.ChainID})
for _, apiInterface := range chainIdToApiInterfaces[delegation.ChainID] {
healthResults.SetProviderData(LavaEntity{
Address: providerAddress,
SpecId: delegation.ChainID,
ApiInterface: apiInterface,
}, ReplyData{})
// fmt.Println("line 214 err2", err2)

cancel()
if err != nil {
fmt.Println("Error querying provider:", err)
} else if response != nil && len(response.StakeEntries) > 0 {
for _, stakeEntry := range response.StakeEntries {

// Set the spec
healthResults.setSpec(&spectypes.Spec{Index: stakeEntry.Chain})

// Process endpoints and API interfaces
for _, endpoint := range stakeEntry.Endpoints {
for _, apiInterface := range endpoint.ApiInterfaces {
providerWithSpecAndAPI := LavaEntity{
Address: stakeEntry.Address,
SpecId: stakeEntry.Chain,
ApiInterface: apiInterface,
}
healthResults.SetProviderData(providerWithSpecAndAPI, ReplyData{})

if stakeEntry.StakeAppliedBlock > uint64(currentBlock) {
healthResults.FreezeProvider(providerWithSpecAndAPI)
// fmt.Printf("Froze provider %+v\n", providerWithSpecAndAPI)
} else {
stakeEntries[providerWithSpecAndAPI] = stakeEntry
// fmt.Printf("Added stake entry for %+v\n", providerWithSpecAndAPI)
}
}
}
}
}
return
}
if err != nil {
select {
case errCh <- err:
default:
} else {
fmt.Println("No provider data received")
}
}
// if err != nil {
// select {
// case errCh <- err:
// default:
// }
// }
}

for _, providerAddress := range providerAddresses {
Expand Down Expand Up @@ -274,7 +321,9 @@ func RunHealth(ctx context.Context,
wgspecs.Add(len(specs))
// populate the specs
utils.LavaFormatDebug("[+] populating specs")
// fmt.Println("line 1111 specs", specs)
for specId := range specs {
// fmt.Println("line 1112 specId", specId)
go processSpec(specId)
}

Expand All @@ -286,7 +335,6 @@ func RunHealth(ctx context.Context,

utils.LavaFormatDebug("[+] Starting to get provider entries")

stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{}
var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries
if lookupSpecsFromArg == nil {
wgspecs.Add(len(healthResults.getSpecs()))
Expand All @@ -306,6 +354,9 @@ func RunHealth(ctx context.Context,
ChainID: specId,
ShowFrozen: true,
})

fmt.Println("response", response)

cancel()
if err != nil || response == nil {
utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response))
Expand Down Expand Up @@ -372,17 +423,23 @@ func RunHealth(ctx context.Context,
}
// get provider stake entries for each spec or only for the ones given as arguments
if lookupSpecsFromArg != nil {
fmt.Println("lookupSpecsFromArg", lookupSpecsFromArg)
fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
for specId := range healthResults.getSpecs() {
fmt.Println("specId", specId)
utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed
for _, arg := range lookupSpecsFromArg {
fmt.Println("arg", arg)
if arg == strings.ToUpper(specId) {
fmt.Println("Match found for specId", specId) // Print when a match is found
utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found
processSpecProviders(specId)
break
}
}
}
} else {
fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
}
Expand All @@ -395,6 +452,8 @@ func RunHealth(ctx context.Context,
// check for mismtaches in the pairings query and the arguments
// This flow can be triggered with the following command:
// lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510
// lavap test health health_all_providers.yml --node https://testnet2-rpc.lavapro.xyz:443/ --single-provider-address lava@1ggcyk4wrlluh42dmak4s2c489ldkx8zaahde84 --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"FVM": ["jsonrpc"] }' --log_level debug --post-results-address http://localhost:6510

if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 {
for _, address := range providerAddresses {
for specId, apiInterfaces := range singleProviderSpecsInterfacesData {
Expand All @@ -405,7 +464,7 @@ func RunHealth(ctx context.Context,
ApiInterface: apiInterface,
}
if _, ok := stakeEntries[lookupKey]; !ok {
healthResults.SetUnhealthyProvider(lookupKey, "no pairings found")
healthResults.SetUnhealthyProvider(lookupKey, "No stake entry found for provider-spec-api pair")
}
}
}
Expand Down

0 comments on commit 6f64d79

Please sign in to comment.