Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement stream specs #11685

Merged
merged 5 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
Expand Down Expand Up @@ -290,6 +291,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner = pipeline.NewRunner(pipelineORM, bridgeORM, cfg.JobPipeline(), cfg.WebServer(), legacyEVMChains, keyStore.Eth(), keyStore.VRF(), globalLogger, restrictedHTTPClient, unrestrictedHTTPClient)
jobORM = job.NewORM(db, pipelineORM, bridgeORM, keyStore, globalLogger, cfg.Database())
txmORM = txmgr.NewTxStore(db, globalLogger, cfg.Database())
streamRegistry = streams.NewRegistry(globalLogger, pipelineRunner)
)

for _, chain := range legacyEVMChains.Slice() {
Expand Down Expand Up @@ -344,6 +346,11 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
db,
cfg.Database(),
globalLogger),
job.Stream: streams.NewDelegate(
globalLogger,
streamRegistry,
pipelineRunner,
cfg.JobPipeline()),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down
66 changes: 35 additions & 31 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@ import (
)

const (
BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType)
BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType)
Bootstrap Type = (Type)(pipeline.BootstrapJobType)
Cron Type = (Type)(pipeline.CronJobType)
DirectRequest Type = (Type)(pipeline.DirectRequestJobType)
FluxMonitor Type = (Type)(pipeline.FluxMonitorJobType)
OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType)
OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType)
Gateway Type = (Type)(pipeline.GatewayJobType)
Keeper Type = (Type)(pipeline.KeeperJobType)
VRF Type = (Type)(pipeline.VRFJobType)
BlockhashStore Type = (Type)(pipeline.BlockhashStoreJobType)
BlockHeaderFeeder Type = (Type)(pipeline.BlockHeaderFeederJobType)
LegacyGasStationServer Type = (Type)(pipeline.LegacyGasStationServerJobType)
LegacyGasStationSidecar Type = (Type)(pipeline.LegacyGasStationSidecarJobType)
OffchainReporting Type = (Type)(pipeline.OffchainReportingJobType)
OffchainReporting2 Type = (Type)(pipeline.OffchainReporting2JobType)
Stream Type = (Type)(pipeline.StreamJobType)
VRF Type = (Type)(pipeline.VRFJobType)
Webhook Type = (Type)(pipeline.WebhookJobType)
Bootstrap Type = (Type)(pipeline.BootstrapJobType)
Gateway Type = (Type)(pipeline.GatewayJobType)
)

//revive:disable:redefines-builtin-id
Expand All @@ -70,52 +71,55 @@ func (t Type) SchemaVersion() uint32 {

var (
requiresPipelineSpec = map[Type]bool{
BlockHeaderFeeder: false,
BlockhashStore: false,
Bootstrap: false,
Cron: true,
DirectRequest: true,
FluxMonitor: true,
OffchainReporting: false, // bootstrap jobs do not require it
OffchainReporting2: false, // bootstrap jobs do not require it
Gateway: false,
Keeper: false, // observationSource is injected in the upkeep executor
VRF: true,
Webhook: true,
BlockhashStore: false,
BlockHeaderFeeder: false,
LegacyGasStationServer: false,
LegacyGasStationSidecar: false,
Bootstrap: false,
Gateway: false,
OffchainReporting2: false, // bootstrap jobs do not require it
OffchainReporting: false, // bootstrap jobs do not require it
Stream: true,
VRF: true,
Webhook: true,
}
supportsAsync = map[Type]bool{
BlockHeaderFeeder: false,
BlockhashStore: false,
Bootstrap: false,
Cron: true,
DirectRequest: true,
FluxMonitor: false,
OffchainReporting: false,
OffchainReporting2: false,
Gateway: false,
Keeper: true,
VRF: true,
Webhook: true,
BlockhashStore: false,
BlockHeaderFeeder: false,
LegacyGasStationServer: false,
LegacyGasStationSidecar: false,
Bootstrap: false,
Gateway: false,
OffchainReporting2: false,
OffchainReporting: false,
Stream: true,
VRF: true,
Webhook: true,
}
schemaVersions = map[Type]uint32{
BlockHeaderFeeder: 1,
BlockhashStore: 1,
Bootstrap: 1,
Cron: 1,
DirectRequest: 1,
FluxMonitor: 1,
OffchainReporting: 1,
OffchainReporting2: 1,
Gateway: 1,
Keeper: 1,
VRF: 1,
Webhook: 1,
BlockhashStore: 1,
BlockHeaderFeeder: 1,
LegacyGasStationServer: 1,
LegacyGasStationSidecar: 1,
Bootstrap: 1,
Gateway: 1,
OffchainReporting2: 1,
OffchainReporting: 1,
Stream: 1,
VRF: 1,
Webhook: 1,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are simply sorted to make future additions easier.

}
)

Expand Down
2 changes: 2 additions & 0 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
return errors.Wrap(err, "failed to create GatewaySpec for jobSpec")
}
jb.GatewaySpecID = &specID
case Stream:
// 'stream' type has no associated spec, nothing to do here
default:
o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type)
}
Expand Down
17 changes: 9 additions & 8 deletions core/services/job/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,21 @@ var (
ErrInvalidJobType = errors.New("invalid job type")
ErrInvalidSchemaVersion = errors.New("invalid schema version")
jobTypes = map[Type]struct{}{
BlockHeaderFeeder: {},
BlockhashStore: {},
Bootstrap: {},
Cron: {},
DirectRequest: {},
FluxMonitor: {},
OffchainReporting: {},
OffchainReporting2: {},
Keeper: {},
VRF: {},
Webhook: {},
BlockhashStore: {},
Bootstrap: {},
BlockHeaderFeeder: {},
Gateway: {},
Keeper: {},
LegacyGasStationServer: {},
LegacyGasStationSidecar: {},
OffchainReporting2: {},
OffchainReporting: {},
Stream: {},
VRF: {},
Webhook: {},
}
)

Expand Down
9 changes: 7 additions & 2 deletions core/services/ocrcommon/run_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
)

type Runner interface {
InsertFinishedRun(run *pipeline.Run, saveSuccessfulTaskRuns bool, qopts ...pg.QOpt) error
}

type RunResultSaver struct {
services.StateMachine

maxSuccessfulRuns uint64
runResults chan *pipeline.Run
pipelineRunner pipeline.Runner
pipelineRunner Runner
done chan struct{}
logger logger.Logger
}
Expand All @@ -24,7 +29,7 @@ func (r *RunResultSaver) HealthReport() map[string]error {

func (r *RunResultSaver) Name() string { return r.logger.Name() }

func NewResultRunSaver(pipelineRunner pipeline.Runner,
func NewResultRunSaver(pipelineRunner Runner,
logger logger.Logger, maxSuccessfulRuns uint64, resultsWriteDepth uint64,
) *RunResultSaver {
return &RunResultSaver{
Expand Down
17 changes: 9 additions & 8 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ import (
)

const (
BlockHeaderFeederJobType string = "blockheaderfeeder"
BlockhashStoreJobType string = "blockhashstore"
BootstrapJobType string = "bootstrap"
CronJobType string = "cron"
DirectRequestJobType string = "directrequest"
FluxMonitorJobType string = "fluxmonitor"
OffchainReportingJobType string = "offchainreporting"
OffchainReporting2JobType string = "offchainreporting2"
KeeperJobType string = "keeper"
VRFJobType string = "vrf"
BlockhashStoreJobType string = "blockhashstore"
BlockHeaderFeederJobType string = "blockheaderfeeder"
WebhookJobType string = "webhook"
BootstrapJobType string = "bootstrap"
GatewayJobType string = "gateway"
KeeperJobType string = "keeper"
LegacyGasStationServerJobType string = "legacygasstationserver"
LegacyGasStationSidecarJobType string = "legacygasstationsidecar"
OffchainReporting2JobType string = "offchainreporting2"
OffchainReportingJobType string = "offchainreporting"
StreamJobType string = "stream"
VRFJobType string = "vrf"
WebhookJobType string = "webhook"
)

//go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore
Expand Down
11 changes: 10 additions & 1 deletion core/services/pipeline/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,18 @@ type Spec struct {
JobID int32 `json:"-"`
JobName string `json:"-"`
JobType string `json:"-"`

Pipeline *Pipeline `json:"-" db:"-"` // This may be nil, or may be populated manually as a cache. There is no locking on this, so be careful
}

func (s *Spec) GetOrParsePipeline() (*Pipeline, error) {
if s.Pipeline != nil {
return s.Pipeline, nil
}
return s.ParsePipeline()
}

func (s Spec) Pipeline() (*Pipeline, error) {
func (s *Spec) ParsePipeline() (*Pipeline, error) {
return Parse(s.DotDagSource)
}

Expand Down
62 changes: 34 additions & 28 deletions core/services/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,32 @@ func (r *runner) ExecuteRun(
})
defer cancel()

run := NewRun(spec, vars)

pipeline, err := r.initializePipeline(run)
if err != nil {
return run, nil, err
var pipeline *Pipeline
if spec.Pipeline != nil {
// assume if set that it has been pre-initialized
pipeline = spec.Pipeline
} else {
var err error
pipeline, err = r.InitializePipeline(spec)
if err != nil {
return nil, nil, err
}
}

run := NewRun(spec, vars)
taskRunResults := r.run(ctx, pipeline, run, vars, l)

if run.Pending {
return run, nil, pkgerrors.Wrapf(err, "unexpected async run for spec ID %v, tried executing via ExecuteAndInsertFinishedRun", spec.ID)
return run, nil, fmt.Errorf("unexpected async run for spec ID %v, tried executing via ExecuteRun", spec.ID)
}

return run, taskRunResults, nil
}

func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
pipeline, err := Parse(run.PipelineSpec.DotDagSource)
func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) {
pipeline, err = spec.GetOrParsePipeline()
if err != nil {
return nil, err
return
}

// initialize certain task params
Expand All @@ -278,16 +284,16 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
task.(*BridgeTask).config = r.config
task.(*BridgeTask).bridgeConfig = r.bridgeConfig
task.(*BridgeTask).orm = r.btORM
task.(*BridgeTask).specId = run.PipelineSpec.ID
task.(*BridgeTask).specId = spec.ID
// URL is "safe" because it comes from the node's own database. We
// must use the unrestrictedHTTPClient because some node operators
// may run external adapters on their own hardware
task.(*BridgeTask).httpClient = r.unrestrictedHTTPClient
case TaskTypeETHCall:
task.(*ETHCallTask).legacyChains = r.legacyEVMChains
task.(*ETHCallTask).config = r.config
task.(*ETHCallTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*ETHCallTask).jobType = run.PipelineSpec.JobType
task.(*ETHCallTask).specGasLimit = spec.GasLimit
task.(*ETHCallTask).jobType = spec.JobType
case TaskTypeVRF:
task.(*VRFTask).keyStore = r.vrfKeyStore
case TaskTypeVRFV2:
Expand All @@ -296,28 +302,18 @@ func (r *runner) initializePipeline(run *Run) (*Pipeline, error) {
task.(*VRFTaskV2Plus).keyStore = r.vrfKeyStore
case TaskTypeEstimateGasLimit:
task.(*EstimateGasLimitTask).legacyChains = r.legacyEVMChains
task.(*EstimateGasLimitTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*EstimateGasLimitTask).jobType = run.PipelineSpec.JobType
task.(*EstimateGasLimitTask).specGasLimit = spec.GasLimit
task.(*EstimateGasLimitTask).jobType = spec.JobType
case TaskTypeETHTx:
task.(*ETHTxTask).keyStore = r.ethKeyStore
task.(*ETHTxTask).legacyChains = r.legacyEVMChains
task.(*ETHTxTask).specGasLimit = run.PipelineSpec.GasLimit
task.(*ETHTxTask).jobType = run.PipelineSpec.JobType
task.(*ETHTxTask).forwardingAllowed = run.PipelineSpec.ForwardingAllowed
task.(*ETHTxTask).specGasLimit = spec.GasLimit
task.(*ETHTxTask).jobType = spec.JobType
task.(*ETHTxTask).forwardingAllowed = spec.ForwardingAllowed
default:
}
}

// retain old UUID values
for _, taskRun := range run.PipelineTaskRuns {
task := pipeline.ByDotID(taskRun.DotID)
if task != nil && task.Base() != nil {
task.Base().uuid = taskRun.ID
} else {
return nil, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID)
}
}

return pipeline, nil
}

Expand Down Expand Up @@ -542,11 +538,21 @@ func (r *runner) ExecuteAndInsertFinishedRun(ctx context.Context, spec Spec, var
}

func (r *runner) Run(ctx context.Context, run *Run, l logger.Logger, saveSuccessfulTaskRuns bool, fn func(tx pg.Queryer) error) (incomplete bool, err error) {
pipeline, err := r.initializePipeline(run)
pipeline, err := r.InitializePipeline(run.PipelineSpec)
if err != nil {
return false, err
}

// retain old UUID values
for _, taskRun := range run.PipelineTaskRuns {
task := pipeline.ByDotID(taskRun.DotID)
if task != nil && task.Base() != nil {
task.Base().uuid = taskRun.ID
} else {
return false, pkgerrors.Errorf("failed to match a pipeline task for dot ID: %v", taskRun.DotID)
}
}

preinsert := pipeline.RequiresPreInsert()

q := r.orm.GetQ().WithOpts(pg.WithParentCtx(ctx))
Expand Down
Loading
Loading