Skip to content

Commit

Permalink
Merge branch 'develop' into task/ship-2627-update-error
Browse files Browse the repository at this point in the history
  • Loading branch information
KodeyThomas authored Aug 8, 2024
2 parents f1af979 + 0f166ad commit 49a631a
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
14 changes: 14 additions & 0 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type triggerPublisher struct {
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
workflowDONs map[uint32]commoncap.DON
membersCache map[uint32]map[p2ptypes.PeerID]bool
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
Expand Down Expand Up @@ -54,12 +55,21 @@ func NewTriggerPublisher(config *capabilities.RemoteTriggerConfig, underlying co
config = &capabilities.RemoteTriggerConfig{}
}
config.ApplyDefaults()
membersCache := make(map[uint32]map[p2ptypes.PeerID]bool)
for id, don := range workflowDONs {
cache := make(map[p2ptypes.PeerID]bool)
for _, member := range don.Members {
cache[member] = true
}
membersCache[id] = cache
}
return &triggerPublisher{
config: config,
underlying: underlying,
capInfo: capInfo,
capDonInfo: capDonInfo,
workflowDONs: workflowDONs,
membersCache: membersCache,
dispatcher: dispatcher,
messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
Expand Down Expand Up @@ -88,6 +98,10 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) {
p.lggr.Errorw("received a message from unsupported workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId)
return
}
if !p.membersCache[msg.CallerDonId][sender] {
p.lggr.Errorw("sender not a member of its workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId, "sender", sender)
return
}
p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender)
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
Expand Down
9 changes: 7 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand Down Expand Up @@ -42,7 +41,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := &capabilities.RemoteTriggerConfig{
config := &commoncap.RemoteTriggerConfig{
RegistrationRefresh: 100 * time.Millisecond,
RegistrationExpiry: 100 * time.Second,
MinResponsesToAggregate: 1,
Expand Down Expand Up @@ -73,6 +72,12 @@ func TestTriggerPublisher_Register(t *testing.T) {
Payload: marshaled,
}
publisher.Receive(ctx, regEvent)
// node p1 is not a member of the workflow DON so registration shoudn't happen
require.Empty(t, underlying.registrationsCh)

regEvent.Sender = p2[:]
publisher.Receive(ctx, regEvent)
require.NotEmpty(t, underlying.registrationsCh)
forwarded := <-underlying.registrationsCh
require.Equal(t, capRequest.Metadata.WorkflowID, forwarded.Metadata.WorkflowID)

Expand Down

0 comments on commit 49a631a

Please sign in to comment.