Skip to content

Commit

Permalink
Merge pull request #13 from vitwit/anil/fix_concurrency
Browse files Browse the repository at this point in the history
fix concurency
  • Loading branch information
anilcse authored Dec 12, 2024
2 parents 07ecbaa + 7b79367 commit fdde90e
Showing 1 changed file with 71 additions and 54 deletions.
125 changes: 71 additions & 54 deletions internal/cosmos/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/anilcse/cosmoscope/internal/portfolio"
Expand All @@ -21,61 +22,15 @@ var (
chainInfoCache = make(map[string]*ChainInfo)
assetListCache = make(map[string]AssetList)
registryBaseURL = "https://raw.githubusercontent.com/cosmos/chain-registry/master"
cacheMutex sync.RWMutex
)

// getActiveEndpoint tries each REST endpoint until it finds one that responds
func getActiveEndpoint(endpoints []RestEndpoint) string {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

type result struct {
endpoint string
err error
}
resultChan := make(chan result)

// Try all endpoints concurrently
for _, endpoint := range endpoints {
go func(addr string) {
client := &http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequestWithContext(ctx, "GET", addr+"/cosmos/base/tendermint/v1beta1/node_info", nil)
if err != nil {
resultChan <- result{endpoint: addr, err: err}
return
}

resp, err := client.Do(req)
if err != nil {
resultChan <- result{endpoint: addr, err: err}
return
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
resultChan <- result{endpoint: addr, err: nil}
} else {
resultChan <- result{endpoint: addr, err: fmt.Errorf("endpoint returned status %d", resp.StatusCode)}
}
}(endpoint.Address)
}

// Return the first successful endpoint
for range endpoints {
select {
case r := <-resultChan:
if r.err == nil {
return r.endpoint
}
case <-ctx.Done():
return ""
}
}

return ""
}

func FetchChainInfo(network string) (*ChainInfo, error) {
if info, exists := chainInfoCache[network]; exists {
// Try to read from cache first
cacheMutex.RLock()
info, exists := chainInfoCache[network]
cacheMutex.RUnlock()
if exists {
return info, nil
}

Expand All @@ -93,12 +48,20 @@ func FetchChainInfo(network string) (*ChainInfo, error) {
return nil, fmt.Errorf("error decoding chain info: %v", err)
}

// Store in cache with write lock
cacheMutex.Lock()
chainInfoCache[network] = &chainInfo
cacheMutex.Unlock()

return &chainInfo, nil
}

func fetchAssetList(network string) (*AssetList, error) {
if assetList, exists := assetListCache[network]; exists {
// Try to read from cache first
cacheMutex.RLock()
assetList, exists := assetListCache[network]
cacheMutex.RUnlock()
if exists {
return &assetList, nil
}

Expand All @@ -111,12 +74,15 @@ func fetchAssetList(network string) (*AssetList, error) {
}
defer resp.Body.Close()

var assetList AssetList
if err := json.NewDecoder(resp.Body).Decode(&assetList); err != nil {
return nil, fmt.Errorf("error decoding asset list: %v", err)
}

// Store in cache with write lock
cacheMutex.Lock()
assetListCache[network] = assetList
cacheMutex.Unlock()

return &assetList, nil
}

Expand Down Expand Up @@ -331,3 +297,54 @@ func getHexAddress(address string) string {
}
return hex.EncodeToString(bz)
}

// getActiveEndpoint tries each REST endpoint until it finds one that responds
func getActiveEndpoint(endpoints []RestEndpoint) string {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

type result struct {
endpoint string
err error
}
resultChan := make(chan result)

// Try all endpoints concurrently
for _, endpoint := range endpoints {
go func(addr string) {
client := &http.Client{Timeout: 2 * time.Second}
req, err := http.NewRequestWithContext(ctx, "GET", addr+"/cosmos/base/tendermint/v1beta1/node_info", nil)
if err != nil {
resultChan <- result{endpoint: addr, err: err}
return
}

resp, err := client.Do(req)
if err != nil {
resultChan <- result{endpoint: addr, err: err}
return
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
resultChan <- result{endpoint: addr, err: nil}
} else {
resultChan <- result{endpoint: addr, err: fmt.Errorf("endpoint returned status %d", resp.StatusCode)}
}
}(endpoint.Address)
}

// Return the first successful endpoint
for range endpoints {
select {
case r := <-resultChan:
if r.err == nil {
return r.endpoint
}
case <-ctx.Done():
return ""
}
}

return ""
}

0 comments on commit fdde90e

Please sign in to comment.