Skip to content

Commit

Permalink
[KS-36] Add workflow job type and hardcoded DF2.0 workflow engine (#1…
Browse files Browse the repository at this point in the history
…1974)

* Add workflow job type

* [KS-36] Add workflow delegate with hardcoded engine for DF2.0
  • Loading branch information
cedric-cordenier authored Feb 12, 2024
1 parent 1fb4203 commit 877f2f5
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 0 deletions.
7 changes: 7 additions & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/static"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand Down Expand Up @@ -57,6 +58,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth"
"github.com/smartcontractkit/chainlink/v2/core/sessions/localauth"
Expand Down Expand Up @@ -183,6 +185,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
keyStore := opts.KeyStore
restrictedHTTPClient := opts.RestrictedHTTPClient
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient
registry := capabilities.NewRegistry()

// LOOPs can be created as options, in the case of LOOP relayers, or
// as OCR2 job implementations, in the case of Median today.
Expand Down Expand Up @@ -351,6 +354,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
streamRegistry,
pipelineRunner,
cfg.JobPipeline()),
job.Workflow: workflows.NewDelegate(
globalLogger,
registry,
),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
Stream Type = (Type)(pipeline.StreamJobType)
VRF Type = (Type)(pipeline.VRFJobType)
Webhook Type = (Type)(pipeline.WebhookJobType)
Workflow Type = (Type)(pipeline.WorkflowJobType)
)

//revive:disable:redefines-builtin-id
Expand Down
2 changes: 2 additions & 0 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ func (o *orm) CreateJob(jb *Job, qopts ...pg.QOpt) error {
jb.GatewaySpecID = &specID
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
// 'workflow' type has no associated spec, nothing to do here
default:
o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type)
}
Expand Down
1 change: 1 addition & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
StreamJobType string = "stream"
VRFJobType string = "vrf"
WebhookJobType string = "webhook"
WorkflowJobType string = "workflow"
)

//go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore
Expand Down
40 changes: 40 additions & 0 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package workflows

import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type Delegate struct {
registry types.CapabilitiesRegistry
logger logger.Logger
}

var _ job.Delegate = (*Delegate)(nil)

func (d *Delegate) JobType() job.Type {
return job.Workflow
}

func (d *Delegate) BeforeJobCreated(spec job.Job) {}

func (d *Delegate) AfterJobCreated(jb job.Job) {}

func (d *Delegate) BeforeJobDeleted(spec job.Job) {}

func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error { return nil }

// ServicesForSpec satisfies the job.Delegate interface.
func (d *Delegate) ServicesForSpec(spec job.Job) ([]job.ServiceCtx, error) {
engine, err := NewEngine(d.logger, d.registry)
if err != nil {
return nil, err
}
return []job.ServiceCtx{engine}, nil
}

func NewDelegate(logger logger.Logger, registry types.CapabilitiesRegistry) *Delegate {
return &Delegate{logger: logger, registry: registry}
}
216 changes: 216 additions & 0 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package workflows

import (
"context"
"fmt"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

const (
mockedWorkflowID = "ef7c8168-f4d1-422f-a4b2-8ce0a1075f0a"
mockedTriggerID = "bd727a82-5cac-4071-be62-0152dd9adb0f"
)

type Engine struct {
services.StateMachine
logger logger.Logger
registry types.CapabilitiesRegistry
trigger capabilities.TriggerCapability
consensus capabilities.ConsensusCapability
target capabilities.TargetCapability
callbackCh chan capabilities.CapabilityResponse
cancel func()
}

func (e *Engine) Start(ctx context.Context) error {
return e.StartOnce("Engine", func() error {
err := e.registerTrigger(ctx)
if err != nil {
return err
}

// create a new context, since the one passed in via Start is short-lived.
ctx, cancel := context.WithCancel(context.Background())
e.cancel = cancel
go e.loop(ctx)
return nil
})
}

func (e *Engine) registerTrigger(ctx context.Context) error {
triggerConf, err := values.NewMap(
map[string]any{
"feedlist": []any{
// ETHUSD, LINKUSD, USDBTC
123, 456, 789,
},
},
)
if err != nil {
return err
}

triggerInputs, err := values.NewMap(
map[string]any{
"triggerId": mockedTriggerID,
},
)
if err != nil {
return err
}

triggerRegRequest := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
},
Config: triggerConf,
Inputs: triggerInputs,
}
err = e.trigger.RegisterTrigger(ctx, e.callbackCh, triggerRegRequest)
if err != nil {
return fmt.Errorf("failed to instantiate mercury_trigger, %s", err)
}
return nil
}

func (e *Engine) loop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case resp := <-e.callbackCh:
err := e.handleExecution(ctx, resp)
if err != nil {
e.logger.Error("error executing event %+v: %w", resp, err)
}
}
}
}

func (e *Engine) handleExecution(ctx context.Context, resp capabilities.CapabilityResponse) error {
results, err := e.handleConsensus(ctx, resp)
if err != nil {
return err
}

_, err = e.handleTarget(ctx, results)
return err
}

func (e *Engine) handleTarget(ctx context.Context, resp *values.List) (*values.List, error) {
report, err := resp.Unwrap()
if err != nil {
return nil, err
}
inputs := map[string]values.Value{
"report": resp,
}
config, err := values.NewMap(map[string]any{
"address": "0xaabbcc",
"method": "updateFeedValues(report bytes, role uint8)",
"params": []any{
report, 1,
},
})
if err != nil {
return nil, err
}

tr := capabilities.CapabilityRequest{
Inputs: &values.Map{Underlying: inputs},
Config: config,
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
},
}
return capabilities.ExecuteSync(ctx, e.target, tr)
}

func (e *Engine) handleConsensus(ctx context.Context, resp capabilities.CapabilityResponse) (*values.List, error) {
inputs := map[string]values.Value{
"observations": resp.Value,
}
config, err := values.NewMap(map[string]any{
"aggregation_method": "data_feeds_2_0",
"aggregation_config": map[string]any{
// ETHUSD
"123": map[string]any{
"deviation": "0.005",
"heartbeat": "24h",
},
// LINKUSD
"456": map[string]any{
"deviation": "0.001",
"heartbeat": "24h",
},
// BTCUSD
"789": map[string]any{
"deviation": "0.002",
"heartbeat": "6h",
},
},
"encoder": "EVM",
})
if err != nil {
return nil, nil
}
cr := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
},
Inputs: &values.Map{Underlying: inputs},
Config: config,
}
return capabilities.ExecuteSync(ctx, e.consensus, cr)
}

func (e *Engine) Close() error {
return e.StopOnce("Engine", func() error {
defer e.cancel()

triggerInputs, err := values.NewMap(
map[string]any{
"triggerId": mockedTriggerID,
},
)
if err != nil {
return err
}
deregRequest := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: mockedWorkflowID,
},
Inputs: triggerInputs,
}
return e.trigger.UnregisterTrigger(context.Background(), deregRequest)
})
}

func NewEngine(lggr logger.Logger, registry types.CapabilitiesRegistry) (*Engine, error) {
ctx := context.Background()
trigger, err := registry.GetTrigger(ctx, "on_mercury_report")
if err != nil {
return nil, err
}
consensus, err := registry.GetConsensus(ctx, "off-chain-reporting")
if err != nil {
return nil, err
}
target, err := registry.GetTarget(ctx, "write_polygon_mainnet")
if err != nil {
return nil, err
}
return &Engine{
logger: lggr,
registry: registry,
trigger: trigger,
consensus: consensus,
target: target,
callbackCh: make(chan capabilities.CapabilityResponse),
}, nil
}
Loading

0 comments on commit 877f2f5

Please sign in to comment.