Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: additional prom metrics #738

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L
return ch.getClient()
}
ch.txm = txm.NewTxm(ch.id, tc, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, ch.Reader)
bc := func() (monitor.BalanceClient, error) {
return ch.getClient()
}
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

const (
Expand Down Expand Up @@ -51,6 +52,7 @@ type Writer interface {
var _ ReaderWriter = (*Client)(nil)

type Client struct {
url string
rpc *rpc.Client
skipPreflight bool // to enable or disable preflight checks
commitment rpc.CommitmentType
Expand All @@ -65,6 +67,7 @@ type Client struct {

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
url: endpoint,
rpc: rpc.New(endpoint),
skipPreflight: cfg.SkipPreflight(),
commitment: cfg.Commitment(),
Expand All @@ -76,7 +79,17 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration,
}, nil
}

func (c *Client) latency(name string) func() {
start := time.Now()
return func() {
monitor.SetClientLatency(time.Since(start), name, c.url)
}
}

func (c *Client) Balance(addr solana.PublicKey) (uint64, error) {
done := c.latency("balance")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()

Expand All @@ -95,6 +108,9 @@ func (c *Client) SlotHeight() (uint64, error) {
}

func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64, error) {
done := c.latency("slot_height")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetSlotHeight", func() (interface{}, error) {
Expand All @@ -104,13 +120,19 @@ func (c *Client) SlotHeightWithCommitment(commitment rpc.CommitmentType) (uint64
}

func (c *Client) GetAccountInfoWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetAccountInfoOpts) (*rpc.GetAccountInfoResult, error) {
done := c.latency("account_info")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()
opts.Commitment = c.commitment // overrides passed in value - use defined client commitment type
return c.rpc.GetAccountInfoWithOpts(ctx, addr, opts)
}

func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
done := c.latency("latest_blockhash")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()

Expand All @@ -121,6 +143,9 @@ func (c *Client) LatestBlockhash() (*rpc.GetLatestBlockhashResult, error) {
}

func (c *Client) ChainID() (string, error) {
done := c.latency("chain_id")
defer done()

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
defer cancel()
v, err, _ := c.requestGroup.Do("GetGenesisHash", func() (interface{}, error) {
Expand All @@ -147,6 +172,9 @@ func (c *Client) ChainID() (string, error) {
}

func (c *Client) GetFeeForMessage(msg string) (uint64, error) {
done := c.latency("fee_for_message")
defer done()

// msg is base58 encoded data

ctx, cancel := context.WithTimeout(context.Background(), c.contextDuration)
Expand All @@ -164,6 +192,9 @@ func (c *Client) GetFeeForMessage(msg string) (uint64, error) {

// https://docs.solana.com/developing/clients/jsonrpc-api#getsignaturestatuses
func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature) ([]*rpc.SignatureStatusesResult, error) {
done := c.latency("signature_statuses")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

Expand All @@ -182,6 +213,9 @@ func (c *Client) SignatureStatuses(ctx context.Context, sigs []solana.Signature)
// https://docs.solana.com/developing/clients/jsonrpc-api#simulatetransaction
// opts - (optional) use `nil` to use defaults
func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *rpc.SimulateTransactionOpts) (*rpc.SimulateTransactionResult, error) {
done := c.latency("simulate_tx")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.contextDuration)
defer cancel()

Expand All @@ -205,6 +239,9 @@ func (c *Client) SimulateTx(ctx context.Context, tx *solana.Transaction, opts *r
}

func (c *Client) SendTx(ctx context.Context, tx *solana.Transaction) (solana.Signature, error) {
done := c.latency("send_tx")
defer done()

ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()

Expand All @@ -225,6 +262,8 @@ func (c *Client) GetLatestBlock() (*rpc.GetBlockResult, error) {
}

// get block based on slot
done := c.latency("latest_block")
defer done()
ctx, cancel := context.WithTimeout(context.Background(), c.txTimeout)
defer cancel()
v, err, _ := c.requestGroup.Do("GetBlockWithOpts", func() (interface{}, error) {
Expand Down
22 changes: 22 additions & 0 deletions pkg/solana/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import (
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/system"
"github.com/gagliardetto/solana-go/rpc"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

func TestClient_Reader_Integration(t *testing.T) {
Expand Down Expand Up @@ -292,3 +295,22 @@ func TestClient_SendTxDuplicates_Integration(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(5_000), initBal-endBal)
}

func TestClientLatency(t *testing.T) {
c := Client{}
v := 100
n := t.Name() + uuid.NewString()
f := func() {
done := c.latency(n)
defer done()
time.Sleep(time.Duration(v) * time.Millisecond)
}
f()
g, err := monitor.GetClientLatency(n, c.url)
require.NoError(t, err)
val := testutil.ToFloat64(g)

// check within expected range
assert.GreaterOrEqual(t, val, float64(v))
assert.LessOrEqual(t, val, float64(v)*1.05)
}
16 changes: 9 additions & 7 deletions pkg/solana/monitor/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

solanaClient "github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
)

// Config defines the monitor configuration.
Expand All @@ -23,12 +21,16 @@ type Keystore interface {
Accounts(ctx context.Context) ([]string, error)
}

type BalanceClient interface {
Balance(addr solana.PublicKey) (uint64, error)
}

// NewBalanceMonitor returns a balance monitoring services.Service which reports the SOL balance of all ks keys to prometheus.
func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) services.Service {
func NewBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) services.Service {
return newBalanceMonitor(chainID, cfg, lggr, ks, newReader)
}

func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (solanaClient.Reader, error)) *balanceMonitor {
func newBalanceMonitor(chainID string, cfg Config, lggr logger.Logger, ks Keystore, newReader func() (BalanceClient, error)) *balanceMonitor {
b := balanceMonitor{
chainID: chainID,
cfg: cfg,
Expand All @@ -48,10 +50,10 @@ type balanceMonitor struct {
cfg Config
lggr logger.Logger
ks Keystore
newReader func() (solanaClient.Reader, error)
newReader func() (BalanceClient, error)
updateFn func(acc solana.PublicKey, lamports uint64) // overridable for testing

reader solanaClient.Reader
reader BalanceClient

stop services.StopChan
done chan struct{}
Expand Down Expand Up @@ -98,7 +100,7 @@ func (b *balanceMonitor) monitor() {
}

// getReader returns the cached solanaClient.Reader, or creates a new one if nil.
func (b *balanceMonitor) getReader() (solanaClient.Reader, error) {
func (b *balanceMonitor) getReader() (BalanceClient, error) {
if b.reader == nil {
var err error
b.reader, err = b.newReader()
Expand Down
40 changes: 37 additions & 3 deletions pkg/solana/monitor/prom.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,53 @@
package monitor

import (
"time"

"github.com/gagliardetto/solana-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
)

var promSolanaBalance = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: "solana_balance", Help: "Solana account balances"},
[]string{"account", "chainID", "chainSet", "denomination"},
var (
promSolanaBalance = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: "solana_balance", Help: "Solana account balances"},
[]string{"account", "chainID", "chainSet", "denomination"},
)
promCacheTimestamp = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: "solana_cache_last_update_unix", Help: "Solana relayer cache last update timestamp"},
[]string{"type", "chainID", "account"},
)
promClientReq = promauto.NewGaugeVec(
prometheus.GaugeOpts{Name: "solana_client_latency_ms", Help: "Solana client request latency"},
[]string{"request", "url"},
)
)

func (b *balanceMonitor) updateProm(acc solana.PublicKey, lamports uint64) {
v := internal.LamportsToSol(lamports) // convert from lamports to SOL
promSolanaBalance.WithLabelValues(acc.String(), b.chainID, "solana", "SOL").Set(v)
}

func SetCacheTimestamp(t time.Time, cacheType, chainID, account string) {
promCacheTimestamp.With(prometheus.Labels{
"type": cacheType,
"chainID": chainID,
"account": account,
}).Set(float64(t.Unix()))
}

func SetClientLatency(d time.Duration, request, url string) {
promClientReq.With(prometheus.Labels{
"request": request,
"url": url,
}).Set(float64(d.Milliseconds()))
}

func GetClientLatency(request, url string) (prometheus.Gauge, error) {
return promClientReq.GetMetricWith(prometheus.Labels{
"request": request,
"url": url,
})
}
4 changes: 2 additions & 2 deletions pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (r *Relayer) NewMedianProvider(rargs relaytypes.RelayArgs, pargs relaytypes
}

cfg := configWatcher.chain.Config()
transmissionsCache := NewTransmissionsCache(transmissionsID, cfg, configWatcher.reader, r.lggr)
transmissionsCache := NewTransmissionsCache(transmissionsID, relayConfig.ChainID, cfg, configWatcher.reader, r.lggr)
return &medianProvider{
configProvider: configWatcher,
transmissionsCache: transmissionsCache,
Expand Down Expand Up @@ -194,7 +194,7 @@ func newConfigProvider(ctx context.Context, lggr logger.Logger, chain Chain, arg
if err != nil {
return nil, fmt.Errorf("error in NewMedianProvider.chain.Reader: %w", err)
}
stateCache := NewStateCache(stateID, chain.Config(), reader, lggr)
stateCache := NewStateCache(stateID, relayConfig.ChainID, chain.Config(), reader, lggr)
return &configProvider{
chainID: relayConfig.ChainID,
stateID: stateID,
Expand Down
9 changes: 7 additions & 2 deletions pkg/solana/state_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

var (
Expand All @@ -28,6 +29,7 @@ type StateCache struct {
services.StateMachine
// on-chain program + 2x state accounts (state + transmissions)
StateID solana.PublicKey
chainID string

stateLock sync.RWMutex
state State
Expand All @@ -43,9 +45,10 @@ type StateCache struct {
stopCh services.StopChan
}

func NewStateCache(stateID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache {
func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache {
return &StateCache{
StateID: stateID,
chainID: chainID,
reader: reader,
lggr: lggr,
cfg: cfg,
Expand Down Expand Up @@ -124,11 +127,13 @@ func (c *StateCache) fetchState(ctx context.Context) error {

c.lggr.Debugf("state fetched for account: %s, result (config digest): %v", c.StateID, hex.EncodeToString(state.Config.LatestConfigDigest[:]))

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, "ocr2_median_state", c.chainID, c.StateID.String())
// acquire lock and write to state
c.stateLock.Lock()
defer c.stateLock.Unlock()
c.state = state
c.stateTime = time.Now()
c.stateTime = timestamp
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/solana/transmissions_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (

"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
)

type TransmissionsCache struct {
services.StateMachine

// on-chain program + 2x state accounts (state + transmissions)
TransmissionsID solana.PublicKey
chainID string

ansLock sync.RWMutex
answer Answer
Expand All @@ -39,9 +41,10 @@ type TransmissionsCache struct {
stopCh services.StopChan
}

func NewTransmissionsCache(transmissionsID solana.PublicKey, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache {
func NewTransmissionsCache(transmissionsID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache {
return &TransmissionsCache{
TransmissionsID: transmissionsID,
chainID: chainID,
reader: reader,
lggr: lggr,
cfg: cfg,
Expand Down Expand Up @@ -120,11 +123,13 @@ func (c *TransmissionsCache) fetchLatestTransmission(ctx context.Context) error
}
c.lggr.Debugf("latest transmission fetched for account: %s, result: %v", c.TransmissionsID, answer)

timestamp := time.Now()
monitor.SetCacheTimestamp(timestamp, "ocr2_median_transmissions", c.chainID, c.TransmissionsID.String())
// acquire lock and write to state
c.ansLock.Lock()
defer c.ansLock.Unlock()
c.answer = answer
c.ansTime = time.Now()
c.ansTime = timestamp
return nil
}

Expand Down
Loading