Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: rest proxy #13

Merged
merged 7 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.pem

dist/
akash
44 changes: 23 additions & 21 deletions index.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,35 @@ <h1>Akash Proxy</h1>
<table>
<thead>
<tr>
<th>Name</th>
<th>URL</th>
<th>Average response time</th>
<th>Server</th>
<th>Request Count</th>
<th>Avg response time</th>
<th>Error Rate</th>
<th>Status</th>
<th>Kind</th>
</tr>
</thead>
<!-- prettier-ignore -->
<tbody>
{{ range .}}
<tr>
<th>{{.Name}}</th>
<th><a href="{{.URL}}">{{.URL}}</a></th>
<th>{{.Avg}}</th>
<th>{{.Requests}}</th>
<th>{{.ErrorRate}}%</th>
<th>
<!-- prettier-ignore -->
{{ if not .Initialized}}
initializing
{{ else if .Degraded }}
degraded
{{else}}
OK
{{end}}
</th>
</tr>
{{ range $key, $value := . }}
{{ range $value }}
<tr>
<th><a href="{{ .URL }}">{{ .Name }}</a></th>
<th>{{ .Requests }}</th>
<th>{{ .Avg }}</th>
<th>{{ .ErrorRate }}%</th>
<th>
{{ if not .Initialized }}
initializing
{{ else if .Degraded }}
degraded
{{ else }}
OK
{{ end }}
</th>
<th>{{ $key }}</th>
</tr>
{{ end }}
{{ end }}
</tbody>
</table>
Expand Down
78 changes: 47 additions & 31 deletions internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,38 @@ import (
"net/http"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/akash-network/rpc-proxy/internal/config"
"github.com/akash-network/rpc-proxy/internal/seed"
)

func New(cfg config.Config) *Proxy {
type ProxyKind uint8

const (
RPC ProxyKind = iota
Rest ProxyKind = iota
)

func New(
kind ProxyKind,
ch chan seed.Seed,
cfg config.Config,
) *Proxy {
return &Proxy{
cfg: cfg,
cfg: cfg,
ch: ch,
kind: kind,
}
}

type Proxy struct {
cfg config.Config
kind ProxyKind
init sync.Once
ch chan seed.Seed

round int
mu sync.Mutex
Expand Down Expand Up @@ -61,10 +76,16 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

switch p.kind {
case RPC:
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rpc")
case Rest:
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/rest")
}

if srv := p.next(); srv != nil {
srv.ServeHTTP(w, r)
return

}
slog.Error("no servers available")
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -90,17 +111,30 @@ func (p *Proxy) next() *Server {
return p.next()
}

func (p *Proxy) update(rpcs []seed.RPC) error {
func (p *Proxy) update(seed seed.Seed) {
var err error
switch p.kind {
case RPC:
err = p.doUpdate(seed.APIs.RPC)
case Rest:
err = p.doUpdate(seed.APIs.Rest)
}
if err != nil {
slog.Error("could not update seed", "err", err)
}
}

func (p *Proxy) doUpdate(providers []seed.Provider) error {
p.mu.Lock()
defer p.mu.Unlock()

// add new servers
for _, rpc := range rpcs {
idx := slices.IndexFunc(p.servers, func(srv *Server) bool { return srv.name == rpc.Provider })
for _, provider := range providers {
idx := slices.IndexFunc(p.servers, func(srv *Server) bool { return srv.name == provider.Provider })
if idx == -1 {
srv, err := newServer(
rpc.Provider,
rpc.Address,
provider.Provider,
provider.Address,
p.cfg,
)
if err != nil {
Expand All @@ -112,8 +146,8 @@ func (p *Proxy) update(rpcs []seed.RPC) error {

// remove deleted servers
p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool {
for _, rpc := range rpcs {
if rpc.Provider == srv.name {
for _, provider := range providers {
if provider.Provider == srv.name {
return false
}
}
Expand All @@ -129,33 +163,15 @@ func (p *Proxy) update(rpcs []seed.RPC) error {
func (p *Proxy) Start(ctx context.Context) {
p.init.Do(func() {
go func() {
t := time.NewTicker(p.cfg.SeedRefreshInterval)
defer t.Stop()
for {
select {
case <-t.C:
p.fetchAndUpdate()
case seed := <-p.ch:
p.update(seed)
case <-ctx.Done():
p.shuttingDown.Store(true)
return
}
}
}()
p.fetchAndUpdate()
})
}

func (p *Proxy) fetchAndUpdate() {
result, err := seed.Fetch(p.cfg.SeedURL)
if err != nil {
slog.Error("could not get initial seed list", "err", err)
return
}
if result.ChainID != p.cfg.ChainID {
slog.Error("chain ID is different than expected", "got", result.ChainID, "expected", p.cfg.ChainID)
return
}
if err := p.update(result.Apis.RPC); err != nil {
slog.Error("could not update servers", "err", err)
}
}
107 changes: 55 additions & 52 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand All @@ -17,53 +16,33 @@ import (
)

func TestProxy(t *testing.T) {
const chainID = "unittest"
for name, kind := range map[string]ProxyKind{
"rpc": RPC,
"rest": Rest,
} {
t.Run(name, func(t *testing.T) {
testProxy(t, kind)
})
}
}

func testProxy(tb testing.TB, kind ProxyKind) {
srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.WriteString(w, "srv1 replied")
}))
t.Cleanup(srv1.Close)
tb.Cleanup(srv1.Close)
srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Millisecond * 500)
_, _ = io.WriteString(w, "srv2 replied")
}))
t.Cleanup(srv2.Close)
tb.Cleanup(srv2.Close)
srv3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTeapot)
}))
t.Cleanup(srv2.Close)

seed := seed.Seed{
ChainID: chainID,
Apis: seed.Apis{
RPC: []seed.RPC{
{
Address: srv1.URL,
Provider: "srv1",
},
{
Address: srv2.URL,
Provider: "srv2",
},
{
Address: srv3.URL,
Provider: "srv3",
},
},
},
}

t.Logf("%+v", seed)

seedSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bts, _ := json.Marshal(seed)
_, _ = w.Write(bts)
}))
t.Cleanup(seedSrv.Close)
tb.Cleanup(srv2.Close)

proxy := New(config.Config{
SeedURL: seedSrv.URL,
SeedRefreshInterval: 500 * time.Millisecond,
ChainID: chainID,
ch := make(chan seed.Seed, 1)
proxy := New(kind, ch, config.Config{
HealthyThreshold: 10 * time.Millisecond,
ProxyRequestTimeout: time.Second,
UnhealthyServerRecoverChancePct: 1,
Expand All @@ -72,19 +51,43 @@ func TestProxy(t *testing.T) {
})

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
tb.Cleanup(cancel)
proxy.Start(ctx)

require.Len(t, proxy.servers, 3)
serverList := []seed.Provider{
{
Address: srv1.URL,
Provider: "srv1",
},
{
Address: srv2.URL,
Provider: "srv2",
},
{
Address: srv3.URL,
Provider: "srv3",
},
}

ch <- seed.Seed{
APIs: seed.Apis{
Rest: serverList,
RPC: serverList,
},
}

require.Eventually(tb, func() bool { return proxy.initialized.Load() }, time.Second, time.Millisecond)

require.Len(tb, proxy.servers, 3)

proxySrv := httptest.NewServer(proxy)
t.Cleanup(proxySrv.Close)
tb.Cleanup(proxySrv.Close)

var wg errgroup.Group
wg.SetLimit(20)
for i := 0; i < 100; i++ {
wg.Go(func() error {
t.Log("go")
tb.Log("go")
req, err := http.NewRequest(http.MethodGet, proxySrv.URL, nil)
if err != nil {
return err
Expand All @@ -102,13 +105,13 @@ func TestProxy(t *testing.T) {
return nil
})
}
require.NoError(t, wg.Wait())
require.NoError(tb, wg.Wait())

// stop the proxy
cancel()

stats := proxy.Stats()
require.Len(t, stats, 3)
require.Len(tb, stats, 3)

var srv1Stats ServerStat
var srv2Stats ServerStat
Expand All @@ -124,13 +127,13 @@ func TestProxy(t *testing.T) {
srv3Stats = st
}
}
require.Zero(t, srv1Stats.ErrorRate)
require.Zero(t, srv2Stats.ErrorRate)
require.Equal(t, float64(100), srv3Stats.ErrorRate)
require.Greater(t, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(t, srv2Stats.Avg, srv1Stats.Avg)
require.False(t, srv1Stats.Degraded)
require.True(t, srv2Stats.Degraded)
require.True(t, srv1Stats.Initialized)
require.True(t, srv2Stats.Initialized)
require.Zero(tb, srv1Stats.ErrorRate)
require.Zero(tb, srv2Stats.ErrorRate)
require.Equal(tb, float64(100), srv3Stats.ErrorRate)
require.Greater(tb, srv1Stats.Requests, srv2Stats.Requests)
require.Greater(tb, srv2Stats.Avg, srv1Stats.Avg)
require.False(tb, srv1Stats.Degraded)
require.True(tb, srv2Stats.Degraded)
require.True(tb, srv1Stats.Initialized)
require.True(tb, srv2Stats.Initialized)
}
Loading
Loading