Skip to content

Commit

Permalink
Merge PR :observe rpc call duration and record to prometheus (#917)
Browse files Browse the repository at this point in the history
  • Loading branch information
barryz authored Jul 7, 2021
1 parent 74eb264 commit 20e040e
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 101 deletions.
33 changes: 22 additions & 11 deletions app/rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package rpc

import (
"fmt"
"reflect"
"strings"
"unicode"

"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/server"
"github.com/ethereum/go-ethereum/rpc"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/prometheus"
evmtypes "github.com/okex/exchain/x/evm/types"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/spf13/viper"
"github.com/tendermint/tendermint/libs/log"
"golang.org/x/time/rate"
"reflect"
"strings"
"unicode"

"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/rpc/backend"
"github.com/okex/exchain/app/rpc/monitor"
"github.com/okex/exchain/app/rpc/namespaces/eth"
"github.com/okex/exchain/app/rpc/namespaces/eth/filters"
"github.com/okex/exchain/app/rpc/namespaces/net"
Expand Down Expand Up @@ -117,7 +118,7 @@ func makeMonitorMetrics(namespace string, service interface{}) {
}
metricsVal := receiver.Elem().FieldByName(MetricsFieldName)

monitorMetrics := make(map[string]metrics.Counter)
monitorMetrics := make(map[string]*monitor.RpcMetrics)
typ := receiver.Type()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
Expand All @@ -126,12 +127,22 @@ func makeMonitorMetrics(namespace string, service interface{}) {
}
methodName := formatMethodName(method.Name)
name := fmt.Sprintf("%s_%s", namespace, methodName)
monitorMetrics[name] = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystem,
Name: name,
Help: fmt.Sprintf("Number of %s method.", name),
}, nil)
monitorMetrics[name] = &monitor.RpcMetrics{
Counter: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystem,
Name: fmt.Sprintf("%s_count", name),
Help: fmt.Sprintf("Total request number of %s method.", name),
}, nil),
Histogram: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: MetricsNamespace,
Subsystem: MetricsSubsystem,
Name: fmt.Sprintf("%s_duration", name),
Help: fmt.Sprintf("Request duration of %s method.", name),
Buckets: []float64{.001, .005, .01, .025, .05, .1, .3, .5, 1, 3, 5, 10},
}, nil),
}

}

if metricsVal.CanSet() && metricsVal.Type() == reflect.ValueOf(monitorMetrics).Type() {
Expand Down
51 changes: 35 additions & 16 deletions app/rpc/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,56 @@ package monitor

import (
"fmt"
"time"

"github.com/go-kit/kit/metrics"
"github.com/tendermint/tendermint/libs/log"
"time"
)

// RpcMetrics ...
type RpcMetrics struct {
Counter metrics.Counter
Histogram metrics.Histogram
}

type Monitor struct {
method string
logger log.Logger
lastTimestamp int64
method string
logger log.Logger
lastTime time.Time
metrics map[string]*RpcMetrics
}

func GetMonitor(method string, logger log.Logger) *Monitor {
func GetMonitor(method string, logger log.Logger, metrics map[string]*RpcMetrics) *Monitor {
return &Monitor{
method: method,
logger: logger,
method: method,
logger: logger,
metrics: metrics,
}
}

func (m *Monitor) OnBegin(metrics map[string]metrics.Counter) {
m.lastTimestamp = time.Now().UnixNano()
func (m *Monitor) OnBegin() *Monitor {
m.lastTime = time.Now()

if metrics == nil {
return
if m.metrics == nil {
return m
}
if _, ok := metrics[m.method]; ok {
metrics[m.method].Add(1)

if _, ok := m.metrics[m.method]; ok {
m.metrics[m.method].Counter.Add(1)
}

return m
}

func (m *Monitor) OnEnd(args ...interface{}) {
now := time.Now().UnixNano()
unit := int64(1e6)
m.logger.Debug(fmt.Sprintf("RPC: Method<%s>, Interval<%dms>, Params<%v>", m.method, (now-m.lastTimestamp)/unit, args))
elapsed := time.Since(m.lastTime).Seconds()
m.logger.Debug(fmt.Sprintf("RPC: Method<%s>, Elapsed<%fms>, Params<%v>", m.method, elapsed*1e3, args))

if m.metrics == nil {
return
}

if _, ok := m.metrics[m.method]; ok {
m.metrics[m.method].Histogram.Observe(elapsed)
}
}
76 changes: 26 additions & 50 deletions app/rpc/namespaces/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,22 @@ import (
"sync"
"time"

"github.com/go-kit/kit/metrics"
lru "github.com/hashicorp/golang-lru"

"github.com/okex/exchain/app/rpc/monitor"
"github.com/okex/exchain/app/rpc/namespaces/eth/simulation"
"github.com/okex/exchain/x/evm/watcher"

cmserver "github.com/cosmos/cosmos-sdk/server"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/crypto"
lru "github.com/hashicorp/golang-lru"
"github.com/spf13/viper"

"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/crypto/hd"
"github.com/okex/exchain/app/rpc/backend"
"github.com/okex/exchain/app/rpc/monitor"
"github.com/okex/exchain/app/rpc/namespaces/eth/simulation"
rpctypes "github.com/okex/exchain/app/rpc/types"
ethermint "github.com/okex/exchain/app/types"
"github.com/okex/exchain/app/utils"
evmtypes "github.com/okex/exchain/x/evm/types"
"github.com/okex/exchain/x/evm/watcher"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
Expand Down Expand Up @@ -71,7 +68,7 @@ type PublicEthereumAPI struct {
watcherBackend *watcher.Watcher
evmFactory simulation.EvmFactory
txPool *TxPool
Metrics map[string]metrics.Counter
Metrics map[string]*monitor.RpcMetrics
callCache *lru.Cache
}

Expand Down Expand Up @@ -162,8 +159,7 @@ func (api *PublicEthereumAPI) SetKeys(keys []ethsecp256k1.PrivKey) {

// ProtocolVersion returns the supported Ethereum protocol version.
func (api *PublicEthereumAPI) ProtocolVersion() hexutil.Uint {
monitor := monitor.GetMonitor("eth_protocolVersion", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_protocolVersion", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd()
return hexutil.Uint(ethermint.ProtocolVersion)
}
Expand All @@ -177,8 +173,7 @@ func (api *PublicEthereumAPI) ChainId() (hexutil.Uint, error) { // nolint
// Syncing returns whether or not the current node is syncing with other peers. Returns false if not, or a struct
// outlining the state of the sync if it is.
func (api *PublicEthereumAPI) Syncing() (interface{}, error) {
monitor := monitor.GetMonitor("eth_syncing", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_syncing", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd()
status, err := api.clientCtx.Client.Status()
if err != nil {
Expand Down Expand Up @@ -229,16 +224,14 @@ func (api *PublicEthereumAPI) Hashrate() hexutil.Uint64 {

// GasPrice returns the current gas price based on Ethermint's gas price oracle.
func (api *PublicEthereumAPI) GasPrice() *hexutil.Big {
monitor := monitor.GetMonitor("eth_gasPrice", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_gasPrice", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd()
return api.gasPrice
}

// Accounts returns the list of accounts available to this node.
func (api *PublicEthereumAPI) Accounts() ([]common.Address, error) {
monitor := monitor.GetMonitor("eth_accounts", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_accounts", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd()
return api.accounts()
}
Expand All @@ -264,16 +257,14 @@ func (api *PublicEthereumAPI) accounts() ([]common.Address, error) {

// BlockNumber returns the current block number.
func (api *PublicEthereumAPI) BlockNumber() (hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_blockNumber", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_blockNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd()
return api.backend.BlockNumber()
}

// GetBalance returns the provided account's balance up to the provided block number.
func (api *PublicEthereumAPI) GetBalance(address common.Address, blockNum rpctypes.BlockNumber) (*hexutil.Big, error) {
monitor := monitor.GetMonitor("eth_getBalance", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getBalance", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNum)
acc, err := api.wrappedBackend.MustGetAccount(address.Bytes())
if err == nil {
Expand Down Expand Up @@ -387,8 +378,7 @@ func (api *PublicEthereumAPI) getStorageAt(address common.Address, key []byte, b

// GetStorageAt returns the contract storage at the given address, block number, and key.
func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, blockNum rpctypes.BlockNumber) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getStorageAt", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getStorageAt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "key", key, "block number", blockNum)
return api.getStorageAt(address, common.HexToHash(key).Bytes(), blockNum, false)
}
Expand All @@ -400,8 +390,7 @@ func (api *PublicEthereumAPI) GetStorageAtInternal(address common.Address, key [

// GetTransactionCount returns the number of transactions at the given address up to the given block number.
func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockNum rpctypes.BlockNumber) (*hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNum)
clientCtx := api.clientCtx
pending := blockNum == rpctypes.PendingBlockNumber
Expand All @@ -421,8 +410,7 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN

// GetBlockTransactionCountByHash returns the number of transactions in the block identified by hash.
func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *hexutil.Uint {
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
Expand All @@ -445,8 +433,7 @@ func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *

// GetBlockTransactionCountByNumber returns the number of transactions in the block identified by its height.
func (api *PublicEthereumAPI) GetBlockTransactionCountByNumber(blockNum rpctypes.BlockNumber) *hexutil.Uint {
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByNumber", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("block number", blockNum)
var (
height int64
Expand Down Expand Up @@ -506,8 +493,7 @@ func (api *PublicEthereumAPI) GetUncleCountByBlockNumber(_ rpctypes.BlockNumber)

// GetCode returns the contract code at the given address and block number.
func (api *PublicEthereumAPI) GetCode(address common.Address, blockNumber rpctypes.BlockNumber) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getCode", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getCode", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNumber)
code, err := api.wrappedBackend.GetCode(address, uint64(blockNumber))
if err == nil {
Expand Down Expand Up @@ -552,8 +538,7 @@ func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtype

// Sign signs the provided data using the private key of address via Geth's signature standard.
func (api *PublicEthereumAPI) Sign(address common.Address, data hexutil.Bytes) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_sign", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_sign", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "data", data)
// TODO: Change this functionality to find an unlocked account by address

Expand All @@ -575,8 +560,7 @@ func (api *PublicEthereumAPI) Sign(address common.Address, data hexutil.Bytes) (

// SendTransaction sends an Ethereum transaction.
func (api *PublicEthereumAPI) SendTransaction(args rpctypes.SendTxArgs) (common.Hash, error) {
monitor := monitor.GetMonitor("eth_sendTransaction", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_sendTransaction", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args)
// TODO: Change this functionality to find an unlocked account by address

Expand Down Expand Up @@ -634,8 +618,7 @@ func (api *PublicEthereumAPI) SendTransaction(args rpctypes.SendTxArgs) (common.

// SendRawTransaction send a raw Ethereum transaction.
func (api *PublicEthereumAPI) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) {
monitor := monitor.GetMonitor("eth_sendRawTransaction", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_sendRawTransaction", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("data", data)
tx := new(evmtypes.MsgEthereumTx)

Expand Down Expand Up @@ -706,8 +689,7 @@ func (api *PublicEthereumAPI) addCallCache(key common.Hash, data []byte) {

// Call performs a raw contract call.
func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNr rpctypes.BlockNumber, _ *map[common.Address]rpctypes.Account) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_call", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_call", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args, "block number", blockNr)
key := api.buildKey(args)
cacheData, ok := api.getFromCallCache(key)
Expand Down Expand Up @@ -847,8 +829,7 @@ func (api *PublicEthereumAPI) doCall(
// It adds 1,000 gas to the returned value instead of using the gas adjustment
// param from the SDK.
func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_estimateGas", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_estimateGas", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args)
simResponse, err := api.doCall(args, 0, big.NewInt(ethermint.DefaultRPCGasLimit), true)
if err != nil {
Expand All @@ -866,8 +847,7 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6

// GetBlockByHash returns the block identified by hash.
func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (interface{}, error) {
monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash, "full", fullTx)
block, err := api.backend.GetBlockByHash(hash, fullTx)
if err != nil {
Expand All @@ -878,8 +858,7 @@ func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (int

// GetBlockByNumber returns the block identified by number.
func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fullTx bool) (interface{}, error) {
monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("number", blockNum, "full", fullTx)
var blockTxs interface{}
if blockNum != rpctypes.PendingBlockNumber {
Expand Down Expand Up @@ -936,8 +915,7 @@ func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fu

// GetTransactionByHash returns the transaction identified by hash.
func (api *PublicEthereumAPI) GetTransactionByHash(hash common.Hash) (*rpctypes.Transaction, error) {
monitor := monitor.GetMonitor("eth_getTransactionByHash", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getTransactionByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rawTx, err := api.wrappedBackend.GetTransactionByHash(hash)
if err == nil {
Expand Down Expand Up @@ -972,8 +950,7 @@ func (api *PublicEthereumAPI) GetTransactionByHash(hash common.Hash) (*rpctypes.

// GetTransactionByBlockHashAndIndex returns the transaction identified by hash and index.
func (api *PublicEthereumAPI) GetTransactionByBlockHashAndIndex(hash common.Hash, idx hexutil.Uint) (*rpctypes.Transaction, error) {
monitor := monitor.GetMonitor("eth_getTransactionByBlockHashAndIndex", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getTransactionByBlockHashAndIndex", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash, "index", idx)
res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
Expand Down Expand Up @@ -1058,8 +1035,7 @@ func (api *PublicEthereumAPI) getTransactionByBlockAndIndex(block *tmtypes.Block

// GetTransactionReceipt returns the transaction receipt identified by hash.
func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (interface{}, error) {
monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger)
monitor.OnBegin(api.Metrics)
monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
res, e := api.wrappedBackend.GetTransactionReceipt(hash)
if e == nil {
Expand Down
Loading

0 comments on commit 20e040e

Please sign in to comment.