Skip to content

Commit

Permalink
breaking changes - but faster health probes
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecot committed Oct 9, 2024
1 parent 6f64d79 commit 4c89998
Showing 1 changed file with 107 additions and 105 deletions.
212 changes: 107 additions & 105 deletions protocol/monitoring/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func RunHealth(ctx context.Context,
}
healthResults.SetProviderData(providerWithSpecAndAPI, ReplyData{})

if stakeEntry.StakeAppliedBlock > uint64(currentBlock) {
// if stakeEntry.StakeAppliedBlock > uint64(currentBlock) {
if stakeEntry.StakeAppliedBlock > uint64(currentBlock) || stakeEntry.IsFrozen() {
healthResults.FreezeProvider(providerWithSpecAndAPI)
// fmt.Printf("Froze provider %+v\n", providerWithSpecAndAPI)
} else {
Expand Down Expand Up @@ -331,119 +332,120 @@ func RunHealth(ctx context.Context,
if len(errCh) > 0 {
return nil, utils.LavaFormatWarning("[-] populating specs", <-errCh)
}
pairingQuerier := pairingtypes.NewQueryClient(clientCtx)
// pairingQuerier := pairingtypes.NewQueryClient(clientCtx)

utils.LavaFormatDebug("[+] Starting to get provider entries")

var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries
// var mutex sync.Mutex // Mutex to protect concurrent access to stakeEntries
if lookupSpecsFromArg == nil {
wgspecs.Add(len(healthResults.getSpecs()))
}

processSpecProviders := func(specId string) {
if lookupSpecsFromArg == nil {
defer wgspecs.Done()
}

var err error
for i := 0; i < BasicQueryRetries; i++ {
utils.LavaFormatDebug("[+] Attempting to query providers", utils.LogAttr("attempt", i+1), utils.LogAttr("specId", specId))
var response *pairingtypes.QueryProvidersResponse
queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
response, err = pairingQuerier.Providers(queryCtx, &pairingtypes.QueryProvidersRequest{
ChainID: specId,
ShowFrozen: true,
})

fmt.Println("response", response)

cancel()
if err != nil || response == nil {
utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response))
time.Sleep(QuerySleepTime)
continue
}

for _, providerEntry := range response.StakeEntry {
if len(providerAddresses) > 0 {
found := false
for _, address := range providerAddresses {
if address == providerEntry.Address {
found = true
break
}
}
if !found {
continue
}
}
// processSpecProviders := func(specId string) {
// if lookupSpecsFromArg == nil {
// defer wgspecs.Done()
// }

// var err error
// for i := 0; i < BasicQueryRetries; i++ {
// utils.LavaFormatDebug("[+] Attempting to query providers", utils.LogAttr("attempt", i+1), utils.LogAttr("specId", specId))
// var response *pairingtypes.QueryProvidersResponse
// queryCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
// response, err = pairingQuerier.Providers(queryCtx, &pairingtypes.QueryProvidersRequest{
// ChainID: specId,
// ShowFrozen: true,
// })

// fmt.Println("response", response)

// cancel()
// if err != nil || response == nil {
// utils.LavaFormatDebug("[!] Query failed or no response", utils.LogAttr("error", err), utils.LogAttr("response", response))
// time.Sleep(QuerySleepTime)
// continue
// }

// for _, providerEntry := range response.StakeEntry {
// if len(providerAddresses) > 0 {
// found := false
// for _, address := range providerAddresses {
// if address == providerEntry.Address {
// found = true
// break
// }
// }
// if !found {
// continue
// }
// }

// providerKey := LavaEntity{
// Address: providerEntry.Address,
// SpecId: specId,
// }

// apiInterfaces := chainIdToApiInterfaces[specId]
// // just to check if this is a provider we need to check we need one of the apiInterfaces
// if len(apiInterfaces) == 0 {
// utils.LavaFormatError("[!] invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId))
// // shouldn't happen
// continue
// }

// lookupKey := LavaEntity{
// Address: providerEntry.Address,
// SpecId: specId,
// ApiInterface: apiInterfaces[0],
// }

// mutex.Lock() // Lock before updating stakeEntries

// if _, ok := healthResults.getProviderData(lookupKey); ok || getAllProviders {
// if providerEntry.StakeAppliedBlock > uint64(currentBlock) {
// healthResults.FreezeProvider(providerKey)
// } else {
// stakeEntries[providerKey] = providerEntry
// }
// }

// mutex.Unlock()
// }
// break
// }
// if err != nil {
// utils.LavaFormatError("[!] Error after retries", err)
// select {
// case errCh <- err:
// utils.LavaFormatDebug("[+] Error sent to channel", utils.LogAttr("error", err))
// default:
// utils.LavaFormatDebug("[!] Error channel full, error not sent", utils.LogAttr("error", err))
// }
// }
// }

providerKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
}

apiInterfaces := chainIdToApiInterfaces[specId]
// just to check if this is a provider we need to check we need one of the apiInterfaces
if len(apiInterfaces) == 0 {
utils.LavaFormatError("[!] invalid state len(apiInterfaces) == 0", nil, utils.LogAttr("specId", specId))
// shouldn't happen
continue
}

lookupKey := LavaEntity{
Address: providerEntry.Address,
SpecId: specId,
ApiInterface: apiInterfaces[0],
}

mutex.Lock() // Lock before updating stakeEntries

if _, ok := healthResults.getProviderData(lookupKey); ok || getAllProviders {
if providerEntry.StakeAppliedBlock > uint64(currentBlock) {
healthResults.FreezeProvider(providerKey)
} else {
stakeEntries[providerKey] = providerEntry
}
}

mutex.Unlock()
}
break
}
if err != nil {
utils.LavaFormatError("[!] Error after retries", err)
select {
case errCh <- err:
utils.LavaFormatDebug("[+] Error sent to channel", utils.LogAttr("error", err))
default:
utils.LavaFormatDebug("[!] Error channel full, error not sent", utils.LogAttr("error", err))
}
}
}
// get provider stake entries for each spec or only for the ones given as arguments
if lookupSpecsFromArg != nil {
fmt.Println("lookupSpecsFromArg", lookupSpecsFromArg)
fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
for specId := range healthResults.getSpecs() {
fmt.Println("specId", specId)
utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed
for _, arg := range lookupSpecsFromArg {
fmt.Println("arg", arg)
if arg == strings.ToUpper(specId) {
fmt.Println("Match found for specId", specId) // Print when a match is found
utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found
processSpecProviders(specId)
break
}
}
}
} else {
fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
for specId := range healthResults.getSpecs() {
go processSpecProviders(specId)
}
}
// if lookupSpecsFromArg != nil {
// // fmt.Println("lookupSpecsFromArg", lookupSpecsFromArg)
// // fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
// for specId := range healthResults.getSpecs() {
// // fmt.Println("specId", specId)
// utils.LavaFormatDebug("[+] Processing specId", utils.LogAttr("specId", specId)) // Print the specId being processed
// for _, arg := range lookupSpecsFromArg {
// // fmt.Println("arg", arg)
// if arg == strings.ToUpper(specId) {
// // fmt.Println("Match found for specId", specId) // Print when a match is found
// utils.LavaFormatDebug("[+] Match found for specId", utils.LogAttr("specId", specId)) // Print when a match is found
// processSpecProviders(specId)
// break
// }
// }
// }
// } else {
// // fmt.Println("healthResults.getSpecs()", healthResults.getSpecs())
// for specId := range healthResults.getSpecs() {
// go processSpecProviders(specId)
// }
// }

if lookupSpecsFromArg == nil {
wgspecs.Wait()
Expand Down

0 comments on commit 4c89998

Please sign in to comment.