-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KS-15 Add a capabilities library to the core node #11811
Changes from all commits
0753776
1d0ea04
bebe6cd
7694b3f
8ab7945
332d180
8cca9ce
43053c6
f906ba3
3aaa43f
af8eb3f
6a9ea80
efa3b91
6274c1e
508dddf
1924ea8
a4b07ec
0ff16b8
5bb25e0
bfde8f1
5d1fea6
516b50e
456a2e4
7b8ac45
42732ec
7c4ae33
2b60ec4
6eb8ad1
74ef706
f79fc5b
3709f1a
a5c8b66
555b087
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,259 @@ | ||
package capabilities | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"regexp" | ||
"time" | ||
|
||
"golang.org/x/mod/semver" | ||
|
||
"github.com/smartcontractkit/chainlink-common/pkg/values" | ||
) | ||
|
||
// CapabilityType is an enum for the type of capability. | ||
type CapabilityType int | ||
|
||
// CapabilityType enum values. | ||
const ( | ||
CapabilityTypeTrigger CapabilityType = iota | ||
CapabilityTypeAction | ||
CapabilityTypeConsensus | ||
CapabilityTypeTarget | ||
) | ||
|
||
// String returns a string representation of CapabilityType | ||
func (c CapabilityType) String() string { | ||
switch c { | ||
case CapabilityTypeTrigger: | ||
return "trigger" | ||
case CapabilityTypeAction: | ||
return "action" | ||
case CapabilityTypeConsensus: | ||
return "report" | ||
case CapabilityTypeTarget: | ||
return "target" | ||
} | ||
|
||
// Panic as this should be unreachable. | ||
panic("unknown capability type") | ||
} | ||
|
||
// IsValid checks if the capability type is valid. | ||
func (c CapabilityType) IsValid() error { | ||
switch c { | ||
case CapabilityTypeTrigger, | ||
CapabilityTypeAction, | ||
CapabilityTypeConsensus, | ||
CapabilityTypeTarget: | ||
return nil | ||
} | ||
|
||
return fmt.Errorf("invalid capability type: %s", c) | ||
} | ||
|
||
// CapabilityResponse is a struct for the Execute response of a capability. | ||
type CapabilityResponse struct { | ||
Value values.Value | ||
Err error | ||
} | ||
|
||
type RequestMetadata struct { | ||
WorkflowID string | ||
WorkflowExecutionID string | ||
} | ||
|
||
type RegistrationMetadata struct { | ||
WorkflowID string | ||
} | ||
|
||
// CapabilityRequest is a struct for the Execute request of a capability. | ||
type CapabilityRequest struct { | ||
Metadata RequestMetadata | ||
Config *values.Map | ||
Inputs *values.Map | ||
} | ||
|
||
type RegisterToWorkflowRequest struct { | ||
Metadata RegistrationMetadata | ||
Config *values.Map | ||
} | ||
|
||
type UnregisterFromWorkflowRequest struct { | ||
Metadata RegistrationMetadata | ||
Config *values.Map | ||
} | ||
|
||
// CallbackExecutable is an interface for executing a capability. | ||
type CallbackExecutable interface { | ||
RegisterToWorkflow(ctx context.Context, request RegisterToWorkflowRequest) error | ||
UnregisterFromWorkflow(ctx context.Context, request UnregisterFromWorkflowRequest) error | ||
// Capability must respect context.Done and cleanup any request specific resources | ||
// when the context is cancelled. When a request has been completed the capability | ||
// is also expected to close the callback channel. | ||
// Request specific configuration is passed in via the request parameter. | ||
// A successful response must always return a value. An error is assumed otherwise. | ||
// The intent is to make the API explicit. | ||
Execute(ctx context.Context, callback chan CapabilityResponse, request CapabilityRequest) error | ||
} | ||
|
||
// BaseCapability interface needs to be implemented by all capability types. | ||
// Capability interfaces are intentionally duplicated to allow for an easy change | ||
// or extension in the future. | ||
type BaseCapability interface { | ||
Info() CapabilityInfo | ||
} | ||
|
||
// TriggerCapability interface needs to be implemented by all trigger capabilities. | ||
type TriggerCapability interface { | ||
BaseCapability | ||
RegisterTrigger(ctx context.Context, callback chan CapabilityResponse, request CapabilityRequest) error | ||
UnregisterTrigger(ctx context.Context, request CapabilityRequest) error | ||
} | ||
|
||
// ActionCapability interface needs to be implemented by all action capabilities. | ||
type ActionCapability interface { | ||
BaseCapability | ||
CallbackExecutable | ||
} | ||
|
||
// ConsensusCapability interface needs to be implemented by all consensus capabilities. | ||
type ConsensusCapability interface { | ||
BaseCapability | ||
CallbackExecutable | ||
} | ||
|
||
// TargetsCapability interface needs to be implemented by all target capabilities. | ||
type TargetCapability interface { | ||
BaseCapability | ||
CallbackExecutable | ||
} | ||
|
||
// CapabilityInfo is a struct for the info of a capability. | ||
type CapabilityInfo struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where does this return what the strong type is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmmmm -- what do you mean by "strong type"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nolag are you talking about CapabilityType? |
||
ID string | ||
CapabilityType CapabilityType | ||
Description string | ||
Version string | ||
} | ||
|
||
// Info returns the info of the capability. | ||
func (c CapabilityInfo) Info() CapabilityInfo { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May want to return map[string]CapabilityInfo so we can add name to execute. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. CapabilityInfo includes ID, isn't that enough? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a separate conversation that is resolved now - we've parked this for now and @cedric-cordenier will write an exploration doc on whether we want to pass in a name field or not. |
||
return c | ||
} | ||
|
||
var idRegex = regexp.MustCompile(`[a-z0-9_\-:]`) | ||
|
||
const ( | ||
// TODO: this length was largely picked arbitrarily. | ||
// Consider what a realistic/desirable value should be. | ||
// See: https://smartcontract-it.atlassian.net/jira/software/c/projects/KS/boards/182 | ||
idMaxLength = 128 | ||
) | ||
|
||
// NewCapabilityInfo returns a new CapabilityInfo. | ||
func NewCapabilityInfo( | ||
id string, | ||
capabilityType CapabilityType, | ||
description string, | ||
version string, | ||
) (CapabilityInfo, error) { | ||
if len(id) > idMaxLength { | ||
return CapabilityInfo{}, fmt.Errorf("invalid id: %s exceeds max length %d", id, idMaxLength) | ||
} | ||
if !idRegex.MatchString(id) { | ||
cedric-cordenier marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return CapabilityInfo{}, fmt.Errorf("invalid id: %s. Allowed: %s", id, idRegex) | ||
} | ||
|
||
if ok := semver.IsValid(version); !ok { | ||
return CapabilityInfo{}, fmt.Errorf("invalid version: %+v", version) | ||
} | ||
|
||
if err := capabilityType.IsValid(); err != nil { | ||
return CapabilityInfo{}, err | ||
} | ||
|
||
return CapabilityInfo{ | ||
ID: id, | ||
CapabilityType: capabilityType, | ||
Description: description, | ||
Version: version, | ||
}, nil | ||
} | ||
|
||
// MustNewCapabilityInfo returns a new CapabilityInfo, | ||
// panicking if we could not instantiate a CapabilityInfo. | ||
func MustNewCapabilityInfo( | ||
id string, | ||
capabilityType CapabilityType, | ||
description string, | ||
version string, | ||
) CapabilityInfo { | ||
c, err := NewCapabilityInfo(id, capabilityType, description, version) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
return c | ||
} | ||
|
||
// TODO: this timeout was largely picked arbitrarily. | ||
// Consider what a realistic/desirable value should be. | ||
// See: https://smartcontract-it.atlassian.net/jira/software/c/projects/KS/boards/182 | ||
var defaultExecuteTimeout = 10 * time.Second | ||
bolekk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// ExecuteSync executes a capability synchronously. | ||
// We are not handling a case where a capability panics and crashes. | ||
// There is default timeout of 10 seconds. If a capability takes longer than | ||
// that then it should be executed asynchronously. | ||
func ExecuteSync(ctx context.Context, c CallbackExecutable, request CapabilityRequest) (*values.List, error) { | ||
ctxWithT, cancel := context.WithTimeout(ctx, defaultExecuteTimeout) | ||
defer cancel() | ||
|
||
responseCh := make(chan CapabilityResponse) | ||
setupCh := make(chan error) | ||
|
||
go func(innerCtx context.Context, innerC CallbackExecutable, innerReq CapabilityRequest, innerCallback chan CapabilityResponse, errCh chan error) { | ||
setupErr := innerC.Execute(innerCtx, innerCallback, innerReq) | ||
setupCh <- setupErr | ||
}(ctxWithT, c, request, responseCh, setupCh) | ||
|
||
vs := make([]values.Value, 0) | ||
outerLoop: | ||
for { | ||
select { | ||
case response, isOpen := <-responseCh: | ||
if !isOpen { | ||
break outerLoop | ||
} | ||
// An error means execution has been interrupted. | ||
// We'll return the value discarding values received | ||
// until now. | ||
if response.Err != nil { | ||
return nil, response.Err | ||
} | ||
|
||
vs = append(vs, response.Value) | ||
|
||
// Timeout when a capability panics, crashes, and does not close the channel. | ||
case <-ctxWithT.Done(): | ||
return nil, fmt.Errorf("context timed out. If you did not set a timeout, be aware that the default ExecuteSync timeout is %f seconds", defaultExecuteTimeout.Seconds()) | ||
} | ||
} | ||
|
||
setupErr := <-setupCh | ||
|
||
// Something went wrong when setting up a capability. | ||
if setupErr != nil { | ||
return nil, setupErr | ||
} | ||
|
||
// If the capability did not return any values, we deem it as an error. | ||
// The intent is for the API to be explicit. | ||
if len(vs) == 0 { | ||
return nil, errors.New("capability did not return any values") | ||
} | ||
|
||
return &values.List{Underlying: vs}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: some comments feel excessive in this file, code is self-explanatory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
golint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huh? is golint forcing us to have comments everywhere?