Skip to content

Commit

Permalink
[Functions] Remove V0 support from Listener (#11054)
Browse files Browse the repository at this point in the history
  • Loading branch information
bolekk authored Oct 23, 2023
1 parent 1d7cec6 commit 370cdb0
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 1,162 deletions.
206 changes: 18 additions & 188 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"

Expand All @@ -16,9 +15,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/cbor"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/log"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/ocr2dr_oracle"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
Expand All @@ -33,7 +29,6 @@ import (
)

var (
_ log.Listener = &FunctionsListener{}
_ job.ServiceCtx = &FunctionsListener{}

sizeBuckets = []float64{
Expand All @@ -45,51 +40,51 @@ var (
1024 * 256,
}

promOracleEvent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_oracle_event",
Help: "Metric to track received oracle events",
}, []string{"oracle", "event"})
promRequestReceived = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_received",
Help: "Metric to track received request events",
}, []string{"router"})

promRequestInternalError = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_internal_error",
Help: "Metric to track internal errors",
}, []string{"oracle"})
}, []string{"router"})

promRequestComputationError = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_computation_error",
Help: "Metric to track computation errors",
}, []string{"oracle"})
}, []string{"router"})

promRequestComputationSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_computation_success",
Help: "Metric to track number of computed requests",
}, []string{"oracle"})
}, []string{"router"})

promRequestTimeout = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_timeout",
Help: "Metric to track number of timed out requests",
}, []string{"oracle"})
}, []string{"router"})

promRequestConfirmed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_confirmed",
Help: "Metric to track number of confirmed requests",
}, []string{"oracle", "responseType"})
}, []string{"router"})

promRequestDataSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "functions_request_data_size",
Help: "Metric to track request data size",
Buckets: sizeBuckets,
}, []string{"oracle"})
}, []string{"router"})

promComputationResultSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "functions_request_computation_result_size",
Help: "Metric to track computation result size in bytes",
}, []string{"oracle"})
}, []string{"router"})

promComputationErrorSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "functions_request_computation_error_size",
Help: "Metric to track computation error size in bytes",
}, []string{"oracle"})
}, []string{"router"})

promComputationDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "functions_request_computation_duration",
Expand All @@ -103,12 +98,12 @@ var (
float64(30 * time.Second),
float64(60 * time.Second),
},
}, []string{"oracle"})
}, []string{"router"})

promPrunedRequests = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "functions_request_pruned",
Help: "Metric to track number of requests pruned from the DB",
}, []string{"oracle"})
}, []string{"router"})
)

const (
Expand All @@ -126,17 +121,14 @@ type FunctionsListener struct {
contractAddressHex string
job job.Job
bridgeAccessor BridgeAccessor
logBroadcaster log.Broadcaster
shutdownWaitGroup sync.WaitGroup
mbOracleEvents *utils.Mailbox[log.Broadcast]
serviceContext context.Context
serviceCancel context.CancelFunc
chStop chan struct{}
pluginORM ORM
pluginConfig config.PluginConfig
s4Storage s4.Storage
logger logger.Logger
mailMon *utils.MailboxMonitor
urlsMonEndpoint commontypes.MonitoringEndpoint
decryptor threshold.Decryptor
logPollerWrapper evmrelayTypes.LogPollerWrapper
Expand All @@ -160,9 +152,7 @@ func NewFunctionsListener(
pluginORM ORM,
pluginConfig config.PluginConfig,
s4Storage s4.Storage,
logBroadcaster log.Broadcaster,
lggr logger.Logger,
mailMon *utils.MailboxMonitor,
urlsMonEndpoint commontypes.MonitoringEndpoint,
decryptor threshold.Decryptor,
logPollerWrapper evmrelayTypes.LogPollerWrapper,
Expand All @@ -172,14 +162,11 @@ func NewFunctionsListener(
contractAddressHex: contractAddressHex,
job: job,
bridgeAccessor: bridgeAccessor,
logBroadcaster: logBroadcaster,
mbOracleEvents: utils.NewHighCapacityMailbox[log.Broadcast](),
chStop: make(chan struct{}),
pluginORM: pluginORM,
pluginConfig: pluginConfig,
s4Storage: s4Storage,
logger: lggr,
mailMon: mailMon,
urlsMonEndpoint: urlsMonEndpoint,
decryptor: decryptor,
logPollerWrapper: logPollerWrapper,
Expand All @@ -190,34 +177,13 @@ func NewFunctionsListener(
func (l *FunctionsListener) Start(context.Context) error {
return l.StartOnce("FunctionsListener", func() error {
l.serviceContext, l.serviceCancel = context.WithCancel(context.Background())
contractAddress := common.HexToAddress(l.contractAddressHex)
var unsubscribeLogs func()

switch l.pluginConfig.ContractVersion {
case 0:
oracleContract, err := ocr2dr_oracle.NewOCR2DROracle(contractAddress, l.client)
if err != nil {
return err
}
unsubscribeLogs = l.logBroadcaster.Register(l, log.ListenerOpts{
Contract: oracleContract.Address(),
ParseLog: oracleContract.ParseLog,
LogsWithTopics: map[common.Hash][][]log.Topic{
ocr2dr_oracle.OCR2DROracleOracleRequest{}.Topic(): {},
ocr2dr_oracle.OCR2DROracleOracleResponse{}.Topic(): {},
ocr2dr_oracle.OCR2DROracleUserCallbackError{}.Topic(): {},
ocr2dr_oracle.OCR2DROracleUserCallbackRawError{}.Topic(): {},
ocr2dr_oracle.OCR2DROracleResponseTransmitted{}.Topic(): {},
},
MinIncomingConfirmations: l.pluginConfig.MinIncomingConfirmations,
})
l.shutdownWaitGroup.Add(1)
go l.processOracleEventsV0()
case 1:
l.shutdownWaitGroup.Add(1)
go l.processOracleEventsV1()
default:
return errors.New("Functions: unsupported PluginConfig.ContractVersion")
return fmt.Errorf("unsupported contract version: %d", l.pluginConfig.ContractVersion)
}

if l.pluginConfig.ListenerEventHandlerTimeoutSec == 0 {
Expand All @@ -228,14 +194,8 @@ func (l *FunctionsListener) Start(context.Context) error {
go l.pruneRequests()
go func() {
<-l.chStop
if unsubscribeLogs != nil {
unsubscribeLogs() // v0 only
}
l.shutdownWaitGroup.Done()
}()

l.mailMon.Monitor(l.mbOracleEvents, "FunctionsListener", "OracleEvents", fmt.Sprint(l.job.ID))

return nil
})
}
Expand All @@ -246,94 +206,10 @@ func (l *FunctionsListener) Close() error {
l.serviceCancel()
close(l.chStop)
l.shutdownWaitGroup.Wait()

return l.mbOracleEvents.Close()
return nil
})
}

// HandleLog implements log.Listener
func (l *FunctionsListener) HandleLog(lb log.Broadcast) {
log := lb.DecodedLog()
if log == nil || reflect.ValueOf(log).IsNil() {
l.logger.Error("HandleLog: ignoring nil value")
return
}

switch log := log.(type) {
case *ocr2dr_oracle.OCR2DROracleOracleRequest, *ocr2dr_oracle.OCR2DROracleOracleResponse, *ocr2dr_oracle.OCR2DROracleUserCallbackError, *ocr2dr_oracle.OCR2DROracleUserCallbackRawError, *ocr2dr_oracle.OCR2DROracleResponseTransmitted, *functions_coordinator.FunctionsCoordinatorOracleRequest, *functions_coordinator.FunctionsCoordinatorOracleResponse:
wasOverCapacity := l.mbOracleEvents.Deliver(lb)
if wasOverCapacity {
l.logger.Error("OracleRequest log mailbox is over capacity - dropped the oldest log")
}
default:
l.logger.Errorf("Unexpected log type %T", log)
}
}

// JobID() complies with log.Listener
func (l *FunctionsListener) JobID() int32 {
return l.job.ID
}

func (l *FunctionsListener) processOracleEventsV0() {
defer l.shutdownWaitGroup.Done()
for {
select {
case <-l.chStop:
return
case <-l.mbOracleEvents.Notify():
for {
select {
case <-l.chStop:
return
default:
}
lb, exists := l.mbOracleEvents.Retrieve()
if !exists {
break
}
was, err := l.logBroadcaster.WasAlreadyConsumed(lb)
if err != nil {
l.logger.Errorw("Could not determine if log was already consumed", "err", err)
continue
} else if was {
continue
}

log := lb.DecodedLog()
if log == nil || reflect.ValueOf(log).IsNil() {
l.logger.Error("processOracleEvents: ignoring nil value")
continue
}

switch log := log.(type) {
// Version 0
case *ocr2dr_oracle.OCR2DROracleOracleRequest:
promOracleEvent.WithLabelValues(log.Raw.Address.Hex(), "OracleRequest").Inc()
l.shutdownWaitGroup.Add(1)
go l.handleOracleRequestV0(log, lb)
case *ocr2dr_oracle.OCR2DROracleOracleResponse:
promOracleEvent.WithLabelValues(log.Raw.Address.Hex(), "OracleResponse").Inc()
l.shutdownWaitGroup.Add(1)
go l.handleOracleResponseV0("OracleResponse", log.RequestId, lb)
case *ocr2dr_oracle.OCR2DROracleUserCallbackError:
promOracleEvent.WithLabelValues(log.Raw.Address.Hex(), "UserCallbackError").Inc()
l.shutdownWaitGroup.Add(1)
go l.handleOracleResponseV0("UserCallbackError", log.RequestId, lb)
case *ocr2dr_oracle.OCR2DROracleUserCallbackRawError:
promOracleEvent.WithLabelValues(log.Raw.Address.Hex(), "UserCallbackRawError").Inc()
l.shutdownWaitGroup.Add(1)
go l.handleOracleResponseV0("UserCallbackRawError", log.RequestId, lb)
case *ocr2dr_oracle.OCR2DROracleResponseTransmitted:
promOracleEvent.WithLabelValues(log.Raw.Address.Hex(), "ResponseTransmitted").Inc()
default:
l.logger.Warnf("Unexpected log type %T", log)
}
}
}
}
}

func (l *FunctionsListener) processOracleEventsV1() {
defer l.shutdownWaitGroup.Done()
freqMillis := l.pluginConfig.ListenerEventsCheckFrequencyMillis
Expand Down Expand Up @@ -429,6 +305,7 @@ func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleR
return
}

promRequestReceived.WithLabelValues(l.contractAddressHex).Inc()
promRequestDataSize.WithLabelValues(l.contractAddressHex).Observe(float64(len(request.Data)))
requestData, err := l.parseCBOR(request.RequestId, request.Data, l.getMaxCBORsize(request.Flags))
if err != nil {
Expand All @@ -438,34 +315,6 @@ func (l *FunctionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleR
l.handleRequest(ctx, request.RequestId, request.SubscriptionId, request.SubscriptionOwner, request.Flags, requestData)
}

// deprecated
func (l *FunctionsListener) handleOracleRequestV0(request *ocr2dr_oracle.OCR2DROracleOracleRequest, lb log.Broadcast) {
defer l.shutdownWaitGroup.Done()
ctx, cancel := l.getNewHandlerContext()
defer cancel()
l.logger.Infow("oracle request received", "requestID", formatRequestId(request.RequestId))

newReq := &Request{RequestID: request.RequestId, RequestTxHash: &request.Raw.TxHash, ReceivedAt: time.Now()}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("received a log with duplicate request ID", "requestID", formatRequestId(request.RequestId), "err", err)
l.markLogConsumed(lb, pg.WithParentCtx(ctx))
} else {
l.logger.Errorw("failed to create a DB entry for new request", "requestID", formatRequestId(request.RequestId), "err", err)
}
return
}
l.markLogConsumed(lb, pg.WithParentCtx(ctx))

promRequestDataSize.WithLabelValues(l.contractAddressHex).Observe(float64(len(request.Data)))
requestData, err := l.parseCBOR(request.RequestId, request.Data, l.pluginConfig.MaxRequestSizeBytes)
if err != nil {
l.setError(ctx, request.RequestId, USER_ERROR, []byte(err.Error()))
return
}
l.handleRequest(ctx, request.RequestId, request.SubscriptionId, request.SubscriptionOwner, [32]byte{}, requestData)
}

func (l *FunctionsListener) parseCBOR(requestId RequestID, cborData []byte, maxSizeBytes uint32) (*RequestData, error) {
if maxSizeBytes > 0 && uint32(len(cborData)) > maxSizeBytes {
l.logger.Errorw("request too big", "requestID", formatRequestId(requestId), "requestSize", len(cborData), "maxRequestSize", maxSizeBytes)
Expand Down Expand Up @@ -560,26 +409,7 @@ func (l *FunctionsListener) handleOracleResponseV1(response *evmrelayTypes.Oracl
if err := l.pluginORM.SetConfirmed(response.RequestId, pg.WithParentCtx(ctx)); err != nil {
l.logger.Errorw("setting CONFIRMED state failed", "requestID", formatRequestId(response.RequestId), "err", err)
}
promRequestConfirmed.WithLabelValues(l.contractAddressHex, "OracleResponse").Inc()
}

func (l *FunctionsListener) handleOracleResponseV0(responseType string, requestID [32]byte, lb log.Broadcast) {
defer l.shutdownWaitGroup.Done()
l.logger.Infow("oracle response received", "type", responseType, "requestID", formatRequestId(requestID))

ctx, cancel := l.getNewHandlerContext()
defer cancel()
if err := l.pluginORM.SetConfirmed(requestID, pg.WithParentCtx(ctx)); err != nil {
l.logger.Errorw("setting CONFIRMED state failed", "requestID", formatRequestId(requestID), "err", err)
}
promRequestConfirmed.WithLabelValues(l.contractAddressHex, responseType).Inc()
l.markLogConsumed(lb, pg.WithParentCtx(ctx))
}

func (l *FunctionsListener) markLogConsumed(lb log.Broadcast, qopts ...pg.QOpt) {
if err := l.logBroadcaster.MarkConsumed(lb, qopts...); err != nil {
l.logger.Errorw("unable to mark log consumed", "err", err, "log", lb.String())
}
promRequestConfirmed.WithLabelValues(l.contractAddressHex).Inc()
}

func (l *FunctionsListener) timeoutRequests() {
Expand Down
Loading

0 comments on commit 370cdb0

Please sign in to comment.