diff --git a/protocol/monitoring/health.go b/protocol/monitoring/health.go index a928bf31bb..9b3d549367 100644 --- a/protocol/monitoring/health.go +++ b/protocol/monitoring/health.go @@ -2,13 +2,12 @@ package monitoring import ( "context" + "encoding/json" "fmt" "strings" "sync" "time" - "github.com/goccy/go-json" - "github.com/cosmos/cosmos-sdk/client" "github.com/gogo/status" lvutil "github.com/lavanet/lava/v2/ecosystem/lavavisor/pkg/util" @@ -19,7 +18,6 @@ import ( "github.com/lavanet/lava/v2/protocol/rpcprovider" "github.com/lavanet/lava/v2/utils" "github.com/lavanet/lava/v2/utils/rand" - dualstakingtypes "github.com/lavanet/lava/v2/x/dualstaking/types" epochstoragetypes "github.com/lavanet/lava/v2/x/epochstorage/types" pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" protocoltypes "github.com/lavanet/lava/v2/x/protocol/types" @@ -149,9 +147,12 @@ func RunHealth(ctx context.Context, errCh := make(chan error, 1) + stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{} + // get a list of all necessary specs for the test - dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx) + // dualStakingQuerier := dualstakingtypes.NewQueryClient(clientCtx) if getAllProviders { + // fmt.Println("line 155 getAllProviders") // var specResp *spectypes.QueryGetSpecResponse var specsResp *spectypes.QueryShowAllChainsResponse for i := 0; i < BasicQueryRetries; i++ { @@ -173,8 +174,13 @@ func RunHealth(ctx context.Context, healthResults.setSpec(&spectypes.Spec{Index: specInfo.ChainID}) } } else if len(singleProviderSpecsInterfacesData) > 0 && len(providerAddresses) > 1 { + // fmt.Println("line 176 singleProviderSpecsInterfacesData", singleProviderSpecsInterfacesData) + // fmt.Println("line 176 providerAddresses", providerAddresses) for _, providerAddress := range providerAddresses { + // fmt.Println("line 180 providerAddress", providerAddress) for spec, apiInterfaces := range singleProviderSpecsInterfacesData { + // fmt.Println("line 182 spec", spec) + // fmt.Println("line 182 apiInterfaces", apiInterfaces) healthResults.setSpec(&spectypes.Spec{Index: spec}) for _, apiInterface := range apiInterfaces { healthResults.SetProviderData(LavaEntity{ @@ -185,47 +191,88 @@ func RunHealth(ctx context.Context, } } } + } else { + // fmt.Println("line 195 else") var wgproviders sync.WaitGroup wgproviders.Add(len(providerAddresses)) processProvider := func(providerAddress string) { defer wgproviders.Done() - var err error + // var err error for i := 0; i < BasicQueryRetries; i++ { - var response *dualstakingtypes.QueryDelegatorProvidersResponse + // var response *dualstakingtypes.QueryDelegatorProvidersResponse queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{ - Delegator: providerAddress, - WithPending: false, + // response, err = dualStakingQuerier.DelegatorProviders(queryCtx, &dualstakingtypes.QueryDelegatorProvidersRequest{ + // Delegator: providerAddress, + // WithPending: false, + // }) + // fmt.Println("!!line 208 response", response) + + // responseJSON1, err := json.MarshalIndent(response, "", " ") + // if err != nil { + // fmt.Println("!!Error marshaling response to JSON:", err) + // } else { + // fmt.Println("!!line 214 response (JSON):") + // fmt.Println(string(responseJSON1)) + // } + + pairingQuerier := pairingtypes.NewQueryClient(clientCtx) + + response, err := pairingQuerier.Provider(queryCtx, &pairingtypes.QueryProviderRequest{ + Address: providerAddress, + // ShowFrozen: true, ? }) - cancel() - if err != nil || response == nil { - time.Sleep(QuerySleepTime) - continue - } + // Print response2 in JSON format + // responseJSON, err := json.MarshalIndent(response2, "", " ") + // if err != nil { + // fmt.Println("!!Error marshaling response2 to JSON:", err) + // } else { + // fmt.Println("!!line 214 response2 (JSON):") + // fmt.Println(string(responseJSON)) + // } - delegations := response.GetDelegations() - for _, delegation := range delegations { - if delegation.Provider == providerAddress { - healthResults.setSpec(&spectypes.Spec{Index: delegation.ChainID}) - for _, apiInterface := range chainIdToApiInterfaces[delegation.ChainID] { - healthResults.SetProviderData(LavaEntity{ - Address: providerAddress, - SpecId: delegation.ChainID, - ApiInterface: apiInterface, - }, ReplyData{}) + // fmt.Println("line 214 err2", err2) + + cancel() + if err != nil { + fmt.Println("Error querying provider:", err) + } else if response != nil && len(response.StakeEntries) > 0 { + for _, stakeEntry := range response.StakeEntries { + + // Set the spec + healthResults.setSpec(&spectypes.Spec{Index: stakeEntry.Chain}) + + // Process endpoints and API interfaces + for _, endpoint := range stakeEntry.Endpoints { + for _, apiInterface := range endpoint.ApiInterfaces { + providerWithSpecAndAPI := LavaEntity{ + Address: stakeEntry.Address, + SpecId: stakeEntry.Chain, + ApiInterface: apiInterface, + } + healthResults.SetProviderData(providerWithSpecAndAPI, ReplyData{}) + + if stakeEntry.StakeAppliedBlock > uint64(currentBlock) { + healthResults.FreezeProvider(providerWithSpecAndAPI) + // fmt.Printf("Froze provider %+v\n", providerWithSpecAndAPI) + } else { + stakeEntries[providerWithSpecAndAPI] = stakeEntry + // fmt.Printf("Added stake entry for %+v\n", providerWithSpecAndAPI) + } + } } } - } - return - } - if err != nil { - select { - case errCh <- err: - default: + } else { + fmt.Println("No provider data received") } } + // if err != nil { + // select { + // case errCh <- err: + // default: + // } + // } } for _, providerAddress := range providerAddresses { @@ -274,7 +321,9 @@ func RunHealth(ctx context.Context, wgspecs.Add(len(specs)) // populate the specs utils.LavaFormatDebug("[+] populating specs") + // fmt.Println("line 1111 specs", specs) for specId := range specs { + // fmt.Println("line 1112 specId", specId) go processSpec(specId) } @@ -286,7 +335,6 @@ func RunHealth(ctx context.Context, utils.LavaFormatDebug("[+] Starting to get provider entries") - stakeEntries := map[LavaEntity]epochstoragetypes.StakeEntry{} var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries if lookupSpecsFromArg == nil { wgspecs.Add(len(healthResults.getSpecs())) @@ -306,6 +354,9 @@ func RunHealth(ctx context.Context, ChainID: specId, ShowFrozen: true, }) + + fmt.Println("response", response) + cancel() if err != nil || response == nil { utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response)) @@ -372,10 +423,15 @@ func RunHealth(ctx context.Context, } // get provider stake entries for each spec or only for the ones given as arguments if lookupSpecsFromArg != nil { + fmt.Println("lookupSpecsFromArg", lookupSpecsFromArg) + fmt.Println("healthResults.getSpecs()", healthResults.getSpecs()) for specId := range healthResults.getSpecs() { + fmt.Println("specId", specId) utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed for _, arg := range lookupSpecsFromArg { + fmt.Println("arg", arg) if arg == strings.ToUpper(specId) { + fmt.Println("Match found for specId", specId) // Print when a match is found utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found processSpecProviders(specId) break @@ -383,6 +439,7 @@ func RunHealth(ctx context.Context, } } } else { + fmt.Println("healthResults.getSpecs()", healthResults.getSpecs()) for specId := range healthResults.getSpecs() { go processSpecProviders(specId) } @@ -395,6 +452,8 @@ func RunHealth(ctx context.Context, // check for mismtaches in the pairings query and the arguments // This flow can be triggered with the following command: // lavap test health health_all_providers.yml --node https://public-rpc.lavanet.xyz:443 --single-provider-address lava@1czgrha7ys2698xve2gz4xaccteplrzx8s9fh7e --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"ARB1": ["grpc"] }' --log_level debug --post-results-address http://localhost:6510 + // lavap test health health_all_providers.yml --node https://testnet2-rpc.lavapro.xyz:443/ --single-provider-address lava@1ggcyk4wrlluh42dmak4s2c489ldkx8zaahde84 --post-results-guid 6IJN3OroilsAB030rXIeh3PeJbRpp5Wy --run-once-and-exit --post-results-skip-spec --single-provider-specs-interfaces-data '{"FVM": ["jsonrpc"] }' --log_level debug --post-results-address http://localhost:6510 + if len(providerAddresses) > 0 && len(singleProviderSpecsInterfacesData) > 0 { for _, address := range providerAddresses { for specId, apiInterfaces := range singleProviderSpecsInterfacesData { @@ -405,7 +464,7 @@ func RunHealth(ctx context.Context, ApiInterface: apiInterface, } if _, ok := stakeEntries[lookupKey]; !ok { - healthResults.SetUnhealthyProvider(lookupKey, "no pairings found") + healthResults.SetUnhealthyProvider(lookupKey, "No stake entry found for provider-spec-api pair") } } }