diff --git a/config.md b/config.md index d4df739..9b3f3d0 100644 --- a/config.md +++ b/config.md @@ -11,6 +11,8 @@ - `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently fetch SEED_URL for updates. - `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID. - `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - How slow on average a node needs to be to be marked as unhealthy. + - `AKASH_PROXY_HEALTHY_ERROR_RATE_THRESHOLD` (default: `30`) - Percentage of request errors deemed acceptable. + - `AKASH_PROXY_HEALTHY_ERROR_RATE_BUCKET_TIMEOUT` (default: `1m`) - How long in the past requests are considered to check for status codes. - `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request. - `AKASH_PROXY_UNHEALTHY_SERVER_RECOVERY_CHANCE_PERCENT` (default: `1`) - How much chance (in %, 0-100), a node marked as unhealthy have to get a request again and recover. diff --git a/index.html b/index.html index cc10bb4..fa68dce 100644 --- a/index.html +++ b/index.html @@ -14,6 +14,7 @@

Akash Proxy

URL Average response time Request Count + Error Rate Status @@ -24,6 +25,7 @@

Akash Proxy

{{.URL}} {{.Avg}} {{.Requests}} + {{.ErrorRate}}% {{ if not .Initialized}} diff --git a/internal/config/config.go b/internal/config/config.go index 1b53e94..a2c4e2c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,7 +6,7 @@ import ( "github.com/caarlos0/env/v11" ) -//go:generate go run github.com/g4s8/envdoc@latest -output ../../config.md -env-prefix AKASH_PROXY_ -type Config +//go:generate go run github.com/g4s8/envdoc@latest -output ../../config.md -env-prefix AKASH_PROXY_ -types Config type Config struct { // Address to listen to. Listen string `env:"LISTEN" envDefault:":https"` @@ -35,6 +35,12 @@ type Config struct { // How slow on average a node needs to be to be marked as unhealthy. HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"` + // Percentage of request errors deemed acceptable. + HealthyErrorRateThreshold float64 `env:"HEALTHY_ERROR_RATE_THRESHOLD" envDefault:"30"` + + // How long in the past requests are considered to check for status codes. + HealthyErrorRateBucketTimeout time.Duration `env:"HEALTHY_ERROR_RATE_BUCKET_TIMEOUT" envDefault:"1m"` + // Request timeout for a proxied request. ProxyRequestTimeout time.Duration `env:"PROXY_REQUEST_TIMEOUT" envDefault:"15s"` diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 10c9691..4199f19 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -7,7 +7,6 @@ import ( "net/http" "slices" "sort" - "strings" "sync" "sync/atomic" "time" @@ -48,6 +47,7 @@ func (p *Proxy) Stats() []ServerStat { Degraded: !s.Healthy(), Initialized: reqCount > 0, Requests: reqCount, + ErrorRate: s.ErrorRate(), }) } sort.Sort(serverStats(result)) @@ -61,7 +61,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rpc") if srv := p.next(); srv != nil { srv.ServeHTTP(w, r) return @@ -80,7 +79,7 @@ func (p *Proxy) next() *Server { server := p.servers[p.round%len(p.servers)] p.round++ p.mu.Unlock() - if server.Healthy() { + if server.Healthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold { return server } if rand.Intn(99)+1 < p.cfg.UnhealthyServerRecoverChancePct { @@ -102,8 +101,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error { srv, err := newServer( rpc.Provider, rpc.Address, - p.cfg.HealthyThreshold, - p.cfg.ProxyRequestTimeout, + p.cfg, ) if err != nil { return err diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 6842bcb..7ad2d45 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -19,12 +19,16 @@ import ( func TestProxy(t *testing.T) { const chainID = "unittest" srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - io.WriteString(w, "srv1 replied") + _, _ = io.WriteString(w, "srv1 replied") })) t.Cleanup(srv1.Close) srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { time.Sleep(time.Millisecond * 500) - io.WriteString(w, "srv2 replied") + _, _ = io.WriteString(w, "srv2 replied") + })) + t.Cleanup(srv2.Close) + srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusTeapot) })) t.Cleanup(srv2.Close) @@ -40,6 +44,10 @@ func TestProxy(t *testing.T) { Address: srv2.URL, Provider: "srv2", }, + { + Address: srv3.URL, + Provider: "srv3", + }, }, }, } @@ -47,7 +55,6 @@ func TestProxy(t *testing.T) { t.Logf("%+v", seed) seedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t.Log("AQUI") bts, _ := json.Marshal(seed) _, _ = w.Write(bts) })) @@ -60,13 +67,15 @@ func TestProxy(t *testing.T) { HealthyThreshold: 10 * time.Millisecond, ProxyRequestTimeout: time.Second, UnhealthyServerRecoverChancePct: 1, + HealthyErrorRateThreshold: 10, + HealthyErrorRateBucketTimeout: time.Second * 10, }) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) proxy.Start(ctx) - require.Len(t, proxy.servers, 2) + require.Len(t, proxy.servers, 3) proxySrv := httptest.NewServer(proxy) t.Cleanup(proxySrv.Close) @@ -85,7 +94,8 @@ func TestProxy(t *testing.T) { return err } defer resp.Body.Close() - if resp.StatusCode != 200 { + // only two status codes accepted + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusTeapot { bts, _ := io.ReadAll(resp.Body) return fmt.Errorf("bad status code: %v: %s", resp.StatusCode, string(bts)) } @@ -98,10 +108,11 @@ func TestProxy(t *testing.T) { cancel() stats := proxy.Stats() - require.Len(t, stats, 2) + require.Len(t, stats, 3) var srv1Stats ServerStat var srv2Stats ServerStat + var srv3Stats ServerStat for _, st := range stats { if st.Name == "srv1" { srv1Stats = st @@ -109,7 +120,13 @@ func TestProxy(t *testing.T) { if st.Name == "srv2" { srv2Stats = st } + if st.Name == "srv3" { + srv3Stats = st + } } + require.Zero(t, srv1Stats.ErrorRate) + require.Zero(t, srv2Stats.ErrorRate) + require.Equal(t, float64(100), srv3Stats.ErrorRate) require.Greater(t, srv1Stats.Requests, srv2Stats.Requests) require.Greater(t, srv2Stats.Avg, srv1Stats.Avg) require.False(t, srv1Stats.Degraded) diff --git a/internal/proxy/server.go b/internal/proxy/server.go index 293135d..2c2f21a 100644 --- a/internal/proxy/server.go +++ b/internal/proxy/server.go @@ -7,51 +7,71 @@ import ( "log/slog" "net/http" "net/url" + "strings" "sync/atomic" "time" "github.com/akash-network/rpc-proxy/internal/avg" + "github.com/akash-network/rpc-proxy/internal/config" + "github.com/akash-network/rpc-proxy/internal/ttlslice" ) -func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration) (*Server, error) { +func newServer(name, addr string, cfg config.Config) (*Server, error) { target, err := url.Parse(addr) if err != nil { return nil, fmt.Errorf("could not create new server: %w", err) } return &Server{ - name: name, - url: target, - pings: avg.Moving(50), - healthyThreshold: healthyThreshold, - requestTimeout: requestTimeout, + name: name, + url: target, + pings: avg.Moving(50), + cfg: cfg, + successes: ttlslice.New[int](), + failures: ttlslice.New[int](), }, nil } type Server struct { - name string - url *url.URL - pings *avg.MovingAverage + cfg config.Config + name string + url *url.URL + pings *avg.MovingAverage + successes *ttlslice.Slice[int] + failures *ttlslice.Slice[int] + requestCount atomic.Int64 +} - requestCount atomic.Int64 - healthyThreshold time.Duration - requestTimeout time.Duration +func (s *Server) ErrorRate() float64 { + suss := len(s.successes.List()) + fail := len(s.failures.List()) + total := suss + fail + if total == 0 { + return 0 + } + return (float64(fail) * 100) / float64(total) } func (s *Server) Healthy() bool { - return s.pings.Last() < s.healthyThreshold + return s.pings.Last() < s.cfg.HealthyThreshold } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var status int start := time.Now() defer func() { d := time.Since(start) avg := s.pings.Next(d) - slog.Info("request done", "name", s.name, "avg", avg, "last", d) + slog.Info("request done", "name", s.name, "avg", avg, "last", d, "status", status) }() proxiedURL := r.URL proxiedURL.Host = s.url.Host proxiedURL.Scheme = s.url.Scheme + + if !strings.HasSuffix(s.url.Path, "/rpc") { + proxiedURL.Path = strings.TrimSuffix(proxiedURL.Path, "/rpc") + } + slog.Info("proxying request", "name", s.name, "url", proxiedURL) rr := &http.Request{ @@ -63,10 +83,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { Close: r.Close, } - ctx, cancel := context.WithTimeout(r.Context(), s.requestTimeout) + ctx, cancel := context.WithTimeout(r.Context(), s.cfg.ProxyRequestTimeout) defer cancel() resp, err := http.DefaultClient.Do(rr.WithContext(ctx)) + status = resp.StatusCode if err == nil { defer resp.Body.Close() for k, v := range resp.Header { @@ -81,6 +102,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { } s.requestCount.Add(1) + if resp.StatusCode >= 200 && resp.StatusCode <= 300 { + s.successes.Append(resp.StatusCode, s.cfg.HealthyErrorRateBucketTimeout) + } else { + s.failures.Append(resp.StatusCode, s.cfg.HealthyErrorRateBucketTimeout) + } + if !s.Healthy() && ctx.Err() == nil && err == nil { // if it's not healthy, this is a tryout to improve - if the request // wasn't canceled, reset stats diff --git a/internal/proxy/stats.go b/internal/proxy/stats.go index 7e2957e..b72abe7 100644 --- a/internal/proxy/stats.go +++ b/internal/proxy/stats.go @@ -9,6 +9,7 @@ type ServerStat struct { Degraded bool Initialized bool Requests int64 + ErrorRate float64 } type serverStats []ServerStat @@ -30,5 +31,11 @@ func (st serverStats) Less(i, j int) bool { if sj.Degraded && !si.Degraded { return true } + if si.ErrorRate > sj.ErrorRate { + return false + } + if sj.ErrorRate > si.ErrorRate { + return true + } return si.Avg < sj.Avg } diff --git a/internal/proxy/stats_test.go b/internal/proxy/stats_test.go index 253c424..d5e5e17 100644 --- a/internal/proxy/stats_test.go +++ b/internal/proxy/stats_test.go @@ -22,6 +22,7 @@ func TestCompareServerStats(t *testing.T) { Avg: time.Second, Degraded: false, Initialized: true, + ErrorRate: 10, }, { Name: "2", @@ -46,9 +47,16 @@ func TestCompareServerStats(t *testing.T) { Avg: 0, Degraded: true, Initialized: true, + ErrorRate: 15, }, } t.Log(names(v)) sort.Sort(serverStats(v)) - require.Equal(t, []string{"4", "1", "5", "2", "3"}, names(v)) + require.Equal(t, []string{ + "4", + "1", + "2", + "5", + "3", + }, names(v)) } diff --git a/internal/ttlslice/slice.go b/internal/ttlslice/slice.go new file mode 100644 index 0000000..b3a67f7 --- /dev/null +++ b/internal/ttlslice/slice.go @@ -0,0 +1,65 @@ +package ttlslice + +import ( + "sync" + "time" +) + +type item[T any] struct { + value T + expiry time.Time +} + +func (i item[V]) isExpired() bool { + return time.Now().After(i.expiry) +} + +func New[T any]() *Slice[T] { + m := &Slice[T]{ + items: []item[T]{}, + } + + go func() { + for range time.Tick(time.Second) { + var newItems []item[T] + m.mu.Lock() + for _, v := range m.items { + if v.isExpired() { + continue + } + newItems = append(newItems, v) + } + m.items = newItems + m.mu.Unlock() + } + }() + + return m +} + +type Slice[T any] struct { + items []item[T] + mu sync.Mutex +} + +func (m *Slice[T]) Append(t T, ttl time.Duration) { + m.mu.Lock() + defer m.mu.Unlock() + m.items = append(m.items, item[T]{ + value: t, + expiry: time.Now().Add(ttl), + }) +} + +func (m *Slice[T]) List() []T { + var tt []T + m.mu.Lock() + defer m.mu.Unlock() + for _, t := range m.items { + if t.isExpired() { + continue + } + tt = append(tt, t.value) + } + return tt +}