From 79df493aa76f3c3849e7a1978d74e5b012242457 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 7 Feb 2024 10:32:37 -0500 Subject: [PATCH] Prep work for LLO - Includes minor changes/cleanups for unrelated code --- .tool-versions | 1 + .../testutils/configtest/general_config.go | 6 +++++- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 ++-- core/services/functions/connector_handler.go | 4 ++-- core/services/job/models.go | 8 ++++---- core/services/job/spawner.go | 10 +++++----- .../evmregistry/v21/logprovider/recoverer.go | 2 +- .../evmregistry/v21/mercury/streams/streams.go | 6 +++--- .../v21/mercury/streams/streams_test.go | 6 +++--- core/services/pipeline/common.go | 10 ++++++++++ .../relay/evm/functions/logpoller_wrapper.go | 4 ++-- .../evm/mercury/offchain_config_digester.go | 1 + core/services/relay/evm/mercury/transmitter.go | 2 +- .../services/relay/evm/mercury/v1/data_source.go | 2 +- .../relay/evm/mercury/wsrpc/pb/mercury.proto | 3 ++- .../evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go | 4 ++-- core/services/relay/evm/mercury_provider.go | 16 ++++++++-------- core/services/streams/delegate.go | 6 +++--- core/services/streams/delegate_test.go | 4 ++-- core/services/streams/stream.go | 7 +------ core/services/streams/stream_registry.go | 11 +++++++++-- core/services/vrf/v1/listener_v1.go | 2 +- core/web/presenters/job.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 ++-- 28 files changed, 77 insertions(+), 58 deletions(-) diff --git a/.tool-versions b/.tool-versions index d8f0afd901d..2a2362dcfa0 100644 --- a/.tool-versions +++ b/.tool-versions @@ -5,3 +5,4 @@ postgres 13.3 helm 3.10.3 zig 0.10.1 golangci-lint 1.55.2 +protoc 23.2 diff --git a/core/internal/testutils/configtest/general_config.go b/core/internal/testutils/configtest/general_config.go index d2851035855..c79b1c7c3cb 100644 --- a/core/internal/testutils/configtest/general_config.go +++ b/core/internal/testutils/configtest/general_config.go @@ -66,9 +66,13 @@ func overrides(c *chainlink.Config, s *chainlink.Secrets) { c.WebServer.TLS.ListenIP = &testIP chainID := big.NewI(evmclient.NullClientChainID) + + chainCfg := evmcfg.Defaults(chainID) + chainCfg.LogPollInterval = commonconfig.MustNewDuration(1 * time.Second) // speed it up from the standard 15s for tests + c.EVM = append(c.EVM, &evmcfg.EVMConfig{ ChainID: chainID, - Chain: evmcfg.Defaults(chainID), + Chain: chainCfg, Nodes: evmcfg.EVMNodes{ &evmcfg.Node{ Name: ptr("test"), diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 73307549f53..4a13766635c 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -313,7 +313,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.32.0 // indirect gopkg.in/guregu/null.v2 v2.1.2 // indirect gopkg.in/guregu/null.v4 v4.0.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 5c4ccc848d3..3b0e843d0d1 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1893,8 +1893,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/core/services/functions/connector_handler.go b/core/services/functions/connector_handler.go index c8c522e6a62..75a3dca24f1 100644 --- a/core/services/functions/connector_handler.go +++ b/core/services/functions/connector_handler.go @@ -271,7 +271,7 @@ func (h *functionsConnectorHandler) handleOffchainRequest(request *OffchainReque defer cancel() err := h.listener.HandleOffchainRequest(ctx, request) if err != nil { - h.lggr.Errorw("internal error while processing", "id", request.RequestId, "error", err) + h.lggr.Errorw("internal error while processing", "id", request.RequestId, "err", err) h.mu.Lock() defer h.mu.Unlock() state, ok := h.heartbeatRequests[RequestID(request.RequestId)] @@ -330,7 +330,7 @@ func (h *functionsConnectorHandler) cacheNewRequestLocked(requestId RequestID, r func (h *functionsConnectorHandler) sendResponseAndLog(ctx context.Context, gatewayId string, requestBody *api.MessageBody, payload any) { err := h.sendResponse(ctx, gatewayId, requestBody, payload) if err != nil { - h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "error", err) + h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "err", err) } else { h.lggr.Debugw("sent to gateway", "id", gatewayId, "messageId", requestBody.MessageId, "donId", requestBody.DonId, "method", requestBody.Method) } diff --git a/core/services/job/models.go b/core/services/job/models.go index cde1e7a740d..2aee9182a9c 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -126,7 +126,7 @@ var ( type Job struct { ID int32 `toml:"-"` ExternalJobID uuid.UUID `toml:"externalJobID"` - StreamID *uint64 `toml:"streamID"` + StreamID *uint32 `toml:"streamID"` OCROracleSpecID *int32 OCROracleSpec *OCROracleSpec OCR2OracleSpecID *int32 @@ -162,11 +162,11 @@ type Job struct { PipelineSpecID int32 PipelineSpec *pipeline.Spec JobSpecErrors []SpecError - Type Type - SchemaVersion uint32 + Type Type `toml:"type"` + SchemaVersion uint32 `toml:"schemaVersion"` GasLimit clnull.Uint32 `toml:"gasLimit"` ForwardingAllowed bool `toml:"forwardingAllowed"` - Name null.String + Name null.String `toml:"name"` MaxTaskDuration models.Interval Pipeline pipeline.Pipeline `toml:"observationSource"` CreatedAt time.Time diff --git a/core/services/job/spawner.go b/core/services/job/spawner.go index 1d44cedaad9..a16466fbef1 100644 --- a/core/services/job/spawner.go +++ b/core/services/job/spawner.go @@ -65,20 +65,20 @@ type ( Delegate interface { JobType() Type // BeforeJobCreated is only called once on first time job create. - BeforeJobCreated(spec Job) + BeforeJobCreated(Job) // ServicesForSpec returns services to be started and stopped for this // job. In case a given job type relies upon well-defined startup/shutdown // ordering for services, they are started in the order they are given // and stopped in reverse order. - ServicesForSpec(spec Job) ([]ServiceCtx, error) - AfterJobCreated(spec Job) - BeforeJobDeleted(spec Job) + ServicesForSpec(Job) ([]ServiceCtx, error) + AfterJobCreated(Job) + BeforeJobDeleted(Job) // OnDeleteJob will be called from within DELETE db transaction. Any db // commands issued within OnDeleteJob() should be performed first, before any // non-db side effects. This is required in order to guarantee mutual atomicity between // all tasks intended to happen during job deletion. For the same reason, the job will // not show up in the db within OnDeleteJob(), even though it is still actively running. - OnDeleteJob(spec Job, q pg.Queryer) error + OnDeleteJob(jb Job, q pg.Queryer) error } activeJob struct { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index b28ece9843f..13b8bb17245 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -542,7 +542,7 @@ func (r *logRecoverer) selectFilterBatch(filters []upkeepFilter) []upkeepFilter for len(results) < batchSize && len(filters) != 0 { i, err := r.randIntn(len(filters)) if err != nil { - r.lggr.Debugw("error generating random number", "error", err.Error()) + r.lggr.Debugw("error generating random number", "err", err.Error()) continue } results = append(results, filters[i]) diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go index 48ee0492f9e..6f0bf98f83d 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams.go @@ -36,7 +36,7 @@ type latestBlockProvider interface { LatestBlock() *ocr2keepers.BlockKey } -type streamsRegistry interface { +type streamRegistry interface { GetUpkeepPrivilegeConfig(opts *bind.CallOpts, upkeepId *big.Int) ([]byte, error) CheckCallback(opts *bind.CallOpts, id *big.Int, values [][]byte, extraData []byte) (iregistry21.CheckCallback, error) Address() common.Address @@ -52,7 +52,7 @@ type streams struct { mercuryConfig mercury.MercuryConfigProvider abi abi.ABI blockSubscriber latestBlockProvider - registry streamsRegistry + registry streamRegistry client contextCaller lggr logger.Logger threadCtrl utils.ThreadControl @@ -70,7 +70,7 @@ func NewStreamsLookup( mercuryConfig mercury.MercuryConfigProvider, blockSubscriber latestBlockProvider, client contextCaller, - registry streamsRegistry, + registry streamRegistry, lggr logger.Logger) *streams { httpClient := http.DefaultClient threadCtrl := utils.NewThreadControl() diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go index c7bff2eac7a..531a97159f1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/mercury/streams/streams_test.go @@ -139,7 +139,7 @@ func TestStreams_CheckCallback(t *testing.T) { state encoding.PipelineExecutionState retryable bool - registry streamsRegistry + registry streamRegistry }{ { name: "success - empty extra data", @@ -284,7 +284,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) { err error state encoding.PipelineExecutionState reason encoding.UpkeepFailureReason - registry streamsRegistry + registry streamRegistry retryable bool config []byte }{ @@ -474,7 +474,7 @@ func TestStreams_StreamsLookup(t *testing.T) { hasError bool hasPermission bool v3 bool - registry streamsRegistry + registry streamRegistry }{ { name: "success - happy path no cache", diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index 6efa7aa2148..74e13ae200c 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -243,6 +243,16 @@ func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult { return fr } +// Terminals returns all terminal task run results +func (trrs TaskRunResults) Terminals() (terminals []TaskRunResult) { + for _, trr := range trrs { + if trr.IsTerminal() { + terminals = append(terminals, trr) + } + } + return +} + // GetNextTaskOf returns the task with the next id or nil if it does not exist func (trrs *TaskRunResults) GetNextTaskOf(task TaskRunResult) *TaskRunResult { nextID := task.Task.Base().id + 1 diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index e76b567b42b..7897e86310e 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -321,7 +321,7 @@ func (l *logPollerWrapper) SubscribeToUpdates(subscriberName string, subscriber if l.pluginConfig.ContractVersion == 0 { // in V0, immediately set contract address to Oracle contract and never update again if err := subscriber.UpdateRoutes(l.routerContract.Address(), l.routerContract.Address()); err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "error", err) + l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "err", err) } } else if l.pluginConfig.ContractVersion == 1 { l.mu.Lock() @@ -416,7 +416,7 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinator common.Address, p for _, subscriber := range l.subscribers { err := subscriber.UpdateRoutes(activeCoordinator, proposedCoordinator) if err != nil { - l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "error", err) + l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err) } } } diff --git a/core/services/relay/evm/mercury/offchain_config_digester.go b/core/services/relay/evm/mercury/offchain_config_digester.go index a12198738a9..f9ba0b23095 100644 --- a/core/services/relay/evm/mercury/offchain_config_digester.go +++ b/core/services/relay/evm/mercury/offchain_config_digester.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/wsrpc/credentials" diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 40a51b9d92d..e62f1fae0c7 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -241,7 +241,7 @@ func (mt *mercuryTransmitter) runDeleteQueueLoop() { case req := <-mt.deleteQueue: for { if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil { - mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req) + mt.lggr.Errorw("Failed to delete transmit request record", "err", err, "req", req) mt.transmitQueueDeleteErrorCount.Inc() select { case <-time.After(b.Duration()): diff --git a/core/services/relay/evm/mercury/v1/data_source.go b/core/services/relay/evm/mercury/v1/data_source.go index ce48ec6cf94..7f41bd1e36c 100644 --- a/core/services/relay/evm/mercury/v1/data_source.go +++ b/core/services/relay/evm/mercury/v1/data_source.go @@ -295,7 +295,7 @@ func (ds *datasource) setLatestBlocks(ctx context.Context, obs *v1types.Observat latestBlocks, err := ds.mercuryChainReader.LatestHeads(ctx, nBlocksObservation) if err != nil { - ds.lggr.Errorw("failed to read latest blocks", "error", err) + ds.lggr.Errorw("failed to read latest blocks", "err", err) return err } diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto b/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto index 6e5e49724cd..184b0572046 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury.proto @@ -10,7 +10,8 @@ service Mercury { } message TransmitRequest { - bytes payload = 1; + bytes payload = 1; + string reportFormat = 2; } message TransmitResponse { diff --git a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go index eaf42c8568f..23c78abf533 100644 --- a/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go +++ b/core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go @@ -1,17 +1,17 @@ // Code generated by protoc-gen-go-wsrpc. DO NOT EDIT. // versions: // - protoc-gen-go-wsrpc v0.0.1 -// - protoc v3.21.12 +// - protoc v4.23.2 package pb import ( context "context" - wsrpc "github.com/smartcontractkit/wsrpc" ) // MercuryClient is the client API for Mercury service. +// type MercuryClient interface { Transmit(ctx context.Context, in *TransmitRequest) (*TransmitResponse, error) LatestReport(ctx context.Context, in *LatestReportRequest) (*LatestReportResponse, error) diff --git a/core/services/relay/evm/mercury_provider.go b/core/services/relay/evm/mercury_provider.go index d9858ac64c3..9159a13590e 100644 --- a/core/services/relay/evm/mercury_provider.go +++ b/core/services/relay/evm/mercury_provider.go @@ -23,7 +23,7 @@ import ( var _ commontypes.MercuryProvider = (*mercuryProvider)(nil) type mercuryProvider struct { - configWatcher *configWatcher + cp commontypes.ConfigProvider chainReader commontypes.ChainReader codec commontypes.Codec transmitter evmmercury.Transmitter @@ -36,7 +36,7 @@ type mercuryProvider struct { } func NewMercuryProvider( - configWatcher *configWatcher, + cp commontypes.ConfigProvider, chainReader commontypes.ChainReader, codec commontypes.Codec, mercuryChainReader mercurytypes.ChainReader, @@ -47,7 +47,7 @@ func NewMercuryProvider( lggr logger.Logger, ) *mercuryProvider { return &mercuryProvider{ - configWatcher, + cp, chainReader, codec, transmitter, @@ -61,7 +61,7 @@ func NewMercuryProvider( } func (p *mercuryProvider) Start(ctx context.Context) error { - return p.ms.Start(ctx, p.configWatcher, p.transmitter) + return p.ms.Start(ctx, p.cp, p.transmitter) } func (p *mercuryProvider) Close() error { @@ -69,7 +69,7 @@ func (p *mercuryProvider) Close() error { } func (p *mercuryProvider) Ready() error { - return errors.Join(p.configWatcher.Ready(), p.transmitter.Ready()) + return errors.Join(p.cp.Ready(), p.transmitter.Ready()) } func (p *mercuryProvider) Name() string { @@ -78,7 +78,7 @@ func (p *mercuryProvider) Name() string { func (p *mercuryProvider) HealthReport() map[string]error { report := map[string]error{} - services.CopyHealth(report, p.configWatcher.HealthReport()) + services.CopyHealth(report, p.cp.HealthReport()) services.CopyHealth(report, p.transmitter.HealthReport()) return report } @@ -92,11 +92,11 @@ func (p *mercuryProvider) Codec() commontypes.Codec { } func (p *mercuryProvider) ContractConfigTracker() ocrtypes.ContractConfigTracker { - return p.configWatcher.ContractConfigTracker() + return p.cp.ContractConfigTracker() } func (p *mercuryProvider) OffchainConfigDigester() ocrtypes.OffchainConfigDigester { - return p.configWatcher.OffchainConfigDigester() + return p.cp.OffchainConfigDigester() } func (p *mercuryProvider) OnchainConfigCodec() mercurytypes.OnchainConfigCodec { diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index 4e83aed2b94..f7dc852a50b 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -48,7 +48,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e return nil, errors.New("streamID is required to be present for stream specs") } id := *jb.StreamID - lggr := d.lggr.With("streamID", id) + lggr := d.lggr.Named(fmt.Sprintf("%d", id)).With("streamID", id) rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) services = append(services, rrs, &StreamService{ @@ -77,12 +77,12 @@ func (s *StreamService) Start(_ context.Context) error { if s.spec == nil { return fmt.Errorf("pipeline spec unexpectedly missing for stream %q", s.id) } - s.lggr.Debugf("Starting stream %q", s.id) + s.lggr.Debugf("Starting stream %d", s.id) return s.registry.Register(s.id, *s.spec, s.rrs) } func (s *StreamService) Close() error { - s.lggr.Debugf("Stopping stream %q", s.id) + s.lggr.Debugf("Stopping stream %d", s.id) s.registry.Unregister(s.id) return nil } diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go index d6d1ca68876..e97da63d522 100644 --- a/core/services/streams/delegate_test.go +++ b/core/services/streams/delegate_test.go @@ -38,7 +38,7 @@ func Test_Delegate(t *testing.T) { _, err := d.ServicesForSpec(jb) assert.EqualError(t, err, "streamID is required to be present for stream specs") }) - jb.StreamID = ptr(uint64(42)) + jb.StreamID = ptr(uint32(42)) t.Run("returns services", func(t *testing.T) { srvs, err := d.ServicesForSpec(jb) require.NoError(t, err) @@ -83,7 +83,7 @@ answer1 [type=median index=0]; assert.Equal(t, uint32(1), jb.SchemaVersion) assert.True(t, jb.Name.Valid) require.NotNil(t, jb.StreamID) - assert.Equal(t, uint64(12345), *jb.StreamID) + assert.Equal(t, uint32(12345), *jb.StreamID) assert.Equal(t, "voter-turnout", jb.Name.String) }, }, diff --git a/core/services/streams/stream.go b/core/services/streams/stream.go index 51535a0cb86..cb168c11bce 100644 --- a/core/services/streams/stream.go +++ b/core/services/streams/stream.go @@ -101,14 +101,9 @@ func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRu // extract any desired type that matches a particular pipeline run output. // Returns error on parse errors: if results are wrong type func ExtractBigInt(trrs pipeline.TaskRunResults) (*big.Int, error) { - var finaltrrs []pipeline.TaskRunResult // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed // by the pipeline executor - for _, trr := range trrs { - if trr.IsTerminal() { - finaltrrs = append(finaltrrs, trr) - } - } + finaltrrs := trrs.Terminals() if len(finaltrrs) != 1 { return nil, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs)) diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 0c822010a53..c4795caa304 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -4,18 +4,25 @@ import ( "fmt" "sync" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) -type StreamID = uint64 +// alias for easier refactoring +type StreamID = commontypes.StreamID type Registry interface { - Get(streamID StreamID) (strm Stream, exists bool) + Getter Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error Unregister(streamID StreamID) } +type Getter interface { + Get(streamID StreamID) (strm Stream, exists bool) +} + type streamRegistry struct { sync.RWMutex lggr logger.Logger diff --git a/core/services/vrf/v1/listener_v1.go b/core/services/vrf/v1/listener_v1.go index f4e813d7d61..a3240365a66 100644 --- a/core/services/vrf/v1/listener_v1.go +++ b/core/services/vrf/v1/listener_v1.go @@ -369,7 +369,7 @@ func (lsn *Listener) handleLog(lb log.Broadcast, minConfs uint32) { func (lsn *Listener) shouldProcessLog(lb log.Broadcast) bool { consumed, err := lsn.Chain.LogBroadcaster().WasAlreadyConsumed(lb) if err != nil { - lsn.L.Errorw("Could not determine if log was already consumed", "error", err, "txHash", lb.RawLog().TxHash) + lsn.L.Errorw("Could not determine if log was already consumed", "err", err, "txHash", lb.RawLog().TxHash) // Do not process, let lb resend it as a retry mechanism. return false } diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index c97314495b2..6b0293665df 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -452,7 +452,7 @@ func NewJobError(e job.SpecError) JobError { type JobResource struct { JAID Name string `json:"name"` - StreamID *uint64 `json:"streamID,omitempty"` + StreamID *uint32 `json:"streamID,omitempty"` Type JobSpecType `json:"type"` SchemaVersion uint32 `json:"schemaVersion"` GasLimit clnull.Uint32 `json:"gasLimit"` diff --git a/go.mod b/go.mod index c3c6aeb505b..371517046fa 100644 --- a/go.mod +++ b/go.mod @@ -101,7 +101,7 @@ require ( golang.org/x/tools v0.16.0 gonum.org/v1/gonum v0.14.0 google.golang.org/grpc v1.59.0 - google.golang.org/protobuf v1.31.0 + google.golang.org/protobuf v1.32.0 gopkg.in/guregu/null.v2 v2.1.2 gopkg.in/guregu/null.v4 v4.0.0 gopkg.in/natefinch/lumberjack.v2 v2.2.1 diff --git a/go.sum b/go.sum index 35e1703f7ef..1b42754b441 100644 --- a/go.sum +++ b/go.sum @@ -1888,8 +1888,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 70b9e772e3b..c7688f89a92 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -449,7 +449,7 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.59.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.32.0 // indirect gopkg.in/guregu/null.v2 v2.1.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 82897ad46df..e300bf1abf2 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -2310,8 +2310,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= +google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=