From ec1479e07de09340239a8ab1b7a6a80dce22d31f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20Kar=C5=BEinauskas?= Date: Fri, 2 Feb 2024 20:19:10 +0200 Subject: [PATCH] KS-15 Add a capabilities library to the core node (#11811) * start working on capability.go * Combine CapabilityInfoProvider and CapabilityInfo; Add validation tests * Validate the capability type when adding it * Replace Stringer with string for simplicity * Remove SynchronousCapability interface in favour of using async * Add comments to public functions and variables * Add ExecuteSync method. Provide more info on the Executable interface. * Add a Test_ExecuteSyncReturnSingleValue * Review comments * Add CapabilityResponse struct for Execute channel * Undo chainlink-common change * Add reasoning behind code * Add missing tests * Create CallbackExecutable. Trim interfaces. * Start on an example on demand capability * Add different capability types to registry * Complete example trigger * Fix race * Add TODOs for defaultExecuteTimeout/idMaxLength * Move capability info to a var; add must handle * Remove tmockCapability * Udpate broken comment * Add some tests * RegisterUnregisterWorkflow interface * cmock => mock * Create a const for test var * Use testutils.Context(t) * Do not unwrap single value from a list * Channel naming * MustCapabilityInfo => MustNewCapabilityInfo * Tidy more * examples => triggers && downstream --------- Co-authored-by: Cedric Cordenier --- core/capabilities/capability.go | 259 ++++++++++++++++++ core/capabilities/capability_test.go | 184 +++++++++++++ core/capabilities/registry.go | 149 ++++++++++ core/capabilities/registry_test.go | 182 ++++++++++++ .../triggers/on_demand_trigger.go | 75 +++++ .../triggers/on_demand_trigger_test.go | 81 ++++++ go.mod | 2 +- 7 files changed, 931 insertions(+), 1 deletion(-) create mode 100644 core/capabilities/capability.go create mode 100644 core/capabilities/capability_test.go create mode 100644 core/capabilities/registry.go create mode 100644 core/capabilities/registry_test.go create mode 100644 core/capabilities/triggers/on_demand_trigger.go create mode 100644 core/capabilities/triggers/on_demand_trigger_test.go diff --git a/core/capabilities/capability.go b/core/capabilities/capability.go new file mode 100644 index 00000000000..84ef243c271 --- /dev/null +++ b/core/capabilities/capability.go @@ -0,0 +1,259 @@ +package capabilities + +import ( + "context" + "errors" + "fmt" + "regexp" + "time" + + "golang.org/x/mod/semver" + + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +// CapabilityType is an enum for the type of capability. +type CapabilityType int + +// CapabilityType enum values. +const ( + CapabilityTypeTrigger CapabilityType = iota + CapabilityTypeAction + CapabilityTypeConsensus + CapabilityTypeTarget +) + +// String returns a string representation of CapabilityType +func (c CapabilityType) String() string { + switch c { + case CapabilityTypeTrigger: + return "trigger" + case CapabilityTypeAction: + return "action" + case CapabilityTypeConsensus: + return "report" + case CapabilityTypeTarget: + return "target" + } + + // Panic as this should be unreachable. + panic("unknown capability type") +} + +// IsValid checks if the capability type is valid. +func (c CapabilityType) IsValid() error { + switch c { + case CapabilityTypeTrigger, + CapabilityTypeAction, + CapabilityTypeConsensus, + CapabilityTypeTarget: + return nil + } + + return fmt.Errorf("invalid capability type: %s", c) +} + +// CapabilityResponse is a struct for the Execute response of a capability. +type CapabilityResponse struct { + Value values.Value + Err error +} + +type RequestMetadata struct { + WorkflowID string + WorkflowExecutionID string +} + +type RegistrationMetadata struct { + WorkflowID string +} + +// CapabilityRequest is a struct for the Execute request of a capability. +type CapabilityRequest struct { + Metadata RequestMetadata + Config *values.Map + Inputs *values.Map +} + +type RegisterToWorkflowRequest struct { + Metadata RegistrationMetadata + Config *values.Map +} + +type UnregisterFromWorkflowRequest struct { + Metadata RegistrationMetadata + Config *values.Map +} + +// CallbackExecutable is an interface for executing a capability. +type CallbackExecutable interface { + RegisterToWorkflow(ctx context.Context, request RegisterToWorkflowRequest) error + UnregisterFromWorkflow(ctx context.Context, request UnregisterFromWorkflowRequest) error + // Capability must respect context.Done and cleanup any request specific resources + // when the context is cancelled. When a request has been completed the capability + // is also expected to close the callback channel. + // Request specific configuration is passed in via the request parameter. + // A successful response must always return a value. An error is assumed otherwise. + // The intent is to make the API explicit. + Execute(ctx context.Context, callback chan CapabilityResponse, request CapabilityRequest) error +} + +// BaseCapability interface needs to be implemented by all capability types. +// Capability interfaces are intentionally duplicated to allow for an easy change +// or extension in the future. +type BaseCapability interface { + Info() CapabilityInfo +} + +// TriggerCapability interface needs to be implemented by all trigger capabilities. +type TriggerCapability interface { + BaseCapability + RegisterTrigger(ctx context.Context, callback chan CapabilityResponse, request CapabilityRequest) error + UnregisterTrigger(ctx context.Context, request CapabilityRequest) error +} + +// ActionCapability interface needs to be implemented by all action capabilities. +type ActionCapability interface { + BaseCapability + CallbackExecutable +} + +// ConsensusCapability interface needs to be implemented by all consensus capabilities. +type ConsensusCapability interface { + BaseCapability + CallbackExecutable +} + +// TargetsCapability interface needs to be implemented by all target capabilities. +type TargetCapability interface { + BaseCapability + CallbackExecutable +} + +// CapabilityInfo is a struct for the info of a capability. +type CapabilityInfo struct { + ID string + CapabilityType CapabilityType + Description string + Version string +} + +// Info returns the info of the capability. +func (c CapabilityInfo) Info() CapabilityInfo { + return c +} + +var idRegex = regexp.MustCompile(`[a-z0-9_\-:]`) + +const ( + // TODO: this length was largely picked arbitrarily. + // Consider what a realistic/desirable value should be. + // See: https://smartcontract-it.atlassian.net/jira/software/c/projects/KS/boards/182 + idMaxLength = 128 +) + +// NewCapabilityInfo returns a new CapabilityInfo. +func NewCapabilityInfo( + id string, + capabilityType CapabilityType, + description string, + version string, +) (CapabilityInfo, error) { + if len(id) > idMaxLength { + return CapabilityInfo{}, fmt.Errorf("invalid id: %s exceeds max length %d", id, idMaxLength) + } + if !idRegex.MatchString(id) { + return CapabilityInfo{}, fmt.Errorf("invalid id: %s. Allowed: %s", id, idRegex) + } + + if ok := semver.IsValid(version); !ok { + return CapabilityInfo{}, fmt.Errorf("invalid version: %+v", version) + } + + if err := capabilityType.IsValid(); err != nil { + return CapabilityInfo{}, err + } + + return CapabilityInfo{ + ID: id, + CapabilityType: capabilityType, + Description: description, + Version: version, + }, nil +} + +// MustNewCapabilityInfo returns a new CapabilityInfo, +// panicking if we could not instantiate a CapabilityInfo. +func MustNewCapabilityInfo( + id string, + capabilityType CapabilityType, + description string, + version string, +) CapabilityInfo { + c, err := NewCapabilityInfo(id, capabilityType, description, version) + if err != nil { + panic(err) + } + + return c +} + +// TODO: this timeout was largely picked arbitrarily. +// Consider what a realistic/desirable value should be. +// See: https://smartcontract-it.atlassian.net/jira/software/c/projects/KS/boards/182 +var defaultExecuteTimeout = 10 * time.Second + +// ExecuteSync executes a capability synchronously. +// We are not handling a case where a capability panics and crashes. +// There is default timeout of 10 seconds. If a capability takes longer than +// that then it should be executed asynchronously. +func ExecuteSync(ctx context.Context, c CallbackExecutable, request CapabilityRequest) (*values.List, error) { + ctxWithT, cancel := context.WithTimeout(ctx, defaultExecuteTimeout) + defer cancel() + + responseCh := make(chan CapabilityResponse) + setupCh := make(chan error) + + go func(innerCtx context.Context, innerC CallbackExecutable, innerReq CapabilityRequest, innerCallback chan CapabilityResponse, errCh chan error) { + setupErr := innerC.Execute(innerCtx, innerCallback, innerReq) + setupCh <- setupErr + }(ctxWithT, c, request, responseCh, setupCh) + + vs := make([]values.Value, 0) +outerLoop: + for { + select { + case response, isOpen := <-responseCh: + if !isOpen { + break outerLoop + } + // An error means execution has been interrupted. + // We'll return the value discarding values received + // until now. + if response.Err != nil { + return nil, response.Err + } + + vs = append(vs, response.Value) + + // Timeout when a capability panics, crashes, and does not close the channel. + case <-ctxWithT.Done(): + return nil, fmt.Errorf("context timed out. If you did not set a timeout, be aware that the default ExecuteSync timeout is %f seconds", defaultExecuteTimeout.Seconds()) + } + } + + setupErr := <-setupCh + + // Something went wrong when setting up a capability. + if setupErr != nil { + return nil, setupErr + } + + // If the capability did not return any values, we deem it as an error. + // The intent is for the API to be explicit. + if len(vs) == 0 { + return nil, errors.New("capability did not return any values") + } + + return &values.List{Underlying: vs}, nil +} diff --git a/core/capabilities/capability_test.go b/core/capabilities/capability_test.go new file mode 100644 index 00000000000..2414e87bba9 --- /dev/null +++ b/core/capabilities/capability_test.go @@ -0,0 +1,184 @@ +package capabilities + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +func Test_CapabilityInfo(t *testing.T) { + ci, err := NewCapabilityInfo( + "capability-id", + CapabilityTypeAction, + "This is a mock capability that doesn't do anything.", + "v1.0.0", + ) + require.NoError(t, err) + + assert.Equal(t, ci, ci.Info()) +} + +func Test_CapabilityInfo_Invalid(t *testing.T) { + _, err := NewCapabilityInfo( + "capability-id", + CapabilityType(5), + "This is a mock capability that doesn't do anything.", + "v1.0.0", + ) + assert.ErrorContains(t, err, "invalid capability type") + + _, err = NewCapabilityInfo( + "&!!!", + CapabilityTypeAction, + "This is a mock capability that doesn't do anything.", + "v1.0.0", + ) + assert.ErrorContains(t, err, "invalid id") + + _, err = NewCapabilityInfo( + "mock-capability", + CapabilityTypeAction, + "This is a mock capability that doesn't do anything.", + "hello", + ) + assert.ErrorContains(t, err, "invalid version") + + _, err = NewCapabilityInfo( + strings.Repeat("n", 256), + CapabilityTypeAction, + "This is a mock capability that doesn't do anything.", + "hello", + ) + assert.ErrorContains(t, err, "exceeds max length 128") +} + +type mockCapabilityWithExecute struct { + CallbackExecutable + CapabilityInfo + ExecuteFn func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error +} + +func (m *mockCapabilityWithExecute) Execute(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + return m.ExecuteFn(ctx, callback, req) +} + +func Test_ExecuteSyncReturnSingleValue(t *testing.T) { + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + val, _ := values.NewString("hello") + callback <- CapabilityResponse{val, nil} + + close(callback) + + return nil + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(testutils.Context(t), mcwe, req) + + assert.NoError(t, err, val) + assert.Equal(t, "hello", val.Underlying[0].(*values.String).Underlying) +} + +func Test_ExecuteSyncReturnMultipleValues(t *testing.T) { + es, _ := values.NewString("hello") + expectedList := []values.Value{es, es, es} + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + callback <- CapabilityResponse{es, nil} + callback <- CapabilityResponse{es, nil} + callback <- CapabilityResponse{es, nil} + + close(callback) + + return nil + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(testutils.Context(t), mcwe, req) + + assert.NoError(t, err, val) + assert.ElementsMatch(t, expectedList, val.Underlying) +} + +func Test_ExecuteSyncCapabilitySetupErrors(t *testing.T) { + expectedErr := errors.New("something went wrong during setup") + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + close(callback) + return expectedErr + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(testutils.Context(t), mcwe, req) + + assert.ErrorContains(t, err, expectedErr.Error()) + assert.Nil(t, val) +} + +func Test_ExecuteSyncTimeout(t *testing.T) { + ctxWithTimeout := testutils.Context(t) + ctxWithTimeout, cancel := context.WithCancel(ctxWithTimeout) + cancel() + + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + return nil + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(ctxWithTimeout, mcwe, req) + + assert.ErrorContains(t, err, "context timed out. If you did not set a timeout, be aware that the default ExecuteSync timeout is") + assert.Nil(t, val) +} + +func Test_ExecuteSyncCapabilityErrors(t *testing.T) { + expectedErr := errors.New("something went wrong during execution") + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + callback <- CapabilityResponse{nil, expectedErr} + + close(callback) + + return nil + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(testutils.Context(t), mcwe, req) + + assert.ErrorContains(t, err, expectedErr.Error()) + assert.Nil(t, val) +} + +func Test_ExecuteSyncDoesNotReturnValues(t *testing.T) { + mcwe := &mockCapabilityWithExecute{ + ExecuteFn: func(ctx context.Context, callback chan CapabilityResponse, req CapabilityRequest) error { + close(callback) + return nil + }, + } + req := CapabilityRequest{} + val, err := ExecuteSync(testutils.Context(t), mcwe, req) + + assert.ErrorContains(t, err, "capability did not return any values") + assert.Nil(t, val) +} + +func Test_MustNewCapabilityInfo(t *testing.T) { + assert.Panics(t, func() { + MustNewCapabilityInfo( + "capability-id", + CapabilityTypeAction, + "This is a mock capability that doesn't do anything.", + "should-panic", + ) + }) +} diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go new file mode 100644 index 00000000000..1632b17c4b9 --- /dev/null +++ b/core/capabilities/registry.go @@ -0,0 +1,149 @@ +package capabilities + +import ( + "context" + "fmt" + "sync" +) + +// Registry is a struct for the registry of capabilities. +// Registry is safe for concurrent use. +type Registry struct { + m map[string]BaseCapability + mu sync.RWMutex +} + +// Get gets a capability from the registry. +func (r *Registry) Get(_ context.Context, id string) (BaseCapability, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + c, ok := r.m[id] + if !ok { + return nil, fmt.Errorf("capability not found with id %s", id) + } + + return c, nil +} + +// GetTrigger gets a capability from the registry and tries to coerce it to the TriggerCapability interface. +func (r *Registry) GetTrigger(ctx context.Context, id string) (TriggerCapability, error) { + c, err := r.Get(ctx, id) + if err != nil { + return nil, err + } + + tc, ok := c.(TriggerCapability) + if !ok { + return nil, fmt.Errorf("capability with id: %s does not satisfy the capability interface", id) + } + + return tc, nil +} + +// GetAction gets a capability from the registry and tries to coerce it to the ActionCapability interface. +func (r *Registry) GetAction(ctx context.Context, id string) (ActionCapability, error) { + c, err := r.Get(ctx, id) + if err != nil { + return nil, err + } + + ac, ok := c.(ActionCapability) + if !ok { + return nil, fmt.Errorf("capability with id: %s does not satisfy the capability interface", id) + } + + return ac, nil +} + +// GetConsensus gets a capability from the registry and tries to coerce it to the ActionCapability interface. +func (r *Registry) GetConsensus(ctx context.Context, id string) (ConsensusCapability, error) { + c, err := r.Get(ctx, id) + if err != nil { + return nil, err + } + + cc, ok := c.(ConsensusCapability) + if !ok { + return nil, fmt.Errorf("capability with id: %s does not satisfy the capability interface", id) + } + + return cc, nil +} + +// GetTarget gets a capability from the registry and tries to coerce it to the ActionCapability interface. +func (r *Registry) GetTarget(ctx context.Context, id string) (TargetCapability, error) { + c, err := r.Get(ctx, id) + if err != nil { + return nil, err + } + + tc, ok := c.(TargetCapability) + if !ok { + return nil, fmt.Errorf("capability with id: %s does not satisfy the capability interface", id) + } + + return tc, nil +} + +// List lists all the capabilities in the registry. +func (r *Registry) List(_ context.Context) []BaseCapability { + r.mu.RLock() + defer r.mu.RUnlock() + cl := []BaseCapability{} + for _, v := range r.m { + cl = append(cl, v) + } + + return cl +} + +// Add adds a capability to the registry. +func (r *Registry) Add(_ context.Context, c BaseCapability) error { + r.mu.Lock() + defer r.mu.Unlock() + + info := c.Info() + + switch info.CapabilityType { + case CapabilityTypeTrigger: + _, ok := c.(TriggerCapability) + if !ok { + return fmt.Errorf("trigger capability does not satisfy TriggerCapability interface") + } + case CapabilityTypeAction: + _, ok := c.(ActionCapability) + if !ok { + return fmt.Errorf("action does not satisfy ActionCapability interface") + } + case CapabilityTypeConsensus: + _, ok := c.(ConsensusCapability) + if !ok { + return fmt.Errorf("consensus capability does not satisfy ConsensusCapability interface") + } + case CapabilityTypeTarget: + _, ok := c.(TargetCapability) + if !ok { + return fmt.Errorf("target capability does not satisfy TargetCapability interface") + } + default: + return fmt.Errorf("unknown capability type: %s", info.CapabilityType) + } + + id := info.ID + _, ok := r.m[id] + if ok { + return fmt.Errorf("capability with id: %s already exists", id) + } + + r.m[id] = c + return nil + +} + +// NewRegistry returns a new Registry. +func NewRegistry() *Registry { + return &Registry{ + m: map[string]BaseCapability{}, + } +} diff --git a/core/capabilities/registry_test.go b/core/capabilities/registry_test.go new file mode 100644 index 00000000000..a8c51827b32 --- /dev/null +++ b/core/capabilities/registry_test.go @@ -0,0 +1,182 @@ +package capabilities_test + +import ( + "context" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/triggers" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +type mockCapability struct { + capabilities.CapabilityInfo +} + +func (m *mockCapability) Execute(ctx context.Context, callback chan capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error { + return nil +} + +func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { + return nil +} + +func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { + return nil +} + +func TestRegistry(t *testing.T) { + ctx := testutils.Context(t) + + r := capabilities.NewRegistry() + + id := "capability-1" + ci, err := capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeAction, + "capability-1-description", + "v1.0.0", + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + err = r.Add(ctx, c) + require.NoError(t, err) + + gc, err := r.Get(ctx, id) + require.NoError(t, err) + + assert.Equal(t, c, gc) + + cs := r.List(ctx) + assert.Len(t, cs, 1) + assert.Equal(t, c, cs[0]) +} + +func TestRegistry_NoDuplicateIDs(t *testing.T) { + ctx := testutils.Context(t) + r := capabilities.NewRegistry() + + id := "capability-1" + ci, err := capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeAction, + "capability-1-description", + "v1.0.0", + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + err = r.Add(ctx, c) + require.NoError(t, err) + + ci, err = capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeConsensus, + "capability-2-description", + "v1.0.0", + ) + require.NoError(t, err) + c2 := &mockCapability{CapabilityInfo: ci} + + err = r.Add(ctx, c2) + assert.ErrorContains(t, err, "capability with id: capability-1 already exists") +} + +func TestRegistry_ChecksExecutionAPIByType(t *testing.T) { + tcs := []struct { + name string + newCapability func(ctx context.Context, reg *capabilities.Registry) (string, error) + getCapability func(ctx context.Context, reg *capabilities.Registry, id string) error + errContains string + }{ + { + name: "action", + newCapability: func(ctx context.Context, reg *capabilities.Registry) (string, error) { + id := uuid.New().String() + ci, err := capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeAction, + "capability-1-description", + "v1.0.0", + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + return id, reg.Add(ctx, c) + }, + getCapability: func(ctx context.Context, reg *capabilities.Registry, id string) error { + _, err := reg.GetAction(ctx, id) + return err + }, + }, + { + name: "target", + newCapability: func(ctx context.Context, reg *capabilities.Registry) (string, error) { + id := uuid.New().String() + ci, err := capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeTarget, + "capability-1-description", + "v1.0.0", + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + return id, reg.Add(ctx, c) + }, + getCapability: func(ctx context.Context, reg *capabilities.Registry, id string) error { + _, err := reg.GetTarget(ctx, id) + return err + }, + }, + { + name: "trigger", + newCapability: func(ctx context.Context, reg *capabilities.Registry) (string, error) { + odt := triggers.NewOnDemand() + info := odt.Info() + return info.ID, reg.Add(ctx, odt) + }, + getCapability: func(ctx context.Context, reg *capabilities.Registry, id string) error { + _, err := reg.GetTrigger(ctx, id) + return err + }, + }, + { + name: "consensus", + newCapability: func(ctx context.Context, reg *capabilities.Registry) (string, error) { + id := uuid.New().String() + ci, err := capabilities.NewCapabilityInfo( + id, + capabilities.CapabilityTypeConsensus, + "capability-1-description", + "v1.0.0", + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + return id, reg.Add(ctx, c) + }, + getCapability: func(ctx context.Context, reg *capabilities.Registry, id string) error { + _, err := reg.GetConsensus(ctx, id) + return err + }, + }, + } + + ctx := testutils.Context(t) + reg := capabilities.NewRegistry() + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + id, err := tc.newCapability(ctx, reg) + require.NoError(t, err) + + err = tc.getCapability(ctx, reg, id) + require.NoError(t, err) + }) + } +} diff --git a/core/capabilities/triggers/on_demand_trigger.go b/core/capabilities/triggers/on_demand_trigger.go new file mode 100644 index 00000000000..1260a14568f --- /dev/null +++ b/core/capabilities/triggers/on_demand_trigger.go @@ -0,0 +1,75 @@ +package triggers + +import ( + "context" + "fmt" + "sync" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities" +) + +var info = capabilities.MustNewCapabilityInfo( + "on-demand-trigger", + capabilities.CapabilityTypeTrigger, + "An example on-demand trigger.", + "v1.0.0", +) + +type OnDemand struct { + capabilities.CapabilityInfo + chans map[string]chan capabilities.CapabilityResponse + mu sync.Mutex +} + +func NewOnDemand() *OnDemand { + return &OnDemand{ + CapabilityInfo: info, + chans: map[string]chan capabilities.CapabilityResponse{}, + } +} + +func (o *OnDemand) FanOutEvent(ctx context.Context, event capabilities.CapabilityResponse) error { + o.mu.Lock() + defer o.mu.Unlock() + for _, ch := range o.chans { + ch <- event + } + return nil +} + +// Execute executes the on-demand trigger. +func (o *OnDemand) SendEvent(ctx context.Context, wid string, event capabilities.CapabilityResponse) error { + o.mu.Lock() + defer o.mu.Unlock() + ch, ok := o.chans[wid] + if !ok { + return fmt.Errorf("no registration for %s", wid) + } + + ch <- event + return nil +} + +func (o *OnDemand) RegisterTrigger(ctx context.Context, callback chan capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error { + weid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + o.chans[weid] = callback + return nil +} + +func (o *OnDemand) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error { + weid := req.Metadata.WorkflowID + + o.mu.Lock() + defer o.mu.Unlock() + + ch, ok := o.chans[weid] + if ok { + close(ch) + } + delete(o.chans, weid) + return nil +} diff --git a/core/capabilities/triggers/on_demand_trigger_test.go b/core/capabilities/triggers/on_demand_trigger_test.go new file mode 100644 index 00000000000..ee7337a1824 --- /dev/null +++ b/core/capabilities/triggers/on_demand_trigger_test.go @@ -0,0 +1,81 @@ +package triggers + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" +) + +const testID = "test-id-1" + +func TestOnDemand(t *testing.T) { + r := capabilities.NewRegistry() + tr := NewOnDemand() + ctx := testutils.Context(t) + + err := r.Add(ctx, tr) + require.NoError(t, err) + + trigger, err := r.GetTrigger(ctx, tr.Info().ID) + require.NoError(t, err) + + callback := make(chan capabilities.CapabilityResponse, 10) + + req := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowExecutionID: testID, + }, + } + err = trigger.RegisterTrigger(ctx, callback, req) + require.NoError(t, err) + + er := capabilities.CapabilityResponse{ + Value: &values.String{Underlying: testID}, + } + + err = tr.FanOutEvent(ctx, er) + require.NoError(t, err) + + assert.Len(t, callback, 1) + assert.Equal(t, er, <-callback) +} + +func TestOnDemand_ChannelDoesntExist(t *testing.T) { + tr := NewOnDemand() + ctx := testutils.Context(t) + + er := capabilities.CapabilityResponse{ + Value: &values.String{Underlying: testID}, + } + err := tr.SendEvent(ctx, testID, er) + assert.ErrorContains(t, err, "no registration") +} + +func TestOnDemand_(t *testing.T) { + tr := NewOnDemand() + ctx := testutils.Context(t) + + req := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: "hello", + }, + } + callback := make(chan capabilities.CapabilityResponse, 10) + + err := tr.RegisterTrigger(ctx, callback, req) + require.NoError(t, err) + + er := capabilities.CapabilityResponse{ + Value: &values.String{Underlying: testID}, + } + err = tr.SendEvent(ctx, "hello", er) + require.NoError(t, err) + + assert.Len(t, callback, 1) + assert.Equal(t, er, <-callback) +} diff --git a/go.mod b/go.mod index cbcad20919f..6c0ea3281c0 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/crypto v0.17.0 golang.org/x/exp v0.0.0-20231127185646-65229373498e + golang.org/x/mod v0.14.0 golang.org/x/sync v0.5.0 golang.org/x/term v0.15.0 golang.org/x/text v0.14.0 @@ -312,7 +313,6 @@ require ( go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/ratelimit v0.2.0 // indirect golang.org/x/arch v0.6.0 // indirect - golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect