Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: node health check #11

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently fetch SEED_URL for updates.
- `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID.
- `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - How slow on average a node needs to be to be marked as unhealthy.
- `AKASH_PROXY_HEALTH_INTERVAL` (default: `5m`) - Check Health on endpoints.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_THRESHOLD` (default: `30`) - Percentage of request errors deemed acceptable.
- `AKASH_PROXY_HEALTHY_ERROR_RATE_BUCKET_TIMEOUT` (default: `1m`) - How long in the past requests are considered to check for status codes.
- `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request.
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Config struct {
// How slow on average a node needs to be to be marked as unhealthy.
HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"`

// Check Health on endpoints.
CheckHealthInterval time.Duration `env:"HEALTH_INTERVAL" envDefault:"5m"`

// Percentage of request errors deemed acceptable.
HealthyErrorRateThreshold float64 `env:"HEALTHY_ERROR_RATE_THRESHOLD" envDefault:"30"`

Expand Down
102 changes: 101 additions & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"net/http"
Expand All @@ -10,6 +13,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/akash-network/rpc-proxy/internal/config"
"github.com/akash-network/rpc-proxy/internal/seed"
Expand All @@ -34,6 +38,30 @@ func New(
}
}

type StatusResponse struct {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
Jsonrpc string `json:"jsonrpc"`
Result struct {
NodeInfo struct {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
ID string `json:"id"`
Network string `json:"network"`
Version string `json:"version"`
} `json:"node_info"`
SyncInfo struct {
LatestBlockTime time.Time `json:"latest_block_time"`
CatchingUp bool `json:"catching_up"`
} `json:"sync_info"`
} `json:"result"`
}

type RestStatusResponse struct {
Block struct {
Header struct {
ChainID string `json:"chain_id"`
Height string `json:"height"`
Time time.Time `json:"time"`
} `json:"header"`
} `json:"block"`
}
type Proxy struct {
cfg config.Config
kind ProxyKind
Expand Down Expand Up @@ -91,6 +119,77 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}

func checkEndpoint(url string, kind ProxyKind) error {
switch kind {
case RPC:
return checkRPC(url)
case Rest:
return checkREST(url)
default:
return fmt.Errorf("unsupported proxy kind: %v", kind)
}
}

func performGetRequest(url string, timeout time.Duration) ([]byte, error) {
client := &http.Client{Timeout: timeout}
resp, err := client.Get(url)
if err != nil {
return nil, fmt.Errorf("error making request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %v", err)
}

return body, nil
}

func checkRPC(url string) error {
body, err := performGetRequest(url+"/status", 2*time.Second)
if err != nil {
return err
}

var status StatusResponse
if err := json.Unmarshal(body, &status); err != nil {
return fmt.Errorf("error unmarshaling JSON: %v", err)
}

if status.Result.SyncInfo.CatchingUp {
return fmt.Errorf("node is still catching up")
}

if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().UTC().Add(-time.Minute)) {
return fmt.Errorf("latest block time is more than 1 minute old")
}

return nil
}

func checkREST(url string) error {
body, err := performGetRequest(url+"/blocks/latest", 2*time.Second)
if err != nil {
return err
}

var status RestStatusResponse
if err := json.Unmarshal(body, &status); err != nil {
return fmt.Errorf("error unmarshaling JSON: %v", err)
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
}

if !status.Block.Header.Time.After(time.Now().UTC().Add(-time.Minute)) {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("latest block time is more than 1 minute old")
}

return nil
}

func (p *Proxy) next() *Server {
p.mu.Lock()
if len(p.servers) == 0 {
Expand Down Expand Up @@ -136,6 +235,7 @@ func (p *Proxy) doUpdate(providers []seed.Provider) error {
provider.Provider,
provider.Address,
p.cfg,
p.kind,
)
if err != nil {
return err
Expand All @@ -147,7 +247,7 @@ func (p *Proxy) doUpdate(providers []seed.Provider) error {
// remove deleted servers
p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool {
for _, provider := range providers {
if provider.Provider == srv.name {
if provider.Provider == srv.name && srv.healthy.Load() {
return false
}
}
Expand Down
70 changes: 48 additions & 22 deletions internal/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,63 @@ import (
"github.com/akash-network/rpc-proxy/internal/ttlslice"
)

func newServer(name, addr string, cfg config.Config) (*Server, error) {
func newServer(name, addr string, cfg config.Config, kind ProxyKind) (*Server, error) {
target, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("could not create new server: %w", err)
}
return &Server{
name: name,
url: target,
pings: avg.Moving(50),
cfg: cfg,
successes: ttlslice.New[int](),
failures: ttlslice.New[int](),
}, nil

server := &Server{
name: name,
url: target,
pings: avg.Moving(50),
cfg: cfg,
successes: ttlslice.New[int](),
failures: ttlslice.New[int](),
lastHealthCheck: time.Now().UTC(),
healthy: atomic.Bool{},
kind: kind,
}

err = checkEndpoint(addr, kind)
server.healthy.Store(err == nil)

return server, nil
}

type Server struct {
cfg config.Config
name string
url *url.URL
pings *avg.MovingAverage
successes *ttlslice.Slice[int]
failures *ttlslice.Slice[int]
requestCount atomic.Int64
name string
url *url.URL
kind ProxyKind
pings *avg.MovingAverage
lastHealthCheck time.Time
healthy atomic.Bool

requestCount atomic.Int64
cfg config.Config
successes *ttlslice.Slice[int]
failures *ttlslice.Slice[int]
}

func (s *Server) Healthy() bool {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
now := time.Now().UTC()
//Add different config value if wanted
if now.Sub(s.lastHealthCheck) >= s.cfg.CheckHealthInterval {
slog.Info("checking health", "name", s.name)
err := checkEndpoint(s.url.String(), s.kind)
healthy := err == nil
s.healthy.Store(healthy)
s.lastHealthCheck = now

if healthy {
slog.Info("server is healthy", "name", s.name)
} else {
slog.Error("server is unhealthy", "name", s.name, "err", err)
}
}

return s.pings.Last() < s.cfg.HealthyThreshold && s.healthy.Load()
}
func (s *Server) ErrorRate() float64 {
suss := len(s.successes.List())
fail := len(s.failures.List())
Expand All @@ -49,12 +81,6 @@ func (s *Server) ErrorRate() float64 {
}
return (float64(fail) * 100) / float64(total)
}

func (s *Server) Healthy() bool {
return s.pings.Last() < s.cfg.HealthyThreshold &&
s.ErrorRate() < s.cfg.HealthyErrorRateThreshold
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var status int = -1
start := time.Now()
Expand Down
Loading