Skip to content

Commit

Permalink
Simple delete queue for mercury transmitter (#11182)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 17, 2023
1 parent e623afd commit aa06f2a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 6 deletions.
85 changes: 79 additions & 6 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

// Package-level tunables for the mercury transmitter.
var (
// NOTE(review): the use site of maxTransmitQueueSize is not visible in this
// excerpt; presumably it bounds the in-memory transmit queue — confirm.
maxTransmitQueueSize = 10_000
// maxDeleteQueueSize is passed as the buffer capacity of the
// chan *pb.TransmitRequest built in NewTransmitter (the delete queue).
maxDeleteQueueSize = 10_000
// NOTE(review): the use site of transmitTimeout is not visible in this
// excerpt; presumably a per-transmit deadline — confirm.
transmitTimeout = 5 * time.Second
)

Expand Down Expand Up @@ -60,6 +61,24 @@ var (
},
[]string{"feedID"},
)
// transmitQueueDeleteErrorCount is a counter vector labelled by feedID. The
// per-feed counter derived from it is incremented by runDeleteQueueLoop every
// time persistenceManager.Delete returns an error (including each retry).
transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_delete_error_count",
Help: "Running count of DB errors when trying to delete an item from the queue DB",
},
[]string{"feedID"},
)
// transmitQueueInsertErrorCount is a counter vector labelled by feedID. The
// per-feed counter derived from it is incremented in Transmit when
// persistenceManager.Insert returns an error.
transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_insert_error_count",
Help: "Running count of DB errors when trying to insert an item into the queue DB",
},
[]string{"feedID"},
)
// transmitQueuePushErrorCount is a counter vector labelled by feedID. The
// per-feed counter derived from it is incremented in Transmit when
// queue.Push reports failure (returns false).
// NOTE(review): the Help text says "DB errors", but the visible increment
// site surfaces the failure as "transmit queue is closed", which looks like an
// in-memory queue condition rather than a DB error — consider rewording.
transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_push_error_count",
Help: "Running count of DB errors when trying to push an item onto the queue",
},
[]string{"feedID"},
)
transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_server_error_count",
Help: "Number of errored transmissions that failed due to an error returned by the mercury server",
Expand Down Expand Up @@ -99,9 +118,14 @@ type mercuryTransmitter struct {
queue *TransmitQueue
wg sync.WaitGroup

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
deleteQueue chan *pb.TransmitRequest

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
}

var PayloadTypes = getPayloadTypes()
Expand Down Expand Up @@ -139,9 +163,13 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp
make(chan (struct{})),
nil,
sync.WaitGroup{},
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
transmitSuccessCount.WithLabelValues(feedIDHex),
transmitDuplicateCount.WithLabelValues(feedIDHex),
transmitConnectionErrorCount.WithLabelValues(feedIDHex),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex),
}
}

Expand All @@ -164,6 +192,8 @@ func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) {
return err
}
mt.wg.Add(1)
go mt.runDeleteQueueLoop()
mt.wg.Add(1)
go mt.runQueueLoop()
return nil
})
Expand Down Expand Up @@ -192,6 +222,46 @@ func (mt *mercuryTransmitter) HealthReport() map[string]error {
return report
}

// runDeleteQueueLoop drains mt.deleteQueue, asking the persistence manager to
// delete each request it receives. A failed delete is logged, counted in
// transmitQueueDeleteErrorCount, and retried for the same request after an
// exponentially growing, jittered pause (1s up to 120s). The pause sequence is
// reset once a delete succeeds.
//
// The loop exits as soon as mt.stopCh fires, whether it is idle or waiting
// between retries; anything still sitting in the queue is left unprocessed.
// It signals mt.wg when it returns.
func (mt *mercuryTransmitter) runDeleteQueueLoop() {
	defer mt.wg.Done()

	// Context handed to Delete; derived from the stop channel.
	ctx, cancel := mt.stopCh.Ctx(context.Background())
	defer cancel()

	// Deletes are expected to fail only rarely (DB disconnect etc), so a slow
	// exponential backoff is fine here.
	retryBackoff := backoff.Backoff{
		Min:    1 * time.Second,
		Max:    120 * time.Second,
		Factor: 2,
		Jitter: true,
	}

	for {
		select {
		case <-mt.stopCh:
			// Stop requested: bail out even if items remain in the queue.
			return
		case req := <-mt.deleteQueue:
			// Keep retrying this request until it is deleted or we are stopped.
			for {
				err := mt.persistenceManager.Delete(ctx, req)
				if err == nil {
					break
				}
				mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req)
				mt.transmitQueueDeleteErrorCount.Inc()

				select {
				case <-mt.stopCh:
					// Stop requested: bail out even if items remain in the queue.
					return
				case <-time.After(retryBackoff.Duration()):
					// Backoff elapsed; fall through to the next attempt.
				}
			}
			// Delete succeeded: the next failure starts from the minimum pause.
			retryBackoff.Reset()
		}
	}
}

func (mt *mercuryTransmitter) runQueueLoop() {
defer mt.wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
Expand Down Expand Up @@ -253,9 +323,10 @@ func (mt *mercuryTransmitter) runQueueLoop() {
}
}

if err := mt.persistenceManager.Delete(runloopCtx, t.Req); err != nil {
mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "reportCtx", t.ReportCtx)
return
select {
case mt.deleteQueue <- t.Req:
default:
mt.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx)
}
}
}
Expand Down Expand Up @@ -288,9 +359,11 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R
mt.lggr.Tracew("Transmit enqueue", "req", req, "report", report, "reportCtx", reportCtx, "signatures", signatures)

if err := mt.persistenceManager.Insert(ctx, req, reportCtx); err != nil {
mt.transmitQueueInsertErrorCount.Inc()
return err
}
if ok := mt.queue.Push(req, reportCtx); !ok {
mt.transmitQueuePushErrorCount.Inc()
return errors.New("transmit queue is closed")
}
return nil
Expand Down
5 changes: 5 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`.
- New Prometheus metrics for mercury:
`mercury_transmit_queue_delete_error_count`
`mercury_transmit_queue_insert_error_count`
`mercury_transmit_queue_push_error_count`
Node operators (NOPs) should consider alerting on these.


### Changed
Expand Down

0 comments on commit aa06f2a

Please sign in to comment.