Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-370] Pass config from onchain registry to execute calls #13750

Merged
merged 17 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 59 additions & 148 deletions core/capabilities/launcher.go

Large diffs are not rendered by default.

408 changes: 152 additions & 256 deletions core/capabilities/launcher_test.go

Large diffs are not rendered by default.

36 changes: 31 additions & 5 deletions core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,42 @@ var (
ErrCapabilityAlreadyExists = errors.New("capability already exists")
)

type metadataRegistry interface {
LocalNode(ctx context.Context) (capabilities.Node, error)
ConfigForCapability(ctx context.Context, capabilityID string, donID uint32) (capabilities.CapabilityConfiguration, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/ CapabilityConfig would be inline with using LocalNode above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krehermann I'm not sure I'm following, wdym by inline here? re you suggesting we'd put the CapabilityConfiguration inside the Node struct? That doesn't work since it won't account for remote capabilities.

}

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
lggr logger.Logger
m map[string]capabilities.BaseCapability
mu sync.RWMutex
metadataRegistry metadataRegistry
lggr logger.Logger
m map[string]capabilities.BaseCapability
mu sync.RWMutex
}

func (r *Registry) LocalNode(ctx context.Context) (capabilities.Node, error) {
if r.metadataRegistry == nil {
return capabilities.Node{}, errors.New("metadataRegistry information not available")
}

return r.metadataRegistry.LocalNode(ctx)
}

func (r *Registry) GetLocalNode(_ context.Context) (capabilities.Node, error) {
return capabilities.Node{}, nil
func (r *Registry) ConfigForCapability(ctx context.Context, capabilityID string, donID uint32) (capabilities.CapabilityConfiguration, error) {
if r.metadataRegistry == nil {
return capabilities.CapabilityConfiguration{}, errors.New("metadataRegistry information not available")
}

return r.metadataRegistry.ConfigForCapability(ctx, capabilityID, donID)
}

// SetLocalRegistry sets a local copy of the offchain registry for the registry to use.
// This is only public for testing purposes; the only production use should be from the CapabilitiesLauncher.
func (r *Registry) SetLocalRegistry(lr metadataRegistry) {
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
r.mu.Lock()
defer r.mu.Unlock()
r.metadataRegistry = lr
}

// Get gets a capability from the registry.
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks"

commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks"
)

type testReceiver struct {
Expand Down
10 changes: 5 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config *types.RemoteTriggerConfig
config capabilities.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
Expand All @@ -48,7 +48,7 @@ type pubRegState struct {
var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config *types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
func NewTriggerPublisher(config capabilities.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
Expand Down Expand Up @@ -97,7 +97,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
}
// NOTE: require 2F+1 by default, introduce different strategies later (KS-76)
minRequired := uint32(2*callerDon.F + 1)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-p.config.RegistrationExpiry.Milliseconds(), false)
if !ready {
p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired)
return
Expand Down Expand Up @@ -133,7 +133,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {

func (p *triggerPublisher) registrationCleanupLoop() {
defer p.wg.Done()
ticker := time.NewTicker(time.Duration(p.config.RegistrationExpiryMs) * time.Millisecond)
ticker := time.NewTicker(p.config.RegistrationExpiry)
defer ticker.Stop()
for {
select {
Expand All @@ -144,7 +144,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
p.mu.RLock()
for key, req := range p.registrations {
callerDon := p.workflowDONs[key.callerDonId]
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-int64(p.config.RegistrationExpiryMs), false)
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false)
if !ready {
p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId)
ctx, cancel := p.stopCh.NewCtx()
Expand Down
10 changes: 6 additions & 4 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package remote_test
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand Down Expand Up @@ -40,11 +42,11 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100_000,
config := capabilities.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
MessageExpiryMs: 100_000,
MessageExpiry: 100 * time.Second,
}
workflowDONs := map[uint32]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand Down
14 changes: 7 additions & 7 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
//
// TriggerSubscriber communicates with corresponding TriggerReceivers on remote nodes.
type triggerSubscriber struct {
config *types.RemoteTriggerConfig
config capabilities.RemoteTriggerConfig
capInfo commoncap.CapabilityInfo
capDonInfo capabilities.DON
capDonMembers map[p2ptypes.PeerID]struct{}
Expand Down Expand Up @@ -55,7 +55,7 @@ var _ services.Service = &triggerSubscriber{}
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config *types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
func NewTriggerSubscriber(config capabilities.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -121,7 +121,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commonc

func (s *triggerSubscriber) registrationLoop() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.config.RegistrationRefreshMs) * time.Millisecond)
ticker := time.NewTicker(s.config.RegistrationRefresh)
defer ticker.Stop()
for {
select {
Expand Down Expand Up @@ -195,9 +195,9 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {
nowMs := time.Now().UnixMilli()
s.mu.RLock()
creationTs := s.messageCache.Insert(key, sender, nowMs, msg.Payload)
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-int64(s.config.MessageExpiryMs), true)
ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true)
s.mu.RUnlock()
if nowMs-creationTs > int64(s.config.RegistrationExpiryMs) {
if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() {
s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "sender", sender)
continue
}
Expand All @@ -219,15 +219,15 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) {

func (s *triggerSubscriber) eventCleanupLoop() {
defer s.wg.Done()
ticker := time.NewTicker(time.Duration(s.config.MessageExpiryMs) * time.Millisecond)
ticker := time.NewTicker(s.config.MessageExpiry)
defer ticker.Stop()
for {
select {
case <-s.stopCh:
return
case <-ticker.C:
s.mu.Lock()
s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - int64(s.config.MessageExpiryMs))
s.messageCache.DeleteOlderThan(time.Now().UnixMilli() - s.config.MessageExpiry.Milliseconds())
s.mu.Unlock()
}
}
Expand Down
10 changes: 6 additions & 4 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package remote_test

import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
Expand Down Expand Up @@ -61,11 +63,11 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
})

// register trigger
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100,
config := capabilities.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
MessageExpiryMs: 100_000,
MessageExpiry: 100 * time.Second,
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
Expand Down
21 changes: 0 additions & 21 deletions core/capabilities/remote/types/config.go

This file was deleted.

Loading
Loading