From 7d78e39fa259c5e454493b1f141d1e46786493f5 Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Sat, 10 Aug 2024 14:15:35 +0200 Subject: [PATCH 1/6] feat: health check --- config.md | 1 + internal/config/config.go | 3 ++ internal/proxy/proxy.go | 86 ++++++++++++++++++++++++++++++++++++++- internal/proxy/server.go | 41 +++++++++++++++---- 4 files changed, 123 insertions(+), 8 deletions(-) diff --git a/config.md b/config.md index d4df739..0342da2 100644 --- a/config.md +++ b/config.md @@ -11,6 +11,7 @@ - `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently fetch SEED_URL for updates. - `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID. - `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - How slow on average a node needs to be to be marked as unhealthy. + - `AKASH_PROXY_HEALTH_INTERVAL` (default: `5m`) - Check Health on endpoints. - `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request. - `AKASH_PROXY_UNHEALTHY_SERVER_RECOVERY_CHANCE_PERCENT` (default: `1`) - How much chance (in %, 0-100), a node marked as unhealthy have to get a request again and recover. diff --git a/internal/config/config.go b/internal/config/config.go index 1b53e94..0cca06f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -35,6 +35,9 @@ type Config struct { // How slow on average a node needs to be to be marked as unhealthy. HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"` + // Check Health on endpoints. + CheckHealthInterval time.Duration `env:"HEALTH_INTERVAL" envDefault:"5m"` + // Request timeout for a proxied request. ProxyRequestTimeout time.Duration `env:"PROXY_REQUEST_TIMEOUT" envDefault:"15s"` diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 10c9691..359fbf2 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -2,6 +2,9 @@ package proxy import ( "context" + "encoding/json" + "fmt" + "io" "log/slog" "math/rand" "net/http" @@ -22,6 +25,49 @@ func New(cfg config.Config) *Proxy { } } +type StatusResponse struct { + Jsonrpc string `json:"jsonrpc"` + ID int `json:"id"` + Result struct { + NodeInfo struct { + ProtocolVersion struct { + P2P string `json:"p2p"` + Block string `json:"block"` + App string `json:"app"` + } `json:"protocol_version"` + ID string `json:"id"` + ListenAddr string `json:"listen_addr"` + Network string `json:"network"` + Version string `json:"version"` + Channels string `json:"channels"` + Moniker string `json:"moniker"` + Other struct { + TxIndex string `json:"tx_index"` + RPCAddress string `json:"rpc_address"` + } `json:"other"` + } `json:"node_info"` + SyncInfo struct { + LatestBlockHash string `json:"latest_block_hash"` + LatestAppHash string `json:"latest_app_hash"` + LatestBlockHeight string `json:"latest_block_height"` + LatestBlockTime time.Time `json:"latest_block_time"` + EarliestBlockHash string `json:"earliest_block_hash"` + EarliestAppHash string `json:"earliest_app_hash"` + EarliestBlockHeight string `json:"earliest_block_height"` + EarliestBlockTime time.Time `json:"earliest_block_time"` + CatchingUp bool `json:"catching_up"` + } `json:"sync_info"` + ValidatorInfo struct { + Address string `json:"address"` + PubKey struct { + Type string `json:"type"` + Value string `json:"value"` + } `json:"pub_key"` + VotingPower string `json:"voting_power"` + } `json:"validator_info"` + } `json:"result"` +} + type Proxy struct { cfg config.Config init sync.Once @@ -71,6 +117,43 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) } +func checkSingleRPC(url string) error { + req, err := http.NewRequest("GET", url+"/status", nil) + if err != nil { + return fmt.Errorf("error creating request: %v", err) + } + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("error reading response body: %v", err) + } + + var status StatusResponse + if err := json.Unmarshal(body, &status); err != nil { + return fmt.Errorf("error unmarshaling JSON: %v", err) + } + + if status.Result.SyncInfo.CatchingUp { + return fmt.Errorf("node is still catching up") + } + + if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().Add(-time.Minute)) { + return fmt.Errorf("latest block time is more than 1 minute old") + } + + return nil +} + func (p *Proxy) next() *Server { p.mu.Lock() if len(p.servers) == 0 { @@ -104,6 +187,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error { rpc.Address, p.cfg.HealthyThreshold, p.cfg.ProxyRequestTimeout, + p.cfg.CheckHealthInterval, ) if err != nil { return err @@ -115,7 +199,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error { // remove deleted servers p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool { for _, rpc := range rpcs { - if rpc.Provider == srv.name { + if rpc.Provider == srv.name && srv.healthy.Load() { return false } } diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 293135d..e1d84b4 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -13,32 +13,59 @@ import ( "github.com/akash-network/rpc-proxy/internal/avg" ) -func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration) (*Server, error) { +func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration, healthInterval time.Duration) (*Server, error) { target, err := url.Parse(addr) if err != nil { return nil, fmt.Errorf("could not create new server: %w", err) } - return &Server{ + + server := &Server{ name: name, url: target, pings: avg.Moving(50), healthyThreshold: healthyThreshold, requestTimeout: requestTimeout, - }, nil + lastHealthCheck: time.Now().UTC(), + healthInterval: healthInterval, + healthy: atomic.Bool{}, + } + + err = checkSingleRPC(addr) + server.healthy.Store(err == nil) + + return server, nil } type Server struct { - name string - url *url.URL - pings *avg.MovingAverage + name string + url *url.URL + pings *avg.MovingAverage + lastHealthCheck time.Time + healthy atomic.Bool requestCount atomic.Int64 + healthInterval time.Duration healthyThreshold time.Duration requestTimeout time.Duration } func (s *Server) Healthy() bool { - return s.pings.Last() < s.healthyThreshold + now := time.Now().UTC() + if now.Sub(s.lastHealthCheck) >= s.healthInterval { + slog.Info("checking health", "name", s.name) + err := checkSingleRPC(s.url.String()) + healthy := err == nil + s.healthy.Store(healthy) + s.lastHealthCheck = now + + if healthy { + slog.Info("server is healthy", "name", s.name) + } else { + slog.Error("server is unhealthy", "name", s.name, "err", err) + } + } + + return s.pings.Last() < s.healthyThreshold && s.healthy.Load() } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { From a26074791f7a37fa89801873cd158f573babaa62 Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Mon, 19 Aug 2024 12:59:12 +0200 Subject: [PATCH 2/6] chore: reduce struct --- internal/proxy/proxy.go | 38 +++++--------------------------------- 1 file changed, 5 insertions(+), 33 deletions(-) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 359fbf2..ec67e4e 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -27,44 +27,16 @@ func New(cfg config.Config) *Proxy { type StatusResponse struct { Jsonrpc string `json:"jsonrpc"` - ID int `json:"id"` Result struct { NodeInfo struct { - ProtocolVersion struct { - P2P string `json:"p2p"` - Block string `json:"block"` - App string `json:"app"` - } `json:"protocol_version"` - ID string `json:"id"` - ListenAddr string `json:"listen_addr"` - Network string `json:"network"` - Version string `json:"version"` - Channels string `json:"channels"` - Moniker string `json:"moniker"` - Other struct { - TxIndex string `json:"tx_index"` - RPCAddress string `json:"rpc_address"` - } `json:"other"` + ID string `json:"id"` + Network string `json:"network"` + Version string `json:"version"` } `json:"node_info"` SyncInfo struct { - LatestBlockHash string `json:"latest_block_hash"` - LatestAppHash string `json:"latest_app_hash"` - LatestBlockHeight string `json:"latest_block_height"` - LatestBlockTime time.Time `json:"latest_block_time"` - EarliestBlockHash string `json:"earliest_block_hash"` - EarliestAppHash string `json:"earliest_app_hash"` - EarliestBlockHeight string `json:"earliest_block_height"` - EarliestBlockTime time.Time `json:"earliest_block_time"` - CatchingUp bool `json:"catching_up"` + LatestBlockTime time.Time `json:"latest_block_time"` + CatchingUp bool `json:"catching_up"` } `json:"sync_info"` - ValidatorInfo struct { - Address string `json:"address"` - PubKey struct { - Type string `json:"type"` - Value string `json:"value"` - } `json:"pub_key"` - VotingPower string `json:"voting_power"` - } `json:"validator_info"` } `json:"result"` } From c8facbdc014382b747edb52bfd28aeccaf3de983 Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:26:10 +0200 Subject: [PATCH 3/6] feat: rest endpoint check --- internal/proxy/proxy.go | 143 +++++++++++++++++++++++++-------------- internal/proxy/server.go | 15 ++-- 2 files changed, 102 insertions(+), 56 deletions(-) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index dd7c8c9..3ce5ef3 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/akash-network/rpc-proxy/internal/config" "github.com/akash-network/rpc-proxy/internal/seed" @@ -38,20 +39,29 @@ func New( } type StatusResponse struct { - Jsonrpc string `json:"jsonrpc"` - Result struct { - NodeInfo struct { - ID string `json:"id"` - Network string `json:"network"` - Version string `json:"version"` - } `json:"node_info"` - SyncInfo struct { - LatestBlockTime time.Time `json:"latest_block_time"` - CatchingUp bool `json:"catching_up"` - } `json:"sync_info"` - } `json:"result"` + Jsonrpc string `json:"jsonrpc"` + Result struct { + NodeInfo struct { + ID string `json:"id"` + Network string `json:"network"` + Version string `json:"version"` + } `json:"node_info"` + SyncInfo struct { + LatestBlockTime time.Time `json:"latest_block_time"` + CatchingUp bool `json:"catching_up"` + } `json:"sync_info"` + } `json:"result"` } +type RestStatusResponse struct { + Block struct { + Header struct { + ChainID string `json:"chain_id"` + Height string `json:"height"` + Time time.Time `json:"time"` + } `json:"header"` + } `json:"block"` +} type Proxy struct { cfg config.Config kind ProxyKind @@ -109,41 +119,75 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) } -func checkSingleRPC(url string) error { - req, err := http.NewRequest("GET", url+"/status", nil) - if err != nil { - return fmt.Errorf("error creating request: %v", err) - } - client := &http.Client{Timeout: 2 * time.Second} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("error making request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body: %v", err) - } - - var status StatusResponse - if err := json.Unmarshal(body, &status); err != nil { - return fmt.Errorf("error unmarshaling JSON: %v", err) - } - - if status.Result.SyncInfo.CatchingUp { - return fmt.Errorf("node is still catching up") - } - - if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().Add(-time.Minute)) { - return fmt.Errorf("latest block time is more than 1 minute old") - } - - return nil +func checkEndpoint(url string, kind ProxyKind) error { + switch kind { + case RPC: + return checkRPC(url) + case Rest: + return checkREST(url) + default: + return fmt.Errorf("unsupported proxy kind: %v", kind) + } +} + +func performGetRequest(url string, timeout time.Duration) ([]byte, error) { + client := &http.Client{Timeout: timeout} + resp, err := client.Get(url) + if err != nil { + return nil, fmt.Errorf("error making request: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %v", err) + } + + return body, nil +} + +func checkRPC(url string) error { + body, err := performGetRequest(url+"/status", 2*time.Second) + if err != nil { + return err + } + + var status StatusResponse + if err := json.Unmarshal(body, &status); err != nil { + return fmt.Errorf("error unmarshaling JSON: %v", err) + } + + if status.Result.SyncInfo.CatchingUp { + return fmt.Errorf("node is still catching up") + } + + if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().Add(-time.Minute)) { + return fmt.Errorf("latest block time is more than 1 minute old") + } + + return nil +} + +func checkREST(url string) error { + body, err := performGetRequest(url+"/blocks/latest", 2*time.Second) + if err != nil { + return err + } + + var status RestStatusResponse + if err := json.Unmarshal(body, &status); err != nil { + return fmt.Errorf("error unmarshaling JSON: %v", err) + } + + if !status.Block.Header.Time.After(time.Now().Add(-time.Minute)) { + return fmt.Errorf("latest block time is more than 1 minute old") + } + + return nil } func (p *Proxy) next() *Server { @@ -190,9 +234,8 @@ func (p *Proxy) doUpdate(providers []seed.Provider) error { srv, err := newServer( provider.Provider, provider.Address, - p.cfg.HealthyThreshold, - p.cfg.ProxyRequestTimeout, - p.cfg.CheckHealthInterval, + p.cfg, + p.kind, ) if err != nil { return err diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 03243ba..6140a1c 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -15,7 +15,7 @@ import ( "github.com/akash-network/rpc-proxy/internal/ttlslice" ) -func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration, healthInterval time.Duration, cfg config.Config) (*Server, error) { +func newServer(name, addr string, cfg config.Config, kind ProxyKind) (*Server, error) { target, err := url.Parse(addr) if err != nil { return nil, fmt.Errorf("could not create new server: %w", err) @@ -28,14 +28,15 @@ func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration cfg: cfg, successes: ttlslice.New[int](), failures: ttlslice.New[int](), - healthyThreshold: healthyThreshold, - requestTimeout: requestTimeout, + healthyThreshold: cfg.HealthyThreshold, + requestTimeout: cfg.ProxyRequestTimeout, lastHealthCheck: time.Now().UTC(), - healthInterval: healthInterval, + healthInterval: cfg.CheckHealthInterval, healthy: atomic.Bool{}, + kind: kind, } - err = checkSingleRPC(addr) + err = checkEndpoint(addr, kind) server.healthy.Store(err == nil) return server, nil @@ -44,6 +45,7 @@ func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration type Server struct { name string url *url.URL + kind ProxyKind pings *avg.MovingAverage lastHealthCheck time.Time healthy atomic.Bool @@ -59,9 +61,10 @@ type Server struct { func (s *Server) Healthy() bool { now := time.Now().UTC() + //Add different config value if wanted if now.Sub(s.lastHealthCheck) >= s.healthInterval { slog.Info("checking health", "name", s.name) - err := checkSingleRPC(s.url.String()) + err := checkEndpoint(s.url.String(), s.kind) healthy := err == nil s.healthy.Store(healthy) s.lastHealthCheck = now From 6d8538d2102b2e71da48f8f6a1064249255d52ca Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:32:19 +0200 Subject: [PATCH 4/6] chore: use utc for server time comparison --- internal/proxy/proxy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 3ce5ef3..9226cb9 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -165,7 +165,7 @@ func checkRPC(url string) error { return fmt.Errorf("node is still catching up") } - if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().Add(-time.Minute)) { + if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().UTC().Add(-time.Minute)) { return fmt.Errorf("latest block time is more than 1 minute old") } @@ -183,7 +183,7 @@ func checkREST(url string) error { return fmt.Errorf("error unmarshaling JSON: %v", err) } - if !status.Block.Header.Time.After(time.Now().Add(-time.Minute)) { + if !status.Block.Header.Time.After(time.Now().UTC().Add(-time.Minute)) { return fmt.Errorf("latest block time is more than 1 minute old") } From a33fbfa2b197247a5750b77634a5edbb22c91154 Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:45:05 +0200 Subject: [PATCH 5/6] chore: use existing --- internal/proxy/server.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 6140a1c..05623c5 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -28,10 +28,7 @@ func newServer(name, addr string, cfg config.Config, kind ProxyKind) (*Server, e cfg: cfg, successes: ttlslice.New[int](), failures: ttlslice.New[int](), - healthyThreshold: cfg.HealthyThreshold, - requestTimeout: cfg.ProxyRequestTimeout, lastHealthCheck: time.Now().UTC(), - healthInterval: cfg.CheckHealthInterval, healthy: atomic.Bool{}, kind: kind, } @@ -51,9 +48,6 @@ type Server struct { healthy atomic.Bool requestCount atomic.Int64 - healthInterval time.Duration - healthyThreshold time.Duration - requestTimeout time.Duration cfg config.Config successes *ttlslice.Slice[int] failures *ttlslice.Slice[int] @@ -62,7 +56,7 @@ type Server struct { func (s *Server) Healthy() bool { now := time.Now().UTC() //Add different config value if wanted - if now.Sub(s.lastHealthCheck) >= s.healthInterval { + if now.Sub(s.lastHealthCheck) >= s.cfg.CheckHealthInterval { slog.Info("checking health", "name", s.name) err := checkEndpoint(s.url.String(), s.kind) healthy := err == nil @@ -76,7 +70,7 @@ func (s *Server) Healthy() bool { } } - return s.pings.Last() < s.healthyThreshold && s.healthy.Load() + return s.pings.Last() < s.cfg.HealthyThreshold && s.healthy.Load() } func (s *Server) ErrorRate() float64 { suss := len(s.successes.List()) From ded6e23f4050f2d0da465fb453ae110b97a975b9 Mon Sep 17 00:00:00 2001 From: nick134 <76399455+nick134-bit@users.noreply.github.com> Date: Thu, 5 Sep 2024 22:09:13 +0200 Subject: [PATCH 6/6] chore: implement feedback --- internal/proxy/proxy.go | 46 +++++++++++++++++------------- internal/proxy/proxy_test.go | 54 +++++++++++++++++++++++++++++++----- internal/proxy/server.go | 30 ++++++++++---------- 3 files changed, 88 insertions(+), 42 deletions(-) diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 9226cb9..ede35f2 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -38,30 +38,36 @@ func New( } } -type StatusResponse struct { +type RPCSyncInfo struct { + LatestBlockTime time.Time `json:"latest_block_time"` + CatchingUp bool `json:"catching_up"` +} +type RPCNodeInfo struct { + ID string `json:"id"` + Network string `json:"network"` + Version string `json:"version"` +} + +type RPCStatusResponse struct { Jsonrpc string `json:"jsonrpc"` Result struct { - NodeInfo struct { - ID string `json:"id"` - Network string `json:"network"` - Version string `json:"version"` - } `json:"node_info"` - SyncInfo struct { - LatestBlockTime time.Time `json:"latest_block_time"` - CatchingUp bool `json:"catching_up"` - } `json:"sync_info"` + NodeInfo RPCNodeInfo `json:"node_info"` + SyncInfo RPCSyncInfo `json:"sync_info"` } `json:"result"` } +type RestBlockHeader struct { + ChainID string `json:"chain_id"` + Height string `json:"height"` + Time time.Time `json:"time"` +} + type RestStatusResponse struct { Block struct { - Header struct { - ChainID string `json:"chain_id"` - Height string `json:"height"` - Time time.Time `json:"time"` - } `json:"header"` + Header RestBlockHeader `json:"header"` } `json:"block"` } + type Proxy struct { cfg config.Config kind ProxyKind @@ -87,7 +93,7 @@ func (p *Proxy) Stats() []ServerStat { Name: s.name, URL: s.url.String(), Avg: s.pings.Last(), - Degraded: !s.Healthy(), + Degraded: !s.IsHealthy(), Initialized: reqCount > 0, Requests: reqCount, ErrorRate: s.ErrorRate(), @@ -156,9 +162,9 @@ func checkRPC(url string) error { return err } - var status StatusResponse + var status RPCStatusResponse if err := json.Unmarshal(body, &status); err != nil { - return fmt.Errorf("error unmarshaling JSON: %v", err) + return fmt.Errorf("error unmarshaling JSON in RPC check: %v (response body: %s)", err, string(body)) } if status.Result.SyncInfo.CatchingUp { @@ -180,7 +186,7 @@ func checkREST(url string) error { var status RestStatusResponse if err := json.Unmarshal(body, &status); err != nil { - return fmt.Errorf("error unmarshaling JSON: %v", err) + return fmt.Errorf("error unmarshaling JSON in REST check: %v (response body: %s)", err, string(body)) } if !status.Block.Header.Time.After(time.Now().UTC().Add(-time.Minute)) { @@ -199,7 +205,7 @@ func (p *Proxy) next() *Server { server := p.servers[p.round%len(p.servers)] p.round++ p.mu.Unlock() - if server.Healthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold { + if server.IsHealthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold { return server } if rand.Intn(99)+1 < p.cfg.UnhealthyServerRecoverChancePct { diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index b9e0067..441ca8f 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "encoding/json" "fmt" "io" "net/http" @@ -28,14 +29,50 @@ func TestProxy(t *testing.T) { func testProxy(tb testing.TB, kind ProxyKind) { srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, _ = io.WriteString(w, "srv1 replied") + if r.URL.Path == "/status" || r.URL.Path == "/blocks/latest" { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "result": map[string]interface{}{ + "sync_info": map[string]interface{}{ + "latest_block_time": time.Now().Format(time.RFC3339), + "catching_up": false, + }, + }, + "block": map[string]interface{}{ + "header": map[string]interface{}{ + "time": time.Now().Format(time.RFC3339), + }, + }, + }) + } else { + _, _ = io.WriteString(w, "srv1 replied") + } })) tb.Cleanup(srv1.Close) + srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - time.Sleep(time.Millisecond * 500) - _, _ = io.WriteString(w, "srv2 replied") + if r.URL.Path == "/status" || r.URL.Path == "/blocks/latest" { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "result": map[string]interface{}{ + "sync_info": map[string]interface{}{ + "latest_block_time": time.Now().Format(time.RFC3339), + "catching_up": false, + }, + }, + "block": map[string]interface{}{ + "header": map[string]interface{}{ + "time": time.Now().Format(time.RFC3339), + }, + }, + }) + } else { + time.Sleep(time.Millisecond * 500) + _, _ = io.WriteString(w, "srv2 replied") + } })) tb.Cleanup(srv2.Close) + srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusTeapot) })) @@ -48,6 +85,7 @@ func testProxy(tb testing.TB, kind ProxyKind) { UnhealthyServerRecoverChancePct: 1, HealthyErrorRateThreshold: 10, HealthyErrorRateBucketTimeout: time.Second * 10, + CheckHealthInterval: time.Second * 5, }) ctx, cancel := context.WithCancel(context.Background()) @@ -77,8 +115,7 @@ func testProxy(tb testing.TB, kind ProxyKind) { } require.Eventually(tb, func() bool { return proxy.initialized.Load() }, time.Second, time.Millisecond) - - require.Len(tb, proxy.servers, 3) + require.Len(tb, proxy.servers, 2) proxySrv := httptest.NewServer(proxy) tb.Cleanup(proxySrv.Close) @@ -111,7 +148,7 @@ func testProxy(tb testing.TB, kind ProxyKind) { cancel() stats := proxy.Stats() - require.Len(tb, stats, 3) + require.Len(tb, stats, 2) var srv1Stats ServerStat var srv2Stats ServerStat @@ -129,11 +166,14 @@ func testProxy(tb testing.TB, kind ProxyKind) { } require.Zero(tb, srv1Stats.ErrorRate) require.Zero(tb, srv2Stats.ErrorRate) - require.Equal(tb, float64(100), srv3Stats.ErrorRate) require.Greater(tb, srv1Stats.Requests, srv2Stats.Requests) require.Greater(tb, srv2Stats.Avg, srv1Stats.Avg) require.False(tb, srv1Stats.Degraded) require.True(tb, srv2Stats.Degraded) require.True(tb, srv1Stats.Initialized) require.True(tb, srv2Stats.Initialized) + require.False(tb, srv3Stats.Initialized) + + require.Len(tb, proxy.servers, 2) + require.Equal(tb, int64(0), srv3Stats.Requests) } diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 05623c5..ce469a9 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -22,15 +22,15 @@ func newServer(name, addr string, cfg config.Config, kind ProxyKind) (*Server, e } server := &Server{ - name: name, - url: target, - pings: avg.Moving(50), - cfg: cfg, - successes: ttlslice.New[int](), - failures: ttlslice.New[int](), - lastHealthCheck: time.Now().UTC(), - healthy: atomic.Bool{}, - kind: kind, + name: name, + url: target, + pings: avg.Moving(50), + cfg: cfg, + successes: ttlslice.New[int](), + failures: ttlslice.New[int](), + lastHealthCheck: time.Now().UTC(), + healthy: atomic.Bool{}, + kind: kind, } err = checkEndpoint(addr, kind) @@ -47,13 +47,13 @@ type Server struct { lastHealthCheck time.Time healthy atomic.Bool - requestCount atomic.Int64 - cfg config.Config - successes *ttlslice.Slice[int] - failures *ttlslice.Slice[int] + requestCount atomic.Int64 + cfg config.Config + successes *ttlslice.Slice[int] + failures *ttlslice.Slice[int] } -func (s *Server) Healthy() bool { +func (s *Server) IsHealthy() bool { now := time.Now().UTC() //Add different config value if wanted if now.Sub(s.lastHealthCheck) >= s.cfg.CheckHealthInterval { @@ -134,7 +134,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.failures.Append(status, s.cfg.HealthyErrorRateBucketTimeout) } - if !s.Healthy() && ctx.Err() == nil && err == nil { + if !s.IsHealthy() && ctx.Err() == nil && err == nil { // if it's not healthy, this is a tryout to improve - if the request // wasn't canceled, reset stats slog.Info("resetting statistics", "name", s.name)