Skip to content

Commit

Permalink
feat: store previous response codes, calculate error rate, consider (#12
Browse files Browse the repository at this point in the history
)

* feat: store previous response codes, calculate error rate, consider

- there's a new `ttlslice` internal package for storing things in a slice
  with a ttl
- these are used to store success status codes and error status codes
- these are then used to check if a node is healthy or not
- it also shows the error rate in the index page
- there's also a small fix regarding the handling of the `/rpc` trailing
  bit

* test: status

* test: error codes

Signed-off-by: Carlos Alexandro Becker <[email protected]>

* test: timeout

Signed-off-by: Carlos Alexandro Becker <[email protected]>

---------

Signed-off-by: Carlos Alexandro Becker <[email protected]>
  • Loading branch information
caarlos0 authored Aug 15, 2024
1 parent 1552f6f commit 6e56229
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 28 deletions.
2 changes: 2 additions & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
- `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently to fetch SEED_URL for updates.
- `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID.
- `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - How slow on average a node needs to be to be marked as unhealthy.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_THRESHOLD` (default: `30`) - Percentage of request errors deemed acceptable.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_BUCKET_TIMEOUT` (default: `1m`) - How long in the past requests are considered to check for status codes.
- `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request.
- `AKASH_PROXY_UNHEALTHY_SERVER_RECOVERY_CHANCE_PERCENT` (default: `1`) - The chance (in %, 0-100) that a node marked as unhealthy has of getting a
request again and recovering.
Expand Down
2 changes: 2 additions & 0 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ <h1>Akash Proxy</h1>
<th>URL</th>
<th>Average response time</th>
<th>Request Count</th>
<th>Error Rate</th>
<th>Status</th>
</tr>
</thead>
Expand All @@ -24,6 +25,7 @@ <h1>Akash Proxy</h1>
<th><a href="{{.URL}}">{{.URL}}</a></th>
<th>{{.Avg}}</th>
<th>{{.Requests}}</th>
<th>{{.ErrorRate}}%</th>
<th>
<!-- prettier-ignore -->
{{ if not .Initialized}}
Expand Down
8 changes: 7 additions & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/caarlos0/env/v11"
)

//go:generate go run github.com/g4s8/envdoc@latest -output ../../config.md -env-prefix AKASH_PROXY_ -type Config
//go:generate go run github.com/g4s8/envdoc@latest -output ../../config.md -env-prefix AKASH_PROXY_ -types Config
type Config struct {
// Address to listen to.
Listen string `env:"LISTEN" envDefault:":https"`
Expand Down Expand Up @@ -35,6 +35,12 @@ type Config struct {
// How slow on average a node needs to be to be marked as unhealthy.
HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"`

// Percentage of request errors deemed acceptable.
HealthyErrorRateThreshold float64 `env:"HEALTHY_ERROR_RATE_THRESHOLD" envDefault:"30"`

// How long in the past requests are considered to check for status codes.
HealthyErrorRateBucketTimeout time.Duration `env:"HEALTHY_ERROR_RATE_BUCKET_TIMEOUT" envDefault:"1m"`

// Request timeout for a proxied request.
ProxyRequestTimeout time.Duration `env:"PROXY_REQUEST_TIMEOUT" envDefault:"15s"`

Expand Down
8 changes: 3 additions & 5 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -48,6 +47,7 @@ func (p *Proxy) Stats() []ServerStat {
Degraded: !s.Healthy(),
Initialized: reqCount > 0,
Requests: reqCount,
ErrorRate: s.ErrorRate(),
})
}
sort.Sort(serverStats(result))
Expand All @@ -61,7 +61,6 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rpc")
if srv := p.next(); srv != nil {
srv.ServeHTTP(w, r)
return
Expand All @@ -80,7 +79,7 @@ func (p *Proxy) next() *Server {
server := p.servers[p.round%len(p.servers)]
p.round++
p.mu.Unlock()
if server.Healthy() {
if server.Healthy() && server.ErrorRate() <= p.cfg.HealthyErrorRateThreshold {
return server
}
if rand.Intn(99)+1 < p.cfg.UnhealthyServerRecoverChancePct {
Expand All @@ -102,8 +101,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error {
srv, err := newServer(
rpc.Provider,
rpc.Address,
p.cfg.HealthyThreshold,
p.cfg.ProxyRequestTimeout,
p.cfg,
)
if err != nil {
return err
Expand Down
29 changes: 23 additions & 6 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import (
func TestProxy(t *testing.T) {
const chainID = "unittest"
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "srv1 replied")
_, _ = io.WriteString(w, "srv1 replied")
}))
t.Cleanup(srv1.Close)
srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Millisecond * 500)
io.WriteString(w, "srv2 replied")
_, _ = io.WriteString(w, "srv2 replied")
}))
t.Cleanup(srv2.Close)
srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)
}))
t.Cleanup(srv2.Close)

Expand All @@ -40,14 +44,17 @@ func TestProxy(t *testing.T) {
Address: srv2.URL,
Provider: "srv2",
},
{
Address: srv3.URL,
Provider: "srv3",
},
},
},
}

t.Logf("%+v", seed)

seedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Log("AQUI")
bts, _ := json.Marshal(seed)
_, _ = w.Write(bts)
}))
Expand All @@ -60,13 +67,15 @@ func TestProxy(t *testing.T) {
HealthyThreshold: 10 * time.Millisecond,
ProxyRequestTimeout: time.Second,
UnhealthyServerRecoverChancePct: 1,
HealthyErrorRateThreshold: 10,
HealthyErrorRateBucketTimeout: time.Second * 10,
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
proxy.Start(ctx)

require.Len(t, proxy.servers, 2)
require.Len(t, proxy.servers, 3)

proxySrv := httptest.NewServer(proxy)
t.Cleanup(proxySrv.Close)
Expand All @@ -85,7 +94,8 @@ func TestProxy(t *testing.T) {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
// only two status codes accepted
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusTeapot {
bts, _ := io.ReadAll(resp.Body)
return fmt.Errorf("bad status code: %v: %s", resp.StatusCode, string(bts))
}
Expand All @@ -98,18 +108,25 @@ func TestProxy(t *testing.T) {
cancel()

stats := proxy.Stats()
require.Len(t, stats, 2)
require.Len(t, stats, 3)

var srv1Stats ServerStat
var srv2Stats ServerStat
var srv3Stats ServerStat
for _, st := range stats {
if st.Name == "srv1" {
srv1Stats = st
}
if st.Name == "srv2" {
srv2Stats = st
}
if st.Name == "srv3" {
srv3Stats = st
}
}
require.Zero(t, srv1Stats.ErrorRate)
require.Zero(t, srv2Stats.ErrorRate)
require.Equal(t, float64(100), srv3Stats.ErrorRate)
require.Greater(t, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(t, srv2Stats.Avg, srv1Stats.Avg)
require.False(t, srv1Stats.Degraded)
Expand Down
57 changes: 42 additions & 15 deletions internal/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,71 @@ import (
"log/slog"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"

"github.com/akash-network/rpc-proxy/internal/avg"
"github.com/akash-network/rpc-proxy/internal/config"
"github.com/akash-network/rpc-proxy/internal/ttlslice"
)

// newServer builds the Server called name that forwards requests to addr.
//
// addr is parsed with url.Parse; when parsing fails, nil is returned together
// with the parse error wrapped as "could not create new server: ...".
//
// NOTE(review): this is a diff view, so the removed signature and struct
// fields are listed next to the lines that replace them. In the new version
// the whole config.Config is stored on the server and the successes/failures
// slices start out freshly created by ttlslice.New.
func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration) (*Server, error) {
func newServer(name, addr string, cfg config.Config) (*Server, error) {
target, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("could not create new server: %w", err)
}
return &Server{
name: name,
url: target,
pings: avg.Moving(50),
healthyThreshold: healthyThreshold,
requestTimeout: requestTimeout,
name: name,
url: target,
pings: avg.Moving(50),
cfg: cfg,
successes: ttlslice.New[int](),
failures: ttlslice.New[int](),
}, nil
}

// Server is a single upstream node that requests are proxied to, together
// with the bookkeeping used to judge its health.
//
// NOTE(review): this is a diff view, so name, url and pings appear twice
// (removed lines followed by their re-aligned replacements).
type Server struct {
name string
url *url.URL
pings *avg.MovingAverage
// cfg is the proxy configuration; this file reads HealthyThreshold,
// ProxyRequestTimeout and HealthyErrorRateBucketTimeout from it.
cfg config.Config
name string
// url supplies the scheme and host that proxied requests are rewritten to.
url *url.URL
// pings is fed the duration of every request handled by ServeHTTP.
pings *avg.MovingAverage
// successes and failures hold response status codes appended by ServeHTTP,
// each with a TTL of cfg.HealthyErrorRateBucketTimeout; ErrorRate is
// computed from their lengths.
successes *ttlslice.Slice[int]
failures *ttlslice.Slice[int]
// requestCount is incremented by ServeHTTP.
requestCount atomic.Int64
}

requestCount atomic.Int64
healthyThreshold time.Duration
requestTimeout time.Duration
// ErrorRate returns the percentage (0-100) of failed requests among all the
// entries that the successes and failures slices currently list.
//
// When neither slice lists any entry there is nothing to measure and the
// rate is reported as 0.
func (s *Server) ErrorRate() float64 {
	okCount, failCount := len(s.successes.List()), len(s.failures.List())
	if okCount+failCount == 0 {
		return 0
	}
	return (float64(failCount) * 100) / float64(okCount+failCount)
}

// Healthy reports whether the duration returned by s.pings.Last() is strictly
// below the healthy threshold.
//
// NOTE(review): this is a diff view; the first return is the removed line
// (per-server healthyThreshold field) and the second is its replacement,
// which reads the threshold from cfg.HealthyThreshold. The error rate is not
// part of this check; per this diff, Proxy.next compares ErrorRate separately.
func (s *Server) Healthy() bool {
return s.pings.Last() < s.healthyThreshold
return s.pings.Last() < s.cfg.HealthyThreshold
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var status int
start := time.Now()
defer func() {
d := time.Since(start)
avg := s.pings.Next(d)
slog.Info("request done", "name", s.name, "avg", avg, "last", d)
slog.Info("request done", "name", s.name, "avg", avg, "last", d, "status", status)
}()

proxiedURL := r.URL
proxiedURL.Host = s.url.Host
proxiedURL.Scheme = s.url.Scheme

if !strings.HasSuffix(s.url.Path, "/rpc") {
proxiedURL.Path = strings.TrimSuffix(proxiedURL.Path, "/rpc")
}

slog.Info("proxying request", "name", s.name, "url", proxiedURL)

rr := &http.Request{
Expand All @@ -63,10 +83,11 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Close: r.Close,
}

ctx, cancel := context.WithTimeout(r.Context(), s.requestTimeout)
ctx, cancel := context.WithTimeout(r.Context(), s.cfg.ProxyRequestTimeout)
defer cancel()

resp, err := http.DefaultClient.Do(rr.WithContext(ctx))
status = resp.StatusCode
if err == nil {
defer resp.Body.Close()
for k, v := range resp.Header {
Expand All @@ -81,6 +102,12 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

s.requestCount.Add(1)
if resp.StatusCode >= 200 && resp.StatusCode <= 300 {
s.successes.Append(resp.StatusCode, s.cfg.HealthyErrorRateBucketTimeout)
} else {
s.failures.Append(resp.StatusCode, s.cfg.HealthyErrorRateBucketTimeout)
}

if !s.Healthy() && ctx.Err() == nil && err == nil {
// if it's not healthy, this is a tryout to improve - if the request
// wasn't canceled, reset stats
Expand Down
7 changes: 7 additions & 0 deletions internal/proxy/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type ServerStat struct {
Degraded bool
Initialized bool
Requests int64
ErrorRate float64
}

type serverStats []ServerStat
Expand All @@ -30,5 +31,11 @@ func (st serverStats) Less(i, j int) bool {
if sj.Degraded && !si.Degraded {
return true
}
if si.ErrorRate > sj.ErrorRate {
return false
}
if sj.ErrorRate > si.ErrorRate {
return true
}
return si.Avg < sj.Avg
}
10 changes: 9 additions & 1 deletion internal/proxy/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func TestCompareServerStats(t *testing.T) {
Avg: time.Second,
Degraded: false,
Initialized: true,
ErrorRate: 10,
},
{
Name: "2",
Expand All @@ -46,9 +47,16 @@ func TestCompareServerStats(t *testing.T) {
Avg: 0,
Degraded: true,
Initialized: true,
ErrorRate: 15,
},
}
t.Log(names(v))
sort.Sort(serverStats(v))
require.Equal(t, []string{"4", "1", "5", "2", "3"}, names(v))
require.Equal(t, []string{
"4",
"1",
"2",
"5",
"3",
}, names(v))
}
Loading

0 comments on commit 6e56229

Please sign in to comment.