diff --git a/core/capabilities/streams/mock_trigger.go b/core/capabilities/streams/mock_trigger.go index d4d32cc94d2..2ceeb46bb93 100644 --- a/core/capabilities/streams/mock_trigger.go +++ b/core/capabilities/streams/mock_trigger.go @@ -1,14 +1,10 @@ package streams -// NOTE: this file is an amalgamation of MercuryTrigger and the streams trigger load tests -// the mercury trigger was modified to contain non-empty meta and sign the report with mock keys - import ( "context" "crypto/ecdsa" - "fmt" + "maps" "math/big" - "strconv" "sync" "time" @@ -19,19 +15,16 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers/streams" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" v3 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v3" - "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/v3/reportcodec" ) -const ( - baseTimestamp = 1000000000 -) - func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegistry) (*MockTriggerService, error) { ctx := context.TODO() trigger := NewMockTriggerService(100, lggr) @@ -45,34 +38,6 @@ func RegisterMockTrigger(lggr logger.Logger, capRegistry core.CapabilitiesRegist return trigger, nil } -// NOTE: duplicated from trigger_test.go -func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { - v3Codec := reportcodec.NewReportCodec(feedID, lggr) - raw, err := v3Codec.BuildReport(v3.ReportFields{ - BenchmarkPrice: price, - Timestamp: uint32(timestamp), - ValidFromTimestamp: uint32(timestamp), - Bid: price, - Ask: price, - LinkFee: price, - NativeFee: price, - ExpiresAt: uint32(timestamp + 1000000), - }) - if err != nil { - panic(err) - } - return raw -} - -func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { - rc := evmutil.RawReportContext(reportCtx) - flat := []byte{} - for _, r := range rc { - flat = append(flat, r[:]...) - } - return flat -} - const triggerID = "mock-streams-trigger@1.0.0" var capInfo = capabilities.MustNewCapabilityInfo( @@ -81,62 +46,24 @@ var capInfo = capabilities.MustNewCapabilityInfo( "Mock Streams Trigger", ) -const defaultTickerResolutionMs = 1000 - -// TODO pending capabilities configuration implementation - this should be configurable with a sensible default -const defaultSendChannelBufferSize = 1000 - -type config struct { - // strings should be hex-encoded 32-byte values, prefixed with "0x", all lowercase, minimum 1 item - FeedIDs []string `json:"feedIds" jsonschema:"pattern=^0x[0-9a-f]{64}$,minItems=1"` - // must be greater than 0 - MaxFrequencyMs int `json:"maxFrequencyMs" jsonschema:"minimum=1"` -} - -type inputs struct { - TriggerID string `json:"triggerId"` -} - -var mercuryTriggerValidator = capabilities.NewValidator[config, inputs, capabilities.TriggerEvent](capabilities.ValidatorArgs{Info: capInfo}) - -// This Trigger Service allows for the registration and deregistration of triggers. You can also send reports to the service. +// Wraps the MercuryTriggerService to produce a trigger with mocked data type MockTriggerService struct { - capabilities.Validator[config, inputs, capabilities.TriggerEvent] - capabilities.CapabilityInfo - tickerResolutionMs int64 - subscribers map[string]*subscriber - latestReports map[datastreams.FeedID]datastreams.FeedReport - mu sync.Mutex - stopCh services.StopChan - wg sync.WaitGroup - lggr logger.Logger - - // - meta datastreams.SignersMetadata - signers []*ecdsa.PrivateKey - producer *mockDataProducer - // -} - -var _ capabilities.TriggerCapability = (*MockTriggerService)(nil) -var _ services.Service = &MockTriggerService{} - -type subscriber struct { - ch chan<- capabilities.CapabilityResponse - workflowID string - config config + *triggers.MercuryTriggerService + meta datastreams.Metadata + signers []*ecdsa.PrivateKey + stopCh services.StopChan + wg sync.WaitGroup + subscribers map[string][]streams.FeedId + subscribersMu sync.Mutex + lggr logger.Logger } -// Mock Trigger will send events to each subscriber every MaxFrequencyMs (configurable per subscriber). -// Event generation happens whenever local unix time is a multiple of tickerResolutionMs. Therefore, -// all subscribers' MaxFrequencyMs values need to be a multiple of tickerResolutionMs. func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTriggerService { - if tickerResolutionMs == 0 { - tickerResolutionMs = defaultTickerResolutionMs - } - // + trigger := triggers.NewMercuryTriggerService(tickerResolutionMs, lggr) + trigger.CapabilityInfo = capInfo + f := 1 - meta := datastreams.SignersMetadata{MinRequiredSignatures: 2*f + 1} + meta := datastreams.Metadata{MinRequiredSignatures: 2*f + 1} // gen private keys for MinRequiredSignatures signers := []*ecdsa.PrivateKey{} for i := 0; i < meta.MinRequiredSignatures; i++ { @@ -153,256 +80,85 @@ func NewMockTriggerService(tickerResolutionMs int64, lggr logger.Logger) *MockTr signerAddr := crypto.PubkeyToAddress(privKey.PublicKey).Bytes() meta.Signers = append(meta.Signers, signerAddr) } - // - return &MockTriggerService{ - Validator: mercuryTriggerValidator, - CapabilityInfo: capInfo, - tickerResolutionMs: tickerResolutionMs, - subscribers: make(map[string]*subscriber), - latestReports: make(map[datastreams.FeedID]datastreams.FeedReport), - stopCh: make(services.StopChan), - lggr: lggr.Named("MockTriggerService"), - meta: meta, - signers: signers} -} - -func (o *MockTriggerService) ProcessReport(reports []datastreams.FeedReport) error { - o.mu.Lock() - defer o.mu.Unlock() - o.lggr.Debugw("ProcessReport", "nReports", len(reports)) - for _, report := range reports { - feedID := datastreams.FeedID(report.FeedID) - o.latestReports[feedID] = report - } - return nil -} - -func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { - wid := req.Metadata.WorkflowID - - o.mu.Lock() - defer o.mu.Unlock() - - config, err := o.ValidateConfig(req.Config) - if err != nil { - return nil, err - } - - inputs, err := o.ValidateInputs(req.Inputs) - if err != nil { - return nil, err - } - - triggerID := o.getTriggerID(inputs.TriggerID, wid) - // If triggerId is already registered, return an error - if _, ok := o.subscribers[triggerID]; ok { - return nil, fmt.Errorf("triggerId %s already registered", triggerID) - } - - if int64(config.MaxFrequencyMs)%o.tickerResolutionMs != 0 { - return nil, fmt.Errorf("MaxFrequencyMs must be a multiple of %d", o.tickerResolutionMs) - } - - ch := make(chan capabilities.CapabilityResponse, defaultSendChannelBufferSize) - o.subscribers[triggerID] = - &subscriber{ - ch: ch, - workflowID: wid, - config: *config, - } - // Only start the producer once a workflow is registered - o.producer = NewMockDataProducer(o, o.meta, o.signers, config.FeedIDs, o.lggr) - if err := o.producer.Start(ctx); err != nil { - return nil, err - } + // MercuryTrigger is typically wrapped by other modules that ignore the trigger's meta and provide a different one. + // Since we're skipping those wrappers we need to provide our own meta here. + trigger.SetMetaOverride(meta) - return ch, nil + return &MockTriggerService{ + MercuryTriggerService: trigger, + meta: meta, + signers: signers, + subscribers: make(map[string][]streams.FeedId), + lggr: lggr} } -func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error { - wid := req.Metadata.WorkflowID - - o.mu.Lock() - defer o.mu.Unlock() - - inputs, err := o.ValidateInputs(req.Inputs) - if err != nil { +func (m *MockTriggerService) Start(ctx context.Context) error { + if err := m.MercuryTriggerService.Start(ctx); err != nil { return err } - triggerID := o.getTriggerID(inputs.TriggerID, wid) - - subscriber, ok := o.subscribers[triggerID] - if !ok { - return fmt.Errorf("triggerId %s not registered", triggerID) - } - close(subscriber.ch) - delete(o.subscribers, triggerID) - if len(o.subscribers) == 0 { - if err := o.producer.Close(); err != nil { - return err - } - o.producer = nil - } + go m.loop() return nil } -func (o *MockTriggerService) getTriggerID(triggerID string, wid string) string { - tid := wid + "|" + triggerID - return tid +func (m *MockTriggerService) Close() error { + close(m.stopCh) + m.wg.Wait() + return m.MercuryTriggerService.Close() } -func (o *MockTriggerService) loop() { - defer o.wg.Done() - now := time.Now().UnixMilli() - nextWait := o.tickerResolutionMs - now%o.tickerResolutionMs - - for { - select { - case <-o.stopCh: - return - case <-time.After(time.Duration(nextWait) * time.Millisecond): - startTs := time.Now().UnixMilli() - // find closest timestamp that is a multiple of o.tickerResolutionMs - aligned := (startTs + o.tickerResolutionMs/2) / o.tickerResolutionMs * o.tickerResolutionMs - o.process(aligned) - endTs := time.Now().UnixMilli() - if endTs-startTs > o.tickerResolutionMs { - o.lggr.Errorw("processing took longer than ticker resolution", "duration", endTs-startTs, "tickerResolutionMs", o.tickerResolutionMs) - } - nextWait = getNextWaitIntervalMs(aligned, o.tickerResolutionMs, endTs) - } +func (o *MockTriggerService) RegisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) { + ch, err := o.MercuryTriggerService.RegisterTrigger(ctx, req) + if err != nil { + return nil, err } -} -func getNextWaitIntervalMs(lastTs, tickerResolutionMs, currentTs int64) int64 { - desiredNext := lastTs + tickerResolutionMs - nextWait := desiredNext - currentTs - if nextWait <= 0 { - nextWait = 0 - } - return nextWait + config, _ := o.MercuryTriggerService.ValidateConfig(req.Config) + o.subscribersMu.Lock() + defer o.subscribersMu.Unlock() + o.subscribers[req.Metadata.WorkflowID] = config.FeedIds + return ch, nil } -func (o *MockTriggerService) process(timestamp int64) { - o.mu.Lock() - defer o.mu.Unlock() - for _, sub := range o.subscribers { - if timestamp%int64(sub.config.MaxFrequencyMs) == 0 { - reportList := make([]datastreams.FeedReport, 0) - for _, feedID := range sub.config.FeedIDs { - if latest, ok := o.latestReports[datastreams.FeedID(feedID)]; ok { - reportList = append(reportList, latest) - } - } - - // use 32-byte-padded timestamp as EventID (human-readable) - eventID := fmt.Sprintf("streams_%024s", strconv.FormatInt(timestamp, 10)) - capabilityResponse, err := wrapReports(reportList, eventID, timestamp, o.meta) - - if err != nil { - o.lggr.Errorw("error wrapping reports", "err", err) - continue - } - - o.lggr.Debugw("ProcessReport pushing event", "nReports", len(reportList), "eventID", eventID) - select { - case sub.ch <- capabilityResponse: - default: - o.lggr.Errorw("subscriber channel full, dropping event", "eventID", eventID, "workflowID", sub.workflowID) - } - } - } +func (o *MockTriggerService) UnregisterTrigger(ctx context.Context, req capabilities.TriggerRegistrationRequest) error { + err := o.MercuryTriggerService.UnregisterTrigger(ctx, req) + o.subscribersMu.Lock() + defer o.subscribersMu.Unlock() + delete(o.subscribers, req.Metadata.WorkflowID) + return err } -func wrapReports(reportList []datastreams.FeedReport, eventID string, timestamp int64, meta datastreams.SignersMetadata) (capabilities.CapabilityResponse, error) { - val, err := values.Wrap(reportList) - if err != nil { - return capabilities.CapabilityResponse{}, err - } - - metaVal, err := values.Wrap(meta) - if err != nil { - return capabilities.CapabilityResponse{}, err - } - - triggerEvent := capabilities.TriggerEvent{ - TriggerType: triggerID, - ID: eventID, - Timestamp: strconv.FormatInt(timestamp, 10), - Metadata: metaVal, - Payload: val, - } +const baseTimestamp = 1000000000 - eventVal, err := values.Wrap(triggerEvent) +// NOTE: duplicated from trigger_test.go +func newReport(lggr logger.Logger, feedID [32]byte, price *big.Int, timestamp int64) []byte { + v3Codec := reportcodec.NewReportCodec(feedID, lggr) + raw, err := v3Codec.BuildReport(v3.ReportFields{ + BenchmarkPrice: price, + Timestamp: uint32(timestamp), + ValidFromTimestamp: uint32(timestamp), + Bid: price, + Ask: price, + LinkFee: price, + NativeFee: price, + ExpiresAt: uint32(timestamp + 1000000), + }) if err != nil { - return capabilities.CapabilityResponse{}, err + panic(err) } - - // Create a new CapabilityResponse with the MockTriggerEvent - return capabilities.CapabilityResponse{ - Value: eventVal.(*values.Map), - }, nil -} - -func (o *MockTriggerService) Start(ctx context.Context) error { - o.wg.Add(1) - go o.loop() - o.lggr.Info("MockTriggerService started") - return nil -} - -func (o *MockTriggerService) Close() error { - close(o.stopCh) - o.wg.Wait() - o.lggr.Info("MockTriggerService closed") - return nil -} - -func (o *MockTriggerService) Ready() error { - return nil -} - -func (o *MockTriggerService) HealthReport() map[string]error { - return nil -} - -func (o *MockTriggerService) Name() string { - return "MockTriggerService" -} - -type mockDataProducer struct { - trigger *MockTriggerService - wg sync.WaitGroup - closeCh chan struct{} - meta datastreams.SignersMetadata - signers []*ecdsa.PrivateKey - feedIDs []string - lggr logger.Logger + return raw } -var _ services.Service = &mockDataProducer{} - -func NewMockDataProducer(trigger *MockTriggerService, meta datastreams.SignersMetadata, signers []*ecdsa.PrivateKey, feedIDs []string, lggr logger.Logger) *mockDataProducer { - return &mockDataProducer{ - trigger: trigger, - closeCh: make(chan struct{}), - meta: meta, - signers: signers, - feedIDs: feedIDs, - lggr: lggr, +func rawReportContext(reportCtx ocrTypes.ReportContext) []byte { + rc := evmutil.RawReportContext(reportCtx) + flat := []byte{} + for _, r := range rc { + flat = append(flat, r[:]...) } + return flat } -func (m *mockDataProducer) Start(ctx context.Context) error { - m.wg.Add(1) - go m.loop() - return nil -} - -func (m *mockDataProducer) loop() { - defer m.wg.Done() - +func (m *MockTriggerService) loop() { sleepSec := 15 ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) defer ticker.Stop() @@ -412,6 +168,7 @@ func (m *mockDataProducer) loop() { j := 0 for range ticker.C { + // TODO: properly close for i := range prices { prices[i] = prices[i] + 1 } @@ -424,49 +181,39 @@ func (m *mockDataProducer) loop() { reportCtx := ocrTypes.ReportContext{ReportTimestamp: ocrTypes.ReportTimestamp{Epoch: uint32(baseTimestamp + j)}} reports := []datastreams.FeedReport{} - for _, feedID := range m.feedIDs { - report := datastreams.FeedReport{ - FeedID: feedID, - FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), - ReportContext: rawReportContext(reportCtx), - ObservationTimestamp: timestamp, - } - // sign report with mock signers - sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) - hash := crypto.Keccak256(sigData) - for n := 0; n < m.meta.MinRequiredSignatures; n++ { - sig, err := crypto.Sign(hash, m.signers[n]) - if err != nil { - panic(err) + subscribers := map[string][]streams.FeedId{} + m.subscribersMu.Lock() + maps.Copy(subscribers, m.subscribers) + m.subscribersMu.Unlock() + for _, feedIDs := range subscribers { + for _, feedID := range feedIDs { + feedID := string(feedID) + report := datastreams.FeedReport{ + FeedID: feedID, + FullReport: newReport(m.lggr, common.HexToHash(feedID), big.NewInt(prices[0]), timestamp), + ReportContext: rawReportContext(reportCtx), + ObservationTimestamp: timestamp, + } + // sign report with mock signers + sigData := append(crypto.Keccak256(report.FullReport), report.ReportContext...) + hash := crypto.Keccak256(sigData) + for n := 0; n < m.meta.MinRequiredSignatures; n++ { + sig, err := crypto.Sign(hash, m.signers[n]) + if err != nil { + panic(err) + } + report.Signatures = append(report.Signatures, sig) } - report.Signatures = append(report.Signatures, sig) + + reports = append(reports, report) } - reports = append(reports, report) } m.lggr.Infow("New set of Mock reports", "timestamp", time.Now().Unix(), "payload", reports) - err := m.trigger.ProcessReport(reports) + err := m.MercuryTriggerService.ProcessReport(reports) if err != nil { m.lggr.Errorw("failed to process Mock reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) } } } - -func (m *mockDataProducer) Close() error { - close(m.closeCh) - m.wg.Wait() - return nil -} - -func (m *mockDataProducer) HealthReport() map[string]error { - return nil -} - -func (m *mockDataProducer) Ready() error { - return nil -} - -func (m *mockDataProducer) Name() string { - return "mockDataProducer" -}