Skip to content

Commit

Permalink
[rfq] metrics, hotfixes (#2854)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Trajan0x <[email protected]>
Co-authored-by: Daniel Wasserman <[email protected]>
  • Loading branch information
3 people authored Jul 8, 2024
1 parent 31b5f8f commit 9779cda
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 11 deletions.
9 changes: 9 additions & 0 deletions ethergo/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type chainListener struct {
newBlockHandler NewBlockHandler
finalityMode rpc.BlockNumber
blockWait uint64
// otelRecorder is the recorder for the otel metrics.
otelRecorder iOtelRecorder
}

var (
Expand All @@ -77,6 +79,12 @@ func NewChainListener(omnirpcClient client.EVM, store listenerDB.ChainListenerDB
option(c)
}

var err error
c.otelRecorder, err = newOtelRecorder(handler, int(c.chainID))
if err != nil {
return nil, fmt.Errorf("could not create otel recorder: %w", err)
}

return c, nil
}

Expand Down Expand Up @@ -183,6 +191,7 @@ func (c *chainListener) doPoll(parentCtx context.Context, handler HandleLog) (er
if err != nil {
return fmt.Errorf("could not put latest block: %w", err)
}
c.otelRecorder.RecordLastBlock(endBlock)

c.startBlock = lastUnconfirmedBlock
return nil
Expand Down
118 changes: 118 additions & 0 deletions ethergo/listener/otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package listener

import (
"context"
"fmt"
"time"

"github.com/synapsecns/sanguine/core/metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const meterName = "github.com/synapsecns/sanguine/ethergo/listener"

// generate an interface for otelRecorder that exports the public method.
// this allows us to avoid using recordX externally anad makes the package less confusing.
//
// =============================================================================
// =============================================================================
// IMPORTANT: DO NOT REMOVE THIS COMMENT.
// NOTICE: PLEASE MAKE SURE YOU UPDATE BOTH THE DOCS AND THE GRAFANA DASHBOARD (IF NEEDED) AFTER UPDATING METRICS.
// =============================================================================
// =============================================================================
//
//go:generate go run github.com/vburenin/ifacemaker -f otel.go -s otelRecorder -i iOtelRecorder -p listener -o otel_generated.go -c "autogenerated file"
type otelRecorder struct {
metrics metrics.Handler
// meter is the metrics meter.
meter metric.Meter
// lastBlockGauge is the gauge for the last block.
lastBlockGauge metric.Int64ObservableGauge
// lastFetchedBlockAgeGauge is the gauge for the last block age.
lastFetchedBlockAgeGauge metric.Float64ObservableGauge
// lastBlock is the last block processed by the listener.
lastBlock *uint64
// lastBlockFetchTime is the time the last block was fetched (used to calculate last block age).
lastBlockFetchTime *time.Time
// chainID is the chain ID for the listener.
chainID int
}

func newOtelRecorder(meterHandler metrics.Handler, chainID int) (_ iOtelRecorder, err error) {
or := otelRecorder{
metrics: meterHandler,
meter: meterHandler.Meter(meterName),
lastBlock: nil,
lastBlockFetchTime: nil,
chainID: chainID,
}

or.lastBlockGauge, err = or.meter.Int64ObservableGauge("last_block")
if err != nil {
return nil, fmt.Errorf("could not create last block gauge")
}

or.lastFetchedBlockAgeGauge, err = or.meter.Float64ObservableGauge("last_block_age")
if err != nil {
return nil, fmt.Errorf("could not create last block age gauge")
}

_, err = or.meter.RegisterCallback(or.recordLastBlock, or.lastBlockGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for last block gauge")
}

_, err = or.meter.RegisterCallback(or.recordLastFetchedBlockAge, or.lastFetchedBlockAgeGauge)
if err != nil {
return nil, fmt.Errorf("could not register callback for last block age gauge")
}

return &or, nil
}

func (o *otelRecorder) recordLastBlock(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.lastBlockGauge == nil || o.lastBlock == nil {
return nil
}

opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, o.chainID),
)
observer.ObserveInt64(o.lastBlockGauge, int64(*o.lastBlock), opts)

return nil
}

func (o *otelRecorder) recordLastFetchedBlockAge(_ context.Context, observer metric.Observer) (err error) {
if o.metrics == nil || o.lastFetchedBlockAgeGauge == nil || o.lastBlockFetchTime == nil {
return nil
}

age := time.Since(*o.lastBlockFetchTime).Seconds()
opts := metric.WithAttributes(
attribute.Int(metrics.ChainID, o.chainID),
)
observer.ObserveFloat64(o.lastFetchedBlockAgeGauge, age, opts)

return nil
}

// RecordLastBlock records the last block processed by the listener.
func (o *otelRecorder) RecordLastBlock(lastBlock uint64) {
// verify if the last block has changed
var hasChanged bool
if o.lastBlock == nil {
hasChanged = true
} else {
hasChanged = *o.lastBlock != lastBlock
}
if !hasChanged {
return
}

// record the block
o.lastBlock = &lastBlock
fetchTime := time.Now()
o.lastBlockFetchTime = &fetchTime
}
9 changes: 9 additions & 0 deletions ethergo/listener/otel_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/rfq/relayer/reldb/base/quote.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s Store) UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status
if prevStatus == nil {
req, err := s.GetQuoteRequestByID(ctx, id)
if err != nil {
return fmt.Errorf("could not get quote: %w", err)
return fmt.Errorf("could not get quote: %w", reldb.ErrNoQuoteForID)
}
prevStatus = &req.Status
}
Expand Down
20 changes: 17 additions & 3 deletions services/rfq/relayer/service/chainindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package service

import (
"context"
"errors"
"fmt"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -85,7 +87,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil)
return r.setRelayRaceLost(ctx, event.TransactionId)
}

err = r.handleRelayLog(ctx, event)
Expand All @@ -99,7 +101,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil)
return r.setRelayRaceLost(ctx, event.TransactionId)
}

err = r.handleProofProvided(ctx, event)
Expand All @@ -113,7 +115,7 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil)
return r.setRelayRaceLost(ctx, event.TransactionId)
}

err = r.handleDepositClaimed(ctx, event)
Expand Down Expand Up @@ -212,3 +214,15 @@ func (r *Relayer) handleDepositClaimed(ctx context.Context, event *fastbridge.Fa
}
return nil
}

func (r *Relayer) setRelayRaceLost(ctx context.Context, transactionID [32]byte) error {
err := r.db.UpdateQuoteRequestStatus(ctx, transactionID, reldb.RelayRaceLost, nil)
// quote does not exist, no need to update status
if err != nil && (errors.Is(err, reldb.ErrNoQuoteForID) || strings.Contains(err.Error(), reldb.ErrNoQuoteForID.Error())) {
return nil
}
if err != nil {
return fmt.Errorf("could not set relay race lost: %w", err)
}
return nil
}
8 changes: 1 addition & 7 deletions services/rfq/relayer/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,7 @@ func (r *Relayer) processDB(ctx context.Context, serial bool, matchStatuses ...r
} else {
// process in parallel (new goroutine)
request := req // capture func literal
ok := r.semaphore.TryAcquire(1)
if !ok {
span.AddEvent("could not acquire semaphore", trace.WithAttributes(
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])),
))
continue
}
err = r.semaphore.Acquire(ctx, 1)
if err != nil {
return fmt.Errorf("could not acquire semaphore: %w", err)
}
Expand Down

0 comments on commit 9779cda

Please sign in to comment.