From 4b67bae520cff85e3e73c4c610f4263195174e97 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 4 Jan 2024 13:17:13 -0500 Subject: [PATCH 1/5] Implement stream specs --- core/services/chainlink/application.go | 7 + core/services/job/models.go | 66 +++---- core/services/job/orm.go | 2 + core/services/job/validate.go | 17 +- core/services/ocrcommon/run_saver.go | 9 +- core/services/pipeline/common.go | 17 +- core/services/pipeline/models.go | 11 +- core/services/pipeline/runner.go | 62 ++++--- core/services/pipeline/runner_test.go | 40 +++++ core/services/streams/delegate.go | 110 ++++++++++++ core/services/streams/delegate_test.go | 161 ++++++++++++++++++ core/services/streams/stream.go | 133 +++++++++++++++ core/services/streams/stream_registry.go | 64 +++++++ core/services/streams/stream_registry_test.go | 107 ++++++++++++ core/services/streams/stream_test.go | 133 +++++++++++++++ core/services/vrf/delegate.go | 2 +- .../migrate/migrations/0216_stream_specs.sql | 40 +++++ core/testdata/testspecs/v2_specs.go | 31 ++++ core/web/jobs_controller.go | 3 + core/web/jobs_controller_test.go | 20 +++ docs/CHANGELOG.md | 1 + 21 files changed, 957 insertions(+), 79 deletions(-) create mode 100644 core/services/streams/delegate.go create mode 100644 core/services/streams/delegate_test.go create mode 100644 core/services/streams/stream.go create mode 100644 core/services/streams/stream_registry.go create mode 100644 core/services/streams/stream_registry_test.go create mode 100644 core/services/streams/stream_test.go create mode 100644 core/store/migrate/migrations/0216_stream_specs.sql diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index fae938c0db6..5c350a8d146 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -53,6 +53,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/promreporter" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/core/services/vrf" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" @@ -290,6 +291,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) + streamRegistry = streams.NewStreamRegistry(globalLogger, pipelineRunner) ) for _, chain := range legacyEVMChains.Slice() { @@ -344,6 +346,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) { db, cfg.Database(), globalLogger), + job.Stream: streams.NewDelegate( + globalLogger, + streamRegistry, + pipelineRunner, + cfg.JobPipeline()), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) diff --git a/core/services/job/models.go b/core/services/job/models.go index ab9490eee9a..5dcf4928e35 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -33,20 +33,21 @@ import ( ) const ( + BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType) + BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType) + Bootstrap Type = (Type)(pipeline.BootstrapJobType) Cron Type = (Type)(pipeline.CronJobType) DirectRequest Type = (Type)(pipeline.DirectRequestJobType) FluxMonitor Type = (Type)(pipeline.FluxMonitorJobType) - OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType) - OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType) + Gateway Type = (Type)(pipeline.GatewayJobType) Keeper Type = (Type)(pipeline.KeeperJobType) - VRF Type = (Type)(pipeline.VRFJobType) - BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType) - BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType) LegacyGasStationServer Type = (Type)(pipeline.LegacyGasStationServerJobType) LegacyGasStationSidecar Type = (Type)(pipeline.LegacyGasStationSidecarJobType) + OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType) + OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType) + Stream Type = (Type)(pipeline.StreamJobType) + VRF Type = (Type)(pipeline.VRFJobType) Webhook Type = (Type)(pipeline.WebhookJobType) - Bootstrap Type = (Type)(pipeline.BootstrapJobType) - Gateway Type = (Type)(pipeline.GatewayJobType) ) //revive:disable:redefines-builtin-id @@ -70,52 +71,55 @@ func (t Type) SchemaVersion() uint32 { var ( requiresPipelineSpec = map[Type]bool{ + BlockHeaderFeeder: false, + BlockhashStore: false, + Bootstrap: false, Cron: true, DirectRequest: true, FluxMonitor: true, - OffchainReporting: false, // bootstrap jobs do not require it - OffchainReporting2: false, // bootstrap jobs do not require it + Gateway: false, Keeper: false, // observationSource is injected in the upkeep executor - VRF: true, - Webhook: true, - BlockhashStore: false, - BlockHeaderFeeder: false, LegacyGasStationServer: false, LegacyGasStationSidecar: false, - Bootstrap: false, - Gateway: false, + OffchainReporting2: false, // bootstrap jobs do not require it + OffchainReporting: false, // bootstrap jobs do not require it + Stream: true, + VRF: true, + Webhook: true, } supportsAsync = map[Type]bool{ + BlockHeaderFeeder: false, + BlockhashStore: false, + Bootstrap: false, Cron: true, DirectRequest: true, FluxMonitor: false, - OffchainReporting: false, - OffchainReporting2: false, + Gateway: false, Keeper: true, - VRF: true, - Webhook: true, - BlockhashStore: false, - BlockHeaderFeeder: false, LegacyGasStationServer: false, LegacyGasStationSidecar: false, - Bootstrap: false, - Gateway: false, + OffchainReporting2: false, + OffchainReporting: false, + Stream: true, + VRF: true, + Webhook: true, } schemaVersions = map[Type]uint32{ + BlockHeaderFeeder: 1, + BlockhashStore: 1, + Bootstrap: 1, Cron: 1, DirectRequest: 1, FluxMonitor: 1, - OffchainReporting: 1, - OffchainReporting2: 1, + Gateway: 1, Keeper: 1, - VRF: 1, - Webhook: 1, - BlockhashStore: 1, - BlockHeaderFeeder: 1, LegacyGasStationServer: 1, LegacyGasStationSidecar: 1, - Bootstrap: 1, - Gateway: 1, + OffchainReporting2: 1, + OffchainReporting: 1, + Stream: 1, + VRF: 1, + Webhook: 1, } ) diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 6c5a879ebd0..fb52dafdf5d 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -440,6 +440,8 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error { return errors.Wrap(err, "failed to create GatewaySpec for jobSpec") } jb.GatewaySpecID = &specID + case Stream: + // 'stream' type has no associated spec, nothing to do here default: o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type) } diff --git a/core/services/job/validate.go b/core/services/job/validate.go index b7a1dca3616..f108031f72e 100644 --- a/core/services/job/validate.go +++ b/core/services/job/validate.go @@ -12,20 +12,21 @@ var ( ErrInvalidJobType = errors.New("invalid job type") ErrInvalidSchemaVersion = errors.New("invalid schema version") jobTypes = map[Type]struct{}{ + BlockHeaderFeeder: {}, + BlockhashStore: {}, + Bootstrap: {}, Cron: {}, DirectRequest: {}, FluxMonitor: {}, - OffchainReporting: {}, - OffchainReporting2: {}, - Keeper: {}, - VRF: {}, - Webhook: {}, - BlockhashStore: {}, - Bootstrap: {}, - BlockHeaderFeeder: {}, Gateway: {}, + Keeper: {}, LegacyGasStationServer: {}, LegacyGasStationSidecar: {}, + OffchainReporting2: {}, + OffchainReporting: {}, + Stream: {}, + VRF: {}, + Webhook: {}, } ) diff --git a/core/services/ocrcommon/run_saver.go b/core/services/ocrcommon/run_saver.go index b1a0fc7b141..6d85aa857a4 100644 --- a/core/services/ocrcommon/run_saver.go +++ b/core/services/ocrcommon/run_saver.go @@ -5,15 +5,20 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) +type Runner interface { + InsertFinishedRun(run *pipeline.Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error +} + type RunResultSaver struct { services.StateMachine maxSuccessfulRuns uint64 runResults chan *pipeline.Run - pipelineRunner pipeline.Runner + pipelineRunner Runner done chan struct{} logger logger.Logger } @@ -24,7 +29,7 @@ func (r *RunResultSaver) HealthReport() map[string]error { func (r *RunResultSaver) Name() string { return r.logger.Name() } -func NewResultRunSaver(pipelineRunner pipeline.Runner, +func NewResultRunSaver(pipelineRunner Runner, logger logger.Logger, maxSuccessfulRuns uint64, resultsWriteDepth uint64, ) *RunResultSaver { return &RunResultSaver{ diff --git a/core/services/pipeline/common.go b/core/services/pipeline/common.go index ae82e5b3b81..6efa7aa2148 100644 --- a/core/services/pipeline/common.go +++ b/core/services/pipeline/common.go @@ -29,20 +29,21 @@ import ( ) const ( + BlockHeaderFeederJobType string = "blockheaderfeeder" + BlockhashStoreJobType string = "blockhashstore" + BootstrapJobType string = "bootstrap" CronJobType string = "cron" DirectRequestJobType string = "directrequest" FluxMonitorJobType string = "fluxmonitor" - OffchainReportingJobType string = "offchainreporting" - OffchainReporting2JobType string = "offchainreporting2" - KeeperJobType string = "keeper" - VRFJobType string = "vrf" - BlockhashStoreJobType string = "blockhashstore" - BlockHeaderFeederJobType string = "blockheaderfeeder" - WebhookJobType string = "webhook" - BootstrapJobType string = "bootstrap" GatewayJobType string = "gateway" + KeeperJobType string = "keeper" LegacyGasStationServerJobType string = "legacygasstationserver" LegacyGasStationSidecarJobType string = "legacygasstationsidecar" + OffchainReporting2JobType string = "offchainreporting2" + OffchainReportingJobType string = "offchainreporting" + StreamJobType string = "stream" + VRFJobType string = "vrf" + WebhookJobType string = "webhook" ) //go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore diff --git a/core/services/pipeline/models.go b/core/services/pipeline/models.go index cfabae82772..e198c1b788c 100644 --- a/core/services/pipeline/models.go +++ b/core/services/pipeline/models.go @@ -28,9 +28,18 @@ type Spec struct { JobID int32 `json:"-"` JobName string `json:"-"` JobType string `json:"-"` + + Pipeline *Pipeline `json:"-" db:"-"` // This may be nil, or may be populated manually as a cache. There is no locking on this, so be careful +} + +func (s *Spec) GetOrParsePipeline() (*Pipeline, error) { + if s.Pipeline != nil { + return s.Pipeline, nil + } + return s.ParsePipeline() } -func (s Spec) Pipeline() (*Pipeline, error) { +func (s *Spec) ParsePipeline() (*Pipeline, error) { return Parse(s.DotDagSource) } diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 3e5d77db5f2..a432d9fec11 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -243,26 +243,32 @@ func (r *runner) ExecuteRun( }) defer cancel() - run := NewRun(spec, vars) - - pipeline, err := r.initializePipeline(run) - if err != nil { - return run, nil, err + var pipeline *Pipeline + if spec.Pipeline != nil { + // assume if set that it has been pre-initialized + pipeline = spec.Pipeline + } else { + var err error + pipeline, err = r.InitializePipeline(spec) + if err != nil { + return nil, nil, err + } } + run := NewRun(spec, vars) taskRunResults := r.run(ctx, pipeline, run, vars, l) if run.Pending { - return run, nil, pkgerrors.Wrapf(err, "unexpected async run for spec ID %v, tried executing via ExecuteAndInsertFinishedRun", spec.ID) + return run, nil, fmt.Errorf("unexpected async run for spec ID %v, tried executing via ExecuteRun", spec.ID) } return run, taskRunResults, nil } -func (r *runner) initializePipeline(run *Run) (*Pipeline, error) { - pipeline, err := Parse(run.PipelineSpec.DotDagSource) +func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) { + pipeline, err = spec.GetOrParsePipeline() if err != nil { - return nil, err + return } // initialize certain task params @@ -278,7 +284,7 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) { task.(*BridgeTask).config = r.config task.(*BridgeTask).bridgeConfig = r.bridgeConfig task.(*BridgeTask).orm = r.btORM - task.(*BridgeTask).specId = run.PipelineSpec.ID + task.(*BridgeTask).specId = spec.ID // URL is "safe" because it comes from the node's own database. We // must use the unrestrictedHTTPClient because some node operators // may run external adapters on their own hardware @@ -286,8 +292,8 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) { case TaskTypeETHCall: task.(*ETHCallTask).legacyChains = r.legacyEVMChains task.(*ETHCallTask).config = r.config - task.(*ETHCallTask).specGasLimit = run.PipelineSpec.GasLimit - task.(*ETHCallTask).jobType = run.PipelineSpec.JobType + task.(*ETHCallTask).specGasLimit = spec.GasLimit + task.(*ETHCallTask).jobType = spec.JobType case TaskTypeVRF: task.(*VRFTask).keyStore = r.vrfKeyStore case TaskTypeVRFV2: @@ -296,28 +302,18 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) { task.(*VRFTaskV2Plus).keyStore = r.vrfKeyStore case TaskTypeEstimateGasLimit: task.(*EstimateGasLimitTask).legacyChains = r.legacyEVMChains - task.(*EstimateGasLimitTask).specGasLimit = run.PipelineSpec.GasLimit - task.(*EstimateGasLimitTask).jobType = run.PipelineSpec.JobType + task.(*EstimateGasLimitTask).specGasLimit = spec.GasLimit + task.(*EstimateGasLimitTask).jobType = spec.JobType case TaskTypeETHTx: task.(*ETHTxTask).keyStore = r.ethKeyStore task.(*ETHTxTask).legacyChains = r.legacyEVMChains - task.(*ETHTxTask).specGasLimit = run.PipelineSpec.GasLimit - task.(*ETHTxTask).jobType = run.PipelineSpec.JobType - task.(*ETHTxTask).forwardingAllowed = run.PipelineSpec.ForwardingAllowed + task.(*ETHTxTask).specGasLimit = spec.GasLimit + task.(*ETHTxTask).jobType = spec.JobType + task.(*ETHTxTask).forwardingAllowed = spec.ForwardingAllowed default: } } - // retain old UUID values - for _, taskRun := range run.PipelineTaskRuns { - task := pipeline.ByDotID(taskRun.DotID) - if task != nil && task.Base() != nil { - task.Base().uuid = taskRun.ID - } else { - return nil, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID) - } - } - return pipeline, nil } @@ -542,11 +538,21 @@ func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, var } func (r *runner) Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error) { - pipeline, err := r.initializePipeline(run) + pipeline, err := r.InitializePipeline(run.PipelineSpec) if err != nil { return false, err } + // retain old UUID values + for _, taskRun := range run.PipelineTaskRuns { + task := pipeline.ByDotID(taskRun.DotID) + if task != nil && task.Base() != nil { + task.Base().uuid = taskRun.ID + } else { + return false, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID) + } + } + preinsert := pipeline.RequiresPreInsert() q := r.orm.GetQ().WithOpts(pg.WithParentCtx(ctx)) diff --git a/core/services/pipeline/runner_test.go b/core/services/pipeline/runner_test.go index 695590e7bd0..5b4aaef7e88 100644 --- a/core/services/pipeline/runner_test.go +++ b/core/services/pipeline/runner_test.go @@ -942,3 +942,43 @@ en->de require.NoError(t, err) assert.Equal(t, inputBytes, result.Value) } + +func Test_PipelineRunner_ExecuteRun(t *testing.T) { + t.Run("uses cached *Pipeline if available", func(t *testing.T) { + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewTestGeneralConfig(t) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + relayExtenders := evmtest.NewChainRelayExtenders(t, evmtest.TestChainOpts{DB: db, GeneralConfig: cfg, KeyStore: ethKeyStore}) + legacyChains := evmrelay.NewLegacyChainsFromRelayerExtenders(relayExtenders) + lggr := logger.TestLogger(t) + r := pipeline.NewRunner(nil, nil, cfg.JobPipeline(), cfg.WebServer(), legacyChains, ethKeyStore, nil, lggr, nil, nil) + + template := ` +succeed [type=memo value=%d] +succeed; +` + + spec := pipeline.Spec{DotDagSource: fmt.Sprintf(template, 1)} + vars := pipeline.NewVarsFrom(nil) + + _, trrs, err := r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + require.NoError(t, err) + require.Len(t, trrs, 1) + assert.Equal(t, "1", trrs[0].Result.Value.(pipeline.ObjectParam).DecimalValue.Decimal().String()) + + // does not automatically cache + require.Nil(t, spec.Pipeline) + + // initialize it + spec.Pipeline, err = spec.ParsePipeline() + require.NoError(t, err) + + // even though this is set to 2, it should use the cached version + spec.DotDagSource = fmt.Sprintf(template, 2) + + _, trrs, err = r.ExecuteRun(testutils.Context(t), spec, vars, lggr) + require.NoError(t, err) + require.Len(t, trrs, 1) + assert.Equal(t, "1", trrs[0].Result.Value.(pipeline.ObjectParam).DecimalValue.Decimal().String()) + }) +} diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go new file mode 100644 index 00000000000..31352ecbdc4 --- /dev/null +++ b/core/services/streams/delegate.go @@ -0,0 +1,110 @@ +package streams + +import ( + "context" + "fmt" + "strings" + + "github.com/google/uuid" + "github.com/pelletier/go-toml/v2" + "github.com/pkg/errors" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +type DelegateConfig interface { + MaxSuccessfulRuns() uint64 + ResultWriteQueueDepth() uint64 +} + +type Delegate struct { + lggr logger.Logger + registry StreamRegistry + runner ocrcommon.Runner + cfg DelegateConfig +} + +var _ job.Delegate = (*Delegate)(nil) + +func NewDelegate(lggr logger.Logger, registry StreamRegistry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { + return &Delegate{lggr, registry, runner, cfg} +} + +func (d *Delegate) JobType() job.Type { + return job.Stream +} + +func (d *Delegate) BeforeJobCreated(jb job.Job) {} +func (d *Delegate) AfterJobCreated(jb job.Job) {} +func (d *Delegate) BeforeJobDeleted(jb job.Job) {} +func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { return nil } + +func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err error) { + if !jb.Name.Valid { + return nil, errors.New("job name is required to be present for stream specs") + } + id := StreamID(jb.Name.String) + lggr := d.lggr.Named(id.String()).With("streamID", id) + + rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) + services = append(services, rrs, &StreamService{ + d.registry, + id, + jb.PipelineSpec, + lggr, + rrs, + }) + return services, nil +} + +type ResultRunSaver interface { + Save(run *pipeline.Run) +} + +type StreamService struct { + registry StreamRegistry + id StreamID + spec *pipeline.Spec + lggr logger.Logger + rrs ResultRunSaver +} + +func (s *StreamService) Start(_ context.Context) error { + if s.spec == nil { + return fmt.Errorf("pipeline spec unexpectedly missing for stream %q", s.id) + } + s.lggr.Debugf("Starting stream %q", s.id) + return s.registry.Register(s.id, *s.spec, s.rrs) +} + +func (s *StreamService) Close() error { + s.lggr.Debugf("Stopping stream %q", s.id) + s.registry.Unregister(s.id) + return nil +} + +func ValidatedStreamSpec(tomlString string) (job.Job, error) { + var jb = job.Job{ExternalJobID: uuid.New()} + + r := strings.NewReader(tomlString) + d := toml.NewDecoder(r) + d.DisallowUnknownFields() + err := d.Decode(&jb) + if err != nil { + return jb, errors.Wrap(err, "toml unmarshal error on job") + } + + if jb.Type != job.Stream { + return jb, errors.Errorf("unsupported type: %q", jb.Type) + } + + if !jb.Name.Valid { + return jb, errors.New("jobs of type 'stream' require a non-blank name as stream ID") + } + + return jb, nil +} diff --git a/core/services/streams/delegate_test.go b/core/services/streams/delegate_test.go new file mode 100644 index 00000000000..77b10260375 --- /dev/null +++ b/core/services/streams/delegate_test.go @@ -0,0 +1,161 @@ +package streams + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/guregu/null.v4" +) + +type mockRegistry struct{} + +func (m *mockRegistry) Get(streamID StreamID) (strm Stream, exists bool) { return } +func (m *mockRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error { + return nil +} +func (m *mockRegistry) Unregister(streamID StreamID) {} + +type mockDelegateConfig struct{} + +func (m *mockDelegateConfig) MaxSuccessfulRuns() uint64 { return 0 } +func (m *mockDelegateConfig) ResultWriteQueueDepth() uint64 { return 0 } + +func Test_Delegate(t *testing.T) { + lggr := logger.TestLogger(t) + registry := &mockRegistry{} + runner := &mockRunner{} + cfg := &mockDelegateConfig{} + d := NewDelegate(lggr, registry, runner, cfg) + + t.Run("ServicesForSpec", func(t *testing.T) { + jb := job.Job{PipelineSpec: &pipeline.Spec{ID: 1}} + t.Run("errors if job is missing name", func(t *testing.T) { + _, err := d.ServicesForSpec(jb) + assert.EqualError(t, err, "job name is required to be present for stream specs") + }) + jb.Name = null.StringFrom("jobname") + t.Run("returns services", func(t *testing.T) { + srvs, err := d.ServicesForSpec(jb) + require.NoError(t, err) + + assert.Len(t, srvs, 2) + assert.IsType(t, &ocrcommon.RunResultSaver{}, srvs[0]) + + strmSrv := srvs[1].(*StreamService) + assert.Equal(t, registry, strmSrv.registry) + assert.Equal(t, StreamID("jobname"), strmSrv.id) + assert.Equal(t, jb.PipelineSpec, strmSrv.spec) + assert.NotNil(t, strmSrv.lggr) + assert.Equal(t, srvs[0], strmSrv.rrs) + }) + }) +} + +func Test_ValidatedStreamSpec(t *testing.T) { + var tt = []struct { + name string + toml string + assertion func(t *testing.T, os job.Job, err error) + }{ + { + name: "minimal stream spec", + toml: ` +type = "stream" +name = "voter-turnout" +schemaVersion = 1 +observationSource = """ +ds1 [type=bridge name=voter_turnout]; +ds1_parse [type=jsonparse path="one,two"]; +ds1_multiply [type=multiply times=1.23]; +ds1 -> ds1_parse -> ds1_multiply -> answer1; +answer1 [type=median index=0]; +""" +`, + assertion: func(t *testing.T, jb job.Job, err error) { + require.NoError(t, err) + assert.Equal(t, job.Type("stream"), jb.Type) + assert.Equal(t, uint32(1), jb.SchemaVersion) + assert.True(t, jb.Name.Valid) + assert.Equal(t, "voter-turnout", jb.Name.String) + }, + }, + { + name: "unparseable toml", + toml: `not toml`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "toml unmarshal error on job: toml: expected character =") + }, + }, + { + name: "invalid field type", + toml: ` +type = "stream" +name = "voter-turnout" +schemaVersion = "should be integer" +`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "toml unmarshal error on job: toml: cannot decode TOML string into struct field job.Job.SchemaVersion of type uint32") + }, + }, + { + name: "invalid fields", + toml: ` +type = "stream" +name = "voter-turnout" +notAValidField = "some value" +schemaVersion = 1 +`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "toml unmarshal error on job: strict mode: fields in the document are missing in the target struct") + }, + }, + { + name: "wrong type", + toml: ` +type = "not a valid type" +name = "voter-turnout" +schemaVersion = 1 +observationSource = """ +ds1 [type=bridge name=voter_turnout]; +ds1_parse [type=jsonparse path="one,two"]; +ds1_multiply [type=multiply times=1.23]; +ds1 -> ds1_parse -> ds1_multiply -> answer1; +answer1 [type=median index=0]; +""" +`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "unsupported type: \"not a valid type\"") + }, + }, + { + name: "error if missing name", + toml: ` +type = "stream" +schemaVersion = 1 +observationSource = """ +ds1 [type=bridge name=voter_turnout]; +ds1_parse [type=jsonparse path="one,two"]; +ds1_multiply [type=multiply times=1.23]; +ds1 -> ds1_parse -> ds1_multiply -> answer1; +answer1 [type=median index=0]; +""" +`, + assertion: func(t *testing.T, jb job.Job, err error) { + assert.EqualError(t, err, "jobs of type 'stream' require a non-blank name as stream ID") + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s, err := ValidatedStreamSpec(tc.toml) + tc.assertion(t, s, err) + }) + } +} diff --git a/core/services/streams/stream.go b/core/services/streams/stream.go new file mode 100644 index 00000000000..51535a0cb86 --- /dev/null +++ b/core/services/streams/stream.go @@ -0,0 +1,133 @@ +package streams + +import ( + "context" + "fmt" + "math/big" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/smartcontractkit/chainlink/v2/core/utils" +) + +type Runner interface { + ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) + InitializePipeline(spec pipeline.Spec) (*pipeline.Pipeline, error) +} + +type RunResultSaver interface { + Save(run *pipeline.Run) +} + +type Stream interface { + Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) +} + +type stream struct { + sync.RWMutex + id StreamID + lggr logger.Logger + spec *pipeline.Spec + runner Runner + rrs RunResultSaver +} + +func NewStream(lggr logger.Logger, id StreamID, spec pipeline.Spec, runner Runner, rrs RunResultSaver) Stream { + return newStream(lggr, id, spec, runner, rrs) +} + +func newStream(lggr logger.Logger, id StreamID, spec pipeline.Spec, runner Runner, rrs RunResultSaver) *stream { + return &stream{sync.RWMutex{}, id, lggr.Named("Stream").With("streamID", id), &spec, runner, rrs} +} + +func (s *stream) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { + run, trrs, err = s.executeRun(ctx) + + if err != nil { + return nil, nil, fmt.Errorf("Run failed: %w", err) + } + if s.rrs != nil { + s.rrs.Save(run) + } + + return +} + +// The context passed in here has a timeout of (ObservationTimeout + ObservationGracePeriod). +// Upon context cancellation, its expected that we return any usable values within ObservationGracePeriod. +func (s *stream) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + // the hot path here is to avoid parsing and use the pre-parsed, cached, pipeline + s.RLock() + initialize := s.spec.Pipeline == nil + s.RUnlock() + if initialize { + pipeline, err := s.spec.ParsePipeline() + if err != nil { + return nil, nil, fmt.Errorf("Run failed due to unparseable pipeline: %w", err) + } + + s.Lock() + if s.spec.Pipeline == nil { + s.spec.Pipeline = pipeline + // initialize it for the given runner + if _, err := s.runner.InitializePipeline(*s.spec); err != nil { + return nil, nil, fmt.Errorf("Run failed due to error while initializing pipeline: %w", err) + } + } + s.Unlock() + } + + vars := pipeline.NewVarsFrom(map[string]interface{}{ + "pipelineSpec": map[string]interface{}{ + "id": s.spec.ID, + }, + "stream": map[string]interface{}{ + "id": s.id, + }, + }) + + run, trrs, err := s.runner.ExecuteRun(ctx, *s.spec, vars, s.lggr) + if err != nil { + return nil, nil, fmt.Errorf("error executing run for spec ID %v: %w", s.spec.ID, err) + } + + return run, trrs, err +} + +// ExtractBigInt returns a result of a pipeline run that returns one single +// decimal result, as a *big.Int. +// This acts as a reference/example method, other methods can be implemented to +// extract any desired type that matches a particular pipeline run output. +// Returns error on parse errors: if results are wrong type +func ExtractBigInt(trrs pipeline.TaskRunResults) (*big.Int, error) { + var finaltrrs []pipeline.TaskRunResult + // pipeline.TaskRunResults comes ordered asc by index, this is guaranteed + // by the pipeline executor + for _, trr := range trrs { + if trr.IsTerminal() { + finaltrrs = append(finaltrrs, trr) + } + } + + if len(finaltrrs) != 1 { + return nil, fmt.Errorf("invalid number of results, expected: 1, got: %d", len(finaltrrs)) + } + res := finaltrrs[0].Result + if res.Error != nil { + return nil, res.Error + } + val, err := toBigInt(res.Value) + if err != nil { + return nil, fmt.Errorf("failed to parse BenchmarkPrice: %w", err) + } + return val, nil +} + +func toBigInt(val interface{}) (*big.Int, error) { + dec, err := utils.ToDecimal(val) + if err != nil { + return nil, err + } + return dec.BigInt(), nil +} diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go new file mode 100644 index 00000000000..086a2f2c7bc --- /dev/null +++ b/core/services/streams/stream_registry.go @@ -0,0 +1,64 @@ +package streams + +import ( + "fmt" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +type StreamID string + +func (s StreamID) String() string { + return string(s) +} + +type StreamRegistry interface { + Get(streamID StreamID) (strm Stream, exists bool) + Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error + Unregister(streamID StreamID) +} + +type streamRegistry struct { + sync.RWMutex + lggr logger.Logger + runner Runner + streams map[StreamID]Stream +} + +func NewStreamRegistry(lggr logger.Logger, runner Runner) StreamRegistry { + return newStreamRegistry(lggr, runner) +} + +func newStreamRegistry(lggr logger.Logger, runner Runner) *streamRegistry { + return &streamRegistry{ + sync.RWMutex{}, + lggr.Named("StreamRegistry"), + runner, + make(map[StreamID]Stream), + } +} + +func (s *streamRegistry) Get(streamID StreamID) (strm Stream, exists bool) { + s.RLock() + defer s.RUnlock() + strm, exists = s.streams[streamID] + return +} + +func (s *streamRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error { + s.Lock() + defer s.Unlock() + if _, exists := s.streams[streamID]; exists { + return fmt.Errorf("stream already registered for id: %q", streamID) + } + s.streams[streamID] = NewStream(s.lggr, streamID, spec, s.runner, rrs) + return nil +} + +func (s *streamRegistry) Unregister(streamID StreamID) { + s.Lock() + defer s.Unlock() + delete(s.streams, streamID) +} diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go new file mode 100644 index 00000000000..6b28ea41722 --- /dev/null +++ b/core/services/streams/stream_registry_test.go @@ -0,0 +1,107 @@ +package streams + +import ( + "context" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" + "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" +) + +type mockStream struct { + run *pipeline.Run + trrs pipeline.TaskRunResults + err error +} + +func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) { + return m.run, m.trrs, m.err +} + +func Test_StreamRegistry(t *testing.T) { + lggr := logger.TestLogger(t) + runner := &mockRunner{} + + t.Run("Get", func(t *testing.T) { + sr := newStreamRegistry(lggr, runner) + + sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} + sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} + sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}} + + v, exists := sr.Get("foo") + assert.True(t, exists) + assert.Equal(t, sr.streams["foo"], v) + + v, exists = sr.Get("bar") + assert.True(t, exists) + assert.Equal(t, sr.streams["bar"], v) + + v, exists = sr.Get("baz") + assert.True(t, exists) + assert.Equal(t, sr.streams["baz"], v) + + v, exists = sr.Get("qux") + assert.Nil(t, v) + assert.False(t, exists) + }) + t.Run("Register", func(t *testing.T) { + sr := newStreamRegistry(lggr, runner) + + t.Run("registers new stream", func(t *testing.T) { + assert.Len(t, sr.streams, 0) + err := sr.Register("foo", pipeline.Spec{ID: 32, DotDagSource: "source"}, nil) + require.NoError(t, err) + assert.Len(t, sr.streams, 1) + + v, exists := sr.Get("foo") + require.True(t, exists) + strm := v.(*stream) + assert.Equal(t, StreamID("foo"), strm.id) + assert.Equal(t, int32(32), strm.spec.ID) + }) + + t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) { + assert.Len(t, sr.streams, 1) + err := sr.Register("foo", pipeline.Spec{ID: 33, DotDagSource: "source"}, nil) + require.Error(t, err) + assert.Len(t, sr.streams, 1) + assert.EqualError(t, err, "stream already registered for id: \"foo\"") + + v, exists := sr.Get("foo") + require.True(t, exists) + strm := v.(*stream) + assert.Equal(t, StreamID("foo"), strm.id) + assert.Equal(t, int32(32), strm.spec.ID) + }) + }) + t.Run("Unregister", func(t *testing.T) { + sr := newStreamRegistry(lggr, runner) + + sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} + sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} + sr.streams["baz"] = &mockStream{run: &pipeline.Run{ID: 3}} + + t.Run("unregisters a stream", func(t *testing.T) { + assert.Len(t, sr.streams, 3) + + sr.Unregister("foo") + + assert.Len(t, sr.streams, 2) + _, exists := sr.streams["foo"] + assert.False(t, exists) + }) + t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) { + assert.Len(t, sr.streams, 2) + + sr.Unregister("foo") + + assert.Len(t, sr.streams, 2) + _, exists := sr.streams["foo"] + assert.False(t, exists) + }) + }) +} diff --git a/core/services/streams/stream_test.go b/core/services/streams/stream_test.go new file mode 100644 index 00000000000..3a556411bc6 --- /dev/null +++ b/core/services/streams/stream_test.go @@ -0,0 +1,133 @@ +package streams + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" +) + +var UUID = uuid.New() + +type mockRunner struct { + p *pipeline.Pipeline + run *pipeline.Run + trrs pipeline.TaskRunResults + err error +} + +func (m *mockRunner) ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) { + return m.run, m.trrs, m.err +} +func (m *mockRunner) InitializePipeline(spec pipeline.Spec) (p *pipeline.Pipeline, err error) { + return m.p, m.err +} +func (m *mockRunner) InsertFinishedRun(run *pipeline.Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error { + return m.err +} + +type MockTask struct { + result pipeline.Result +} + +func (m *MockTask) Type() pipeline.TaskType { return "MockTask" } +func (m *MockTask) ID() int { return 0 } +func (m *MockTask) DotID() string { return "" } +func (m *MockTask) Run(ctx context.Context, lggr logger.Logger, vars pipeline.Vars, inputs []pipeline.Result) (pipeline.Result, pipeline.RunInfo) { + return m.result, pipeline.RunInfo{} +} +func (m *MockTask) Base() *pipeline.BaseTask { return nil } +func (m *MockTask) Outputs() []pipeline.Task { return nil } +func (m *MockTask) Inputs() []pipeline.TaskDependency { return nil } +func (m *MockTask) OutputIndex() int32 { return 0 } +func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false } +func (m *MockTask) TaskRetries() uint32 { return 0 } +func (m *MockTask) TaskMinBackoff() time.Duration { return 0 } +func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 } + +func Test_Stream(t *testing.T) { + lggr := logger.TestLogger(t) + runner := &mockRunner{} + spec := pipeline.Spec{} + id := StreamID("stream-id-foo") + ctx := testutils.Context(t) + + t.Run("Run", func(t *testing.T) { + strm := newStream(lggr, id, spec, runner, nil) + + t.Run("errors with empty pipeline", func(t *testing.T) { + _, _, err := strm.Run(ctx) + assert.EqualError(t, err, "Run failed: Run failed due to unparseable pipeline: empty pipeline") + }) + + spec.DotDagSource = ` +succeed [type=memo value=42] +succeed; +` + + strm = newStream(lggr, id, spec, runner, nil) + + t.Run("executes the pipeline (success)", func(t *testing.T) { + runner.run = &pipeline.Run{ID: 42} + runner.trrs = []pipeline.TaskRunResult{pipeline.TaskRunResult{ID: UUID}} + runner.err = nil + + run, trrs, err := strm.Run(ctx) + assert.NoError(t, err) + + assert.Equal(t, int64(42), run.ID) + require.Len(t, trrs, 1) + assert.Equal(t, UUID, trrs[0].ID) + }) + t.Run("executes the pipeline (failure)", func(t *testing.T) { + runner.err = errors.New("something exploded") + + _, _, err := strm.Run(ctx) + require.Error(t, err) + + assert.EqualError(t, err, "Run failed: error executing run for spec ID 0: something exploded") + }) + }) +} + +func Test_ExtractBigInt(t *testing.T) { + t.Run("wrong number of inputs", func(t *testing.T) { + trrs := []pipeline.TaskRunResult{} + + _, err := ExtractBigInt(trrs) + assert.EqualError(t, err, "invalid number of results, expected: 1, got: 0") + }) + t.Run("wrong type", func(t *testing.T) { + trrs := []pipeline.TaskRunResult{ + { + Result: pipeline.Result{Value: []byte{1, 2, 3}}, + Task: &MockTask{}, + }, + } + + _, err := ExtractBigInt(trrs) + assert.EqualError(t, err, "failed to parse BenchmarkPrice: type []uint8 cannot be converted to decimal.Decimal ([1 2 3])") + }) + t.Run("correct inputs", func(t *testing.T) { + trrs := []pipeline.TaskRunResult{ + { + Result: pipeline.Result{Value: "122.345"}, + Task: &MockTask{}, + }, + } + + val, err := ExtractBigInt(trrs) + require.NoError(t, err) + assert.Equal(t, big.NewInt(122), val) + }) +} diff --git a/core/services/vrf/delegate.go b/core/services/vrf/delegate.go index ba28e83bf3f..ecabbc09c71 100644 --- a/core/services/vrf/delegate.go +++ b/core/services/vrf/delegate.go @@ -76,7 +76,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { if jb.VRFSpec == nil || jb.PipelineSpec == nil { return nil, errors.Errorf("vrf.Delegate expects a VRFSpec and PipelineSpec to be present, got %+v", jb) } - pl, err := jb.PipelineSpec.Pipeline() + pl, err := jb.PipelineSpec.ParsePipeline() if err != nil { return nil, err } diff --git a/core/store/migrate/migrations/0216_stream_specs.sql b/core/store/migrate/migrations/0216_stream_specs.sql new file mode 100644 index 00000000000..f446928702c --- /dev/null +++ b/core/store/migrate/migrations/0216_stream_specs.sql @@ -0,0 +1,40 @@ +-- +goose Up +ALTER TABLE + jobs +DROP + CONSTRAINT chk_only_one_spec, +ADD + CONSTRAINT chk_specs CHECK ( + num_nonnulls( + ocr_oracle_spec_id, ocr2_oracle_spec_id, + direct_request_spec_id, flux_monitor_spec_id, + keeper_spec_id, cron_spec_id, webhook_spec_id, + vrf_spec_id, blockhash_store_spec_id, + block_header_feeder_spec_id, bootstrap_spec_id, + gateway_spec_id, + legacy_gas_station_server_spec_id, + legacy_gas_station_sidecar_spec_id, + eal_spec_id, + CASE "type" WHEN 'stream' THEN 1 ELSE NULL END -- 'stream' type lacks a spec but should not cause validation to fail + ) = 1 + ); + +-- +goose Down +ALTER TABLE + jobs +DROP + CONSTRAINT chk_specs, +ADD + CONSTRAINT chk_only_one_spec CHECK ( + num_nonnulls( + ocr_oracle_spec_id, ocr2_oracle_spec_id, + direct_request_spec_id, flux_monitor_spec_id, + keeper_spec_id, cron_spec_id, webhook_spec_id, + vrf_spec_id, blockhash_store_spec_id, + block_header_feeder_spec_id, bootstrap_spec_id, + gateway_spec_id, + legacy_gas_station_server_spec_id, + legacy_gas_station_sidecar_spec_id, + eal_spec_id + ) = 1 + ); diff --git a/core/testdata/testspecs/v2_specs.go b/core/testdata/testspecs/v2_specs.go index f2a40ff332a..e66971a7a11 100644 --- a/core/testdata/testspecs/v2_specs.go +++ b/core/testdata/testspecs/v2_specs.go @@ -828,3 +828,34 @@ storeBlockhashesBatchSize = %d return BlockHeaderFeederSpec{BlockHeaderFeederSpecParams: params, toml: toml} } + +type StreamSpecParams struct { + Name string +} + +type StreamSpec struct { + StreamSpecParams + toml string +} + +// Toml returns the BlockhashStoreSpec in TOML string form. +func (b StreamSpec) Toml() string { + return b.toml +} + +func GenerateStreamSpec(params StreamSpecParams) StreamSpec { + template := ` +type = "stream" +schemaVersion = 1 +name = "%s" +observationSource = """ +ds [type=http method=GET url="https://chain.link/ETH-USD"]; +ds_parse [type=jsonparse path="data,price"]; +ds_multiply [type=multiply times=100]; +ds -> ds_parse -> ds_multiply; +""" +` + + toml := fmt.Sprintf(template, params.Name) + return StreamSpec{StreamSpecParams: params, toml: toml} +} diff --git a/core/web/jobs_controller.go b/core/web/jobs_controller.go index 0f97e0b53d3..4e11f68097d 100644 --- a/core/web/jobs_controller.go +++ b/core/web/jobs_controller.go @@ -27,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate" "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" "github.com/smartcontractkit/chainlink/v2/core/services/pg" + "github.com/smartcontractkit/chainlink/v2/core/services/streams" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/web/presenters" @@ -250,6 +251,8 @@ func (jc *JobsController) validateJobSpec(tomlString string) (jb job.Job, status jb, err = ocrbootstrap.ValidatedBootstrapSpecToml(tomlString) case job.Gateway: jb, err = gateway.ValidatedGatewaySpec(tomlString) + case job.Stream: + jb, err = streams.ValidatedStreamSpec(tomlString) default: return jb, http.StatusUnprocessableEntity, errors.Errorf("unknown job type: %s", jobType) } diff --git a/core/web/jobs_controller_test.go b/core/web/jobs_controller_test.go index 0a40c8a9c71..472a5aa8fd8 100644 --- a/core/web/jobs_controller_test.go +++ b/core/web/jobs_controller_test.go @@ -360,6 +360,26 @@ func TestJobController_Create_HappyPath(t *testing.T) { assert.Equal(t, jb.VRFSpec.CoordinatorAddress.Hex(), resource.VRFSpec.CoordinatorAddress.Hex()) }, }, + { + name: "stream", + tomlTemplate: func(_ string) string { + return testspecs.GenerateStreamSpec(testspecs.StreamSpecParams{Name: "ETH/USD"}).Toml() + }, + assertion: func(t *testing.T, nameAndExternalJobID string, r *http.Response) { + require.Equal(t, http.StatusOK, r.StatusCode) + resp := cltest.ParseResponseBody(t, r) + resource := presenters.JobResource{} + err = web.ParseJSONAPIResponse(resp, &resource) + require.NoError(t, err) + + jb, err := jorm.FindJob(testutils.Context(t), mustInt32FromString(t, resource.ID)) + require.NoError(t, err) + require.NotNil(t, jb.PipelineSpec) + + assert.NotNil(t, resource.PipelineSpec.DotDAGSource) + assert.Equal(t, jb.Name.ValueOrZero(), resource.Name) + }, + }, } for _, tc := range tt { c := tc diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2ba80927256..6b70fd51c63 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `chainlink health` CLI command and HTML `/health` endpoint, to provide human-readable views of the underlying JSON health data. +- New job type `stream` to represent streamspecs. This job type is not yet used anywhere but will be required for Data Streams V1. ### Fixed From fe899c674f46e014ad030e999eb0357d748cc686 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Tue, 9 Jan 2024 12:29:13 -0500 Subject: [PATCH 2/5] Fix linter --- core/services/streams/stream_registry_test.go | 2 +- core/web/jobs_controller_test.go | 20 ++++++++++++------- core/web/presenters/job.go | 2 ++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 6b28ea41722..08298a66ce0 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -6,9 +6,9 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" - "github.com/stretchr/testify/require" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type mockStream struct { diff --git a/core/web/jobs_controller_test.go b/core/web/jobs_controller_test.go index 472a5aa8fd8..83c4fc30db0 100644 --- a/core/web/jobs_controller_test.go +++ b/core/web/jobs_controller_test.go @@ -36,6 +36,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/vrfkey" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/tomlutils" @@ -139,12 +140,17 @@ func TestJobController_Create_HappyPath(t *testing.T) { app, client := setupJobsControllerTests(t) b1, b2 := setupBridges(t, app.GetSqlxDB(), app.GetConfig().Database()) require.NoError(t, app.KeyStore.OCR().Add(cltest.DefaultOCRKey)) - pks, err := app.KeyStore.VRF().GetAll() - require.NoError(t, err) - require.Len(t, pks, 1) - k, err := app.KeyStore.P2P().GetAll() - require.NoError(t, err) - require.Len(t, k, 1) + var pks []vrfkey.KeyV2 + var k []p2pkey.KeyV2 + { + var err error + pks, err = app.KeyStore.VRF().GetAll() + require.NoError(t, err) + require.Len(t, pks, 1) + k, err = app.KeyStore.P2P().GetAll() + require.NoError(t, err) + require.Len(t, k, 1) + } jorm := app.JobORM() var tt = []struct { @@ -369,7 +375,7 @@ func TestJobController_Create_HappyPath(t *testing.T) { require.Equal(t, http.StatusOK, r.StatusCode) resp := cltest.ParseResponseBody(t, r) resource := presenters.JobResource{} - err = web.ParseJSONAPIResponse(resp, &resource) + err := web.ParseJSONAPIResponse(resp, &resource) require.NoError(t, err) jb, err := jorm.FindJob(testutils.Context(t), mustInt32FromString(t, resource.ID)) diff --git a/core/web/presenters/job.go b/core/web/presenters/job.go index a7aed0e5552..d0a6cfb5ca9 100644 --- a/core/web/presenters/job.go +++ b/core/web/presenters/job.go @@ -513,6 +513,8 @@ func NewJobResource(j job.Job) *JobResource { resource.BootstrapSpec = NewBootstrapSpec(j.BootstrapSpec) case job.Gateway: resource.GatewaySpec = NewGatewaySpec(j.GatewaySpec) + case job.Stream: + // no spec; nothing to do case job.LegacyGasStationServer, job.LegacyGasStationSidecar: // unsupported } From 40173fbc1b813bd5cc4465faa049714b88da5f60 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 11 Jan 2024 10:21:57 -0500 Subject: [PATCH 3/5] Rename StreamRegistry => Registry --- core/services/chainlink/application.go | 2 +- core/services/streams/delegate.go | 6 +++--- core/services/streams/stream_registry.go | 10 +++++----- core/services/streams/stream_registry_test.go | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 5c350a8d146..a9f9c22df52 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -291,7 +291,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient) jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database()) txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database()) - streamRegistry = streams.NewStreamRegistry(globalLogger, pipelineRunner) + streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner) ) for _, chain := range legacyEVMChains.Slice() { diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index 31352ecbdc4..fac2c7a36b3 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -23,14 +23,14 @@ type DelegateConfig interface { type Delegate struct { lggr logger.Logger - registry StreamRegistry + registry Registry runner ocrcommon.Runner cfg DelegateConfig } var _ job.Delegate = (*Delegate)(nil) -func NewDelegate(lggr logger.Logger, registry StreamRegistry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { +func NewDelegate(lggr logger.Logger, registry Registry, runner ocrcommon.Runner, cfg DelegateConfig) *Delegate { return &Delegate{lggr, registry, runner, cfg} } @@ -66,7 +66,7 @@ type ResultRunSaver interface { } type StreamService struct { - registry StreamRegistry + registry Registry id StreamID spec *pipeline.Spec lggr logger.Logger diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 086a2f2c7bc..6034c5d6b4a 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -14,7 +14,7 @@ func (s StreamID) String() string { return string(s) } -type StreamRegistry interface { +type Registry interface { Get(streamID StreamID) (strm Stream, exists bool) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error Unregister(streamID StreamID) @@ -27,14 +27,14 @@ type streamRegistry struct { streams map[StreamID]Stream } -func NewStreamRegistry(lggr logger.Logger, runner Runner) StreamRegistry { - return newStreamRegistry(lggr, runner) +func NewRegistry(lggr logger.Logger, runner Runner) Registry { + return newRegistry(lggr, runner) } -func newStreamRegistry(lggr logger.Logger, runner Runner) *streamRegistry { +func newRegistry(lggr logger.Logger, runner Runner) *streamRegistry { return &streamRegistry{ sync.RWMutex{}, - lggr.Named("StreamRegistry"), + lggr.Named("Registry"), runner, make(map[StreamID]Stream), } diff --git a/core/services/streams/stream_registry_test.go b/core/services/streams/stream_registry_test.go index 08298a66ce0..2c7c2bd6ecc 100644 --- a/core/services/streams/stream_registry_test.go +++ b/core/services/streams/stream_registry_test.go @@ -21,12 +21,12 @@ func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunRe return m.run, m.trrs, m.err } -func Test_StreamRegistry(t *testing.T) { +func Test_Registry(t *testing.T) { lggr := logger.TestLogger(t) runner := &mockRunner{} t.Run("Get", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} @@ -49,7 +49,7 @@ func Test_StreamRegistry(t *testing.T) { assert.False(t, exists) }) t.Run("Register", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) t.Run("registers new stream", func(t *testing.T) { assert.Len(t, sr.streams, 0) @@ -79,7 +79,7 @@ func Test_StreamRegistry(t *testing.T) { }) }) t.Run("Unregister", func(t *testing.T) { - sr := newStreamRegistry(lggr, runner) + sr := newRegistry(lggr, runner) sr.streams["foo"] = &mockStream{run: &pipeline.Run{ID: 1}} sr.streams["bar"] = &mockStream{run: &pipeline.Run{ID: 2}} From aa24834b28cff36cceed9dced5fb78027c6a92d1 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 11 Jan 2024 10:24:47 -0500 Subject: [PATCH 4/5] rename migration --- .../migrations/{0216_stream_specs.sql => 0220_stream_specs.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/store/migrate/migrations/{0216_stream_specs.sql => 0220_stream_specs.sql} (100%) diff --git a/core/store/migrate/migrations/0216_stream_specs.sql b/core/store/migrate/migrations/0220_stream_specs.sql similarity index 100% rename from core/store/migrate/migrations/0216_stream_specs.sql rename to core/store/migrate/migrations/0220_stream_specs.sql From 6126870934cae7bd15bc2a53cae18b190a818c43 Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Thu, 11 Jan 2024 10:34:29 -0500 Subject: [PATCH 5/5] StreamID => type alias --- core/services/streams/delegate.go | 2 +- core/services/streams/stream_registry.go | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/core/services/streams/delegate.go b/core/services/streams/delegate.go index fac2c7a36b3..3b9b5c773ae 100644 --- a/core/services/streams/delegate.go +++ b/core/services/streams/delegate.go @@ -48,7 +48,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) (services []job.ServiceCtx, err e return nil, errors.New("job name is required to be present for stream specs") } id := StreamID(jb.Name.String) - lggr := d.lggr.Named(id.String()).With("streamID", id) + lggr := d.lggr.Named(id).With("streamID", id) rrs := ocrcommon.NewResultRunSaver(d.runner, lggr, d.cfg.MaxSuccessfulRuns(), d.cfg.ResultWriteQueueDepth()) services = append(services, rrs, &StreamService{ diff --git a/core/services/streams/stream_registry.go b/core/services/streams/stream_registry.go index 6034c5d6b4a..c79c6c4e043 100644 --- a/core/services/streams/stream_registry.go +++ b/core/services/streams/stream_registry.go @@ -8,11 +8,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" ) -type StreamID string - -func (s StreamID) String() string { - return string(s) -} +type StreamID = string type Registry interface { Get(streamID StreamID) (strm Stream, exists bool)