Skip to content

Commit

Permalink
Prep work for LLO
Browse files Browse the repository at this point in the history
- Includes minor changes/cleanups for unrelated code
  • Loading branch information
samsondav committed Feb 7, 2024
1 parent f90e419 commit 79df493
Show file tree
Hide file tree
Showing 28 changed files with 77 additions and 58 deletions.
1 change: 1 addition & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ postgres 13.3
helm 3.10.3
zig 0.10.1
golangci-lint 1.55.2
protoc 23.2
6 changes: 5 additions & 1 deletion core/internal/testutils/configtest/general_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,13 @@ func overrides(c *chainlink.Config, s *chainlink.Secrets) {
c.WebServer.TLS.ListenIP = &testIP

chainID := big.NewI(evmclient.NullClientChainID)

chainCfg := evmcfg.Defaults(chainID)
chainCfg.LogPollInterval = commonconfig.MustNewDuration(1 * time.Second) // speed it up from the standard 15s for tests

c.EVM = append(c.EVM, &evmcfg.EVMConfig{
ChainID: chainID,
Chain: evmcfg.Defaults(chainID),
Chain: chainCfg,
Nodes: evmcfg.EVMNodes{
&evmcfg.Node{
Name: ptr("test"),
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/guregu/null.v2 v2.1.2 // indirect
gopkg.in/guregu/null.v4 v4.0.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1893,8 +1893,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions core/services/functions/connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (h *functionsConnectorHandler) handleOffchainRequest(request *OffchainReque
defer cancel()
err := h.listener.HandleOffchainRequest(ctx, request)
if err != nil {
h.lggr.Errorw("internal error while processing", "id", request.RequestId, "error", err)
h.lggr.Errorw("internal error while processing", "id", request.RequestId, "err", err)
h.mu.Lock()
defer h.mu.Unlock()
state, ok := h.heartbeatRequests[RequestID(request.RequestId)]
Expand Down Expand Up @@ -330,7 +330,7 @@ func (h *functionsConnectorHandler) cacheNewRequestLocked(requestId RequestID, r
func (h *functionsConnectorHandler) sendResponseAndLog(ctx context.Context, gatewayId string, requestBody *api.MessageBody, payload any) {
err := h.sendResponse(ctx, gatewayId, requestBody, payload)
if err != nil {
h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "error", err)
h.lggr.Errorw("failed to send response to gateway", "id", gatewayId, "err", err)
} else {
h.lggr.Debugw("sent to gateway", "id", gatewayId, "messageId", requestBody.MessageId, "donId", requestBody.DonId, "method", requestBody.Method)
}
Expand Down
8 changes: 4 additions & 4 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ var (
type Job struct {
ID int32 `toml:"-"`
ExternalJobID uuid.UUID `toml:"externalJobID"`
StreamID *uint64 `toml:"streamID"`
StreamID *uint32 `toml:"streamID"`
OCROracleSpecID *int32
OCROracleSpec *OCROracleSpec
OCR2OracleSpecID *int32
Expand Down Expand Up @@ -162,11 +162,11 @@ type Job struct {
PipelineSpecID int32
PipelineSpec *pipeline.Spec
JobSpecErrors []SpecError
Type Type
SchemaVersion uint32
Type Type `toml:"type"`
SchemaVersion uint32 `toml:"schemaVersion"`
GasLimit clnull.Uint32 `toml:"gasLimit"`
ForwardingAllowed bool `toml:"forwardingAllowed"`
Name null.String
Name null.String `toml:"name"`
MaxTaskDuration models.Interval
Pipeline pipeline.Pipeline `toml:"observationSource"`
CreatedAt time.Time
Expand Down
10 changes: 5 additions & 5 deletions core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ type (
Delegate interface {
JobType() Type
// BeforeJobCreated is only called once on first time job create.
BeforeJobCreated(spec Job)
BeforeJobCreated(Job)
// ServicesForSpec returns services to be started and stopped for this
// job. In case a given job type relies upon well-defined startup/shutdown
// ordering for services, they are started in the order they are given
// and stopped in reverse order.
ServicesForSpec(spec Job) ([]ServiceCtx, error)
AfterJobCreated(spec Job)
BeforeJobDeleted(spec Job)
ServicesForSpec(Job) ([]ServiceCtx, error)
AfterJobCreated(Job)
BeforeJobDeleted(Job)
// OnDeleteJob will be called from within DELETE db transaction. Any db
// commands issued within OnDeleteJob() should be performed first, before any
// non-db side effects. This is required in order to guarantee mutual atomicity between
// all tasks intended to happen during job deletion. For the same reason, the job will
// not show up in the db within OnDeleteJob(), even though it is still actively running.
OnDeleteJob(spec Job, q pg.Queryer) error
OnDeleteJob(jb Job, q pg.Queryer) error
}

activeJob struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (r *logRecoverer) selectFilterBatch(filters []upkeepFilter) []upkeepFilter
for len(results) < batchSize && len(filters) != 0 {
i, err := r.randIntn(len(filters))
if err != nil {
r.lggr.Debugw("error generating random number", "error", err.Error())
r.lggr.Debugw("error generating random number", "err", err.Error())
continue
}
results = append(results, filters[i])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type latestBlockProvider interface {
LatestBlock() *ocr2keepers.BlockKey
}

type streamsRegistry interface {
type streamRegistry interface {
GetUpkeepPrivilegeConfig(opts *bind.CallOpts, upkeepId *big.Int) ([]byte, error)
CheckCallback(opts *bind.CallOpts, id *big.Int, values [][]byte, extraData []byte) (iregistry21.CheckCallback, error)
Address() common.Address
Expand All @@ -52,7 +52,7 @@ type streams struct {
mercuryConfig mercury.MercuryConfigProvider
abi abi.ABI
blockSubscriber latestBlockProvider
registry streamsRegistry
registry streamRegistry
client contextCaller
lggr logger.Logger
threadCtrl utils.ThreadControl
Expand All @@ -70,7 +70,7 @@ func NewStreamsLookup(
mercuryConfig mercury.MercuryConfigProvider,
blockSubscriber latestBlockProvider,
client contextCaller,
registry streamsRegistry,
registry streamRegistry,
lggr logger.Logger) *streams {
httpClient := http.DefaultClient
threadCtrl := utils.NewThreadControl()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestStreams_CheckCallback(t *testing.T) {

state encoding.PipelineExecutionState
retryable bool
registry streamsRegistry
registry streamRegistry
}{
{
name: "success - empty extra data",
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestStreams_AllowedToUseMercury(t *testing.T) {
err error
state encoding.PipelineExecutionState
reason encoding.UpkeepFailureReason
registry streamsRegistry
registry streamRegistry
retryable bool
config []byte
}{
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestStreams_StreamsLookup(t *testing.T) {
hasError bool
hasPermission bool
v3 bool
registry streamsRegistry
registry streamRegistry
}{
{
name: "success - happy path no cache",
Expand Down
10 changes: 10 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
return fr
}

// Terminals returns all terminal task run results
func (trrs TaskRunResults) Terminals() (terminals []TaskRunResult) {
for _, trr := range trrs {
if trr.IsTerminal() {
terminals = append(terminals, trr)
}
}
return
}

// GetNextTaskOf returns the task with the next id or nil if it does not exist
func (trrs *TaskRunResults) GetNextTaskOf(task TaskRunResult) *TaskRunResult {
nextID := task.Task.Base().id + 1
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/functions/logpoller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (l *logPollerWrapper) SubscribeToUpdates(subscriberName string, subscriber
if l.pluginConfig.ContractVersion == 0 {
// in V0, immediately set contract address to Oracle contract and never update again
if err := subscriber.UpdateRoutes(l.routerContract.Address(), l.routerContract.Address()); err != nil {
l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "error", err)
l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "subscriberName", subscriberName, "err", err)
}
} else if l.pluginConfig.ContractVersion == 1 {
l.mu.Lock()
Expand Down Expand Up @@ -416,7 +416,7 @@ func (l *logPollerWrapper) handleRouteUpdate(activeCoordinator common.Address, p
for _, subscriber := range l.subscribers {
err := subscriber.UpdateRoutes(activeCoordinator, proposedCoordinator)
if err != nil {
l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "error", err)
l.lggr.Errorw("LogPollerWrapper: Failed to update routes", "err", err)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"

ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/smartcontractkit/wsrpc/credentials"

Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (mt *mercuryTransmitter) runDeleteQueueLoop() {
case req := <-mt.deleteQueue:
for {
if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil {
mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req)
mt.lggr.Errorw("Failed to delete transmit request record", "err", err, "req", req)
mt.transmitQueueDeleteErrorCount.Inc()
select {
case <-time.After(b.Duration()):
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/mercury/v1/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (ds *datasource) setLatestBlocks(ctx context.Context, obs *v1types.Observat
latestBlocks, err := ds.mercuryChainReader.LatestHeads(ctx, nBlocksObservation)

if err != nil {
ds.lggr.Errorw("failed to read latest blocks", "error", err)
ds.lggr.Errorw("failed to read latest blocks", "err", err)
return err
}

Expand Down
3 changes: 2 additions & 1 deletion core/services/relay/evm/mercury/wsrpc/pb/mercury.proto
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ service Mercury {
}

message TransmitRequest {
bytes payload = 1;
bytes payload = 1;
string reportFormat = 2;
}

message TransmitResponse {
Expand Down
4 changes: 2 additions & 2 deletions core/services/relay/evm/mercury/wsrpc/pb/mercury_wsrpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions core/services/relay/evm/mercury_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
var _ commontypes.MercuryProvider = (*mercuryProvider)(nil)

type mercuryProvider struct {
configWatcher *configWatcher
cp commontypes.ConfigProvider
chainReader commontypes.ChainReader
codec commontypes.Codec
transmitter evmmercury.Transmitter
Expand All @@ -36,7 +36,7 @@ type mercuryProvider struct {
}

func NewMercuryProvider(
configWatcher *configWatcher,
cp commontypes.ConfigProvider,
chainReader commontypes.ChainReader,
codec commontypes.Codec,
mercuryChainReader mercurytypes.ChainReader,
Expand All @@ -47,7 +47,7 @@ func NewMercuryProvider(
lggr logger.Logger,
) *mercuryProvider {
return &mercuryProvider{
configWatcher,
cp,
chainReader,
codec,
transmitter,
Expand All @@ -61,15 +61,15 @@ func NewMercuryProvider(
}

func (p *mercuryProvider) Start(ctx context.Context) error {
return p.ms.Start(ctx, p.configWatcher, p.transmitter)
return p.ms.Start(ctx, p.cp, p.transmitter)
}

func (p *mercuryProvider) Close() error {
return p.ms.Close()
}

func (p *mercuryProvider) Ready() error {
return errors.Join(p.configWatcher.Ready(), p.transmitter.Ready())
return errors.Join(p.cp.Ready(), p.transmitter.Ready())
}

func (p *mercuryProvider) Name() string {
Expand All @@ -78,7 +78,7 @@ func (p *mercuryProvider) Name() string {

func (p *mercuryProvider) HealthReport() map[string]error {
report := map[string]error{}
services.CopyHealth(report, p.configWatcher.HealthReport())
services.CopyHealth(report, p.cp.HealthReport())
services.CopyHealth(report, p.transmitter.HealthReport())
return report
}
Expand All @@ -92,11 +92,11 @@ func (p *mercuryProvider) Codec() commontypes.Codec {
}

func (p *mercuryProvider) ContractConfigTracker() ocrtypes.ContractConfigTracker {
return p.configWatcher.ContractConfigTracker()
return p.cp.ContractConfigTracker()
}

func (p *mercuryProvider) OffchainConfigDigester() ocrtypes.OffchainConfigDigester {
return p.configWatcher.OffchainConfigDigester()
return p.cp.OffchainConfigDigester()
}

func (p *mercuryProvider) OnchainConfigCodec() mercurytypes.OnchainConfigCodec {
Expand Down
6 changes: 3 additions & 3 deletions core/services/streams/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e
return nil, errors.New("streamID is required to be present for stream specs")
}
id := *jb.StreamID
lggr := d.lggr.With("streamID", id)
lggr := d.lggr.Named(fmt.Sprintf("%d", id)).With("streamID", id)

rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth())
services = append(services, rrs, &StreamService{
Expand Down Expand Up @@ -77,12 +77,12 @@ func (s *StreamService) Start(_ context.Context) error {
if s.spec == nil {
return fmt.Errorf("pipeline spec unexpectedly missing for stream %q", s.id)
}
s.lggr.Debugf("Starting stream %q", s.id)
s.lggr.Debugf("Starting stream %d", s.id)
return s.registry.Register(s.id, *s.spec, s.rrs)
}

func (s *StreamService) Close() error {
s.lggr.Debugf("Stopping stream %q", s.id)
s.lggr.Debugf("Stopping stream %d", s.id)
s.registry.Unregister(s.id)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions core/services/streams/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_Delegate(t *testing.T) {
_, err := d.ServicesForSpec(jb)
assert.EqualError(t, err, "streamID is required to be present for stream specs")
})
jb.StreamID = ptr(uint64(42))
jb.StreamID = ptr(uint32(42))
t.Run("returns services", func(t *testing.T) {
srvs, err := d.ServicesForSpec(jb)
require.NoError(t, err)
Expand Down Expand Up @@ -83,7 +83,7 @@ answer1 [type=median index=0];
assert.Equal(t, uint32(1), jb.SchemaVersion)
assert.True(t, jb.Name.Valid)
require.NotNil(t, jb.StreamID)
assert.Equal(t, uint64(12345), *jb.StreamID)
assert.Equal(t, uint32(12345), *jb.StreamID)
assert.Equal(t, "voter-turnout", jb.Name.String)
},
},
Expand Down
7 changes: 1 addition & 6 deletions core/services/streams/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,9 @@ func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRu
// extract any desired type that matches a particular pipeline run output.
// Returns error on parse errors: if results are wrong type
func ExtractBigInt(trrs pipeline.TaskRunResults) (*big.Int, error) {
var finaltrrs []pipeline.TaskRunResult
// pipeline.TaskRunResults comes ordered asc by index, this is guaranteed
// by the pipeline executor
for _, trr := range trrs {
if trr.IsTerminal() {
finaltrrs = append(finaltrrs, trr)
}
}
finaltrrs := trrs.Terminals()

if len(finaltrrs) != 1 {
return nil, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs))
Expand Down
Loading

0 comments on commit 79df493

Please sign in to comment.