Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: node health check #11

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- `AKASH_PROXY_SEED_REFRESH_INTERVAL` (default: `5m`) - How frequently fetch SEED_URL for updates.
- `AKASH_PROXY_CHAIN_ID` (default: `akashnet-2`) - Expected chain ID.
- `AKASH_PROXY_HEALTHY_THRESHOLD` (default: `10s`) - How slow on average a node needs to be to be marked as unhealthy.
- `AKASH_PROXY_HEALTH_INTERVAL` (default: `5m`) - Check Health on endpoints.
- `AKASH_PROXY_PROXY_REQUEST_TIMEOUT` (default: `15s`) - Request timeout for a proxied request.
- `AKASH_PROXY_UNHEALTHY_SERVER_RECOVERY_CHANCE_PERCENT` (default: `1`) - How much chance (in %, 0-100), a node marked as unhealthy have to get a
request again and recover.
Expand Down
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Config struct {
// How slow on average a node needs to be to be marked as unhealthy.
HealthyThreshold time.Duration `env:"HEALTHY_THRESHOLD" envDefault:"10s"`

// Check Health on endpoints.
CheckHealthInterval time.Duration `env:"HEALTH_INTERVAL" envDefault:"5m"`

// Request timeout for a proxied request.
ProxyRequestTimeout time.Duration `env:"PROXY_REQUEST_TIMEOUT" envDefault:"15s"`

Expand Down
86 changes: 85 additions & 1 deletion internal/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"math/rand"
"net/http"
Expand All @@ -22,6 +25,49 @@ func New(cfg config.Config) *Proxy {
}
}

type StatusResponse struct {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
Jsonrpc string `json:"jsonrpc"`
ID int `json:"id"`
Result struct {
NodeInfo struct {
ProtocolVersion struct {
P2P string `json:"p2p"`
Block string `json:"block"`
App string `json:"app"`
} `json:"protocol_version"`
ID string `json:"id"`
ListenAddr string `json:"listen_addr"`
Network string `json:"network"`
Version string `json:"version"`
Channels string `json:"channels"`
Moniker string `json:"moniker"`
Other struct {
TxIndex string `json:"tx_index"`
RPCAddress string `json:"rpc_address"`
} `json:"other"`
} `json:"node_info"`
SyncInfo struct {
LatestBlockHash string `json:"latest_block_hash"`
LatestAppHash string `json:"latest_app_hash"`
LatestBlockHeight string `json:"latest_block_height"`
LatestBlockTime time.Time `json:"latest_block_time"`
EarliestBlockHash string `json:"earliest_block_hash"`
EarliestAppHash string `json:"earliest_app_hash"`
EarliestBlockHeight string `json:"earliest_block_height"`
EarliestBlockTime time.Time `json:"earliest_block_time"`
CatchingUp bool `json:"catching_up"`
} `json:"sync_info"`
ValidatorInfo struct {
Address string `json:"address"`
PubKey struct {
Type string `json:"type"`
Value string `json:"value"`
} `json:"pub_key"`
VotingPower string `json:"voting_power"`
} `json:"validator_info"`
} `json:"result"`
}

type Proxy struct {
cfg config.Config
init sync.Once
Expand Down Expand Up @@ -71,6 +117,43 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}

func checkSingleRPC(url string) error {
req, err := http.NewRequest("GET", url+"/status", nil)
if err != nil {
return fmt.Errorf("error creating request: %v", err)
}
client := &http.Client{Timeout: 2 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("error making request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %v", err)
}

var status StatusResponse
if err := json.Unmarshal(body, &status); err != nil {
return fmt.Errorf("error unmarshaling JSON: %v", err)
}

if status.Result.SyncInfo.CatchingUp {
return fmt.Errorf("node is still catching up")
}

if !status.Result.SyncInfo.LatestBlockTime.After(time.Now().Add(-time.Minute)) {
return fmt.Errorf("latest block time is more than 1 minute old")
}

return nil
}

func (p *Proxy) next() *Server {
p.mu.Lock()
if len(p.servers) == 0 {
Expand Down Expand Up @@ -104,6 +187,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error {
rpc.Address,
p.cfg.HealthyThreshold,
p.cfg.ProxyRequestTimeout,
p.cfg.CheckHealthInterval,
)
if err != nil {
return err
Expand All @@ -115,7 +199,7 @@ func (p *Proxy) update(rpcs []seed.RPC) error {
// remove deleted servers
p.servers = slices.DeleteFunc(p.servers, func(srv *Server) bool {
for _, rpc := range rpcs {
if rpc.Provider == srv.name {
if rpc.Provider == srv.name && srv.healthy.Load() {
return false
}
}
Expand Down
41 changes: 34 additions & 7 deletions internal/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,59 @@ import (
"github.com/akash-network/rpc-proxy/internal/avg"
)

func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration) (*Server, error) {
func newServer(name, addr string, healthyThreshold, requestTimeout time.Duration, healthInterval time.Duration) (*Server, error) {
target, err := url.Parse(addr)
if err != nil {
return nil, fmt.Errorf("could not create new server: %w", err)
}
return &Server{

server := &Server{
name: name,
url: target,
pings: avg.Moving(50),
healthyThreshold: healthyThreshold,
requestTimeout: requestTimeout,
}, nil
lastHealthCheck: time.Now().UTC(),
healthInterval: healthInterval,
healthy: atomic.Bool{},
}

err = checkSingleRPC(addr)
server.healthy.Store(err == nil)

return server, nil
}

type Server struct {
name string
url *url.URL
pings *avg.MovingAverage
name string
url *url.URL
pings *avg.MovingAverage
lastHealthCheck time.Time
healthy atomic.Bool

requestCount atomic.Int64
healthInterval time.Duration
healthyThreshold time.Duration
requestTimeout time.Duration
}

func (s *Server) Healthy() bool {
nick134-bit marked this conversation as resolved.
Show resolved Hide resolved
return s.pings.Last() < s.healthyThreshold
now := time.Now().UTC()
if now.Sub(s.lastHealthCheck) >= s.healthInterval {
slog.Info("checking health", "name", s.name)
err := checkSingleRPC(s.url.String())
healthy := err == nil
s.healthy.Store(healthy)
s.lastHealthCheck = now

if healthy {
slog.Info("server is healthy", "name", s.name)
} else {
slog.Error("server is unhealthy", "name", s.name, "err", err)
}
}

return s.pings.Last() < s.healthyThreshold && s.healthy.Load()
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down