[KS-74] Remote trigger (#12657)
1. TriggerPublisher to receive subscriptions and publish events to remote subscribers.
2. TriggerSubscriber to update remote subscriptions and receive events.
3. MessageCache helper structure.
4. Minor changes to Message fields.
bolekk authored Apr 3, 2024
1 parent c007ea8 commit 95050a9
Showing 17 changed files with 1,333 additions and 102 deletions.
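
For orientation, here is a minimal wiring sketch for the capability-DON side, not part of the commit, using only the NewTriggerPublisher signature introduced below. The helper name startRemoteTriggerPublisher is hypothetical, and the underlying trigger, DON metadata, dispatcher, and logger are assumed to be supplied by the node's existing capability and peer-network setup.

package example

import (
	"context"

	commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
	"github.com/smartcontractkit/chainlink-common/pkg/services"
	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// startRemoteTriggerPublisher is hypothetical glue code: it wires a local trigger
// capability into the new remote publisher. All arguments are assumed to come from
// the node's existing capability and peer-network setup.
func startRemoteTriggerPublisher(
	ctx context.Context,
	cfg types.RemoteTriggerConfig,
	underlying commoncap.TriggerCapability,
	capInfo commoncap.CapabilityInfo,
	capDonInfo types.DON,
	workflowDONs map[string]types.DON, // keyed by caller (workflow) DON ID
	dispatcher types.Dispatcher,
	lggr logger.Logger,
) (services.Service, error) {
	publisher := remote.NewTriggerPublisher(cfg, underlying, capInfo, capDonInfo, workflowDONs, dispatcher, lggr)
	// Start launches the registration-expiry loop; the dispatcher is expected to
	// deliver MethodRegisterTrigger messages from remote subscribers to Receive().
	if err := publisher.Start(ctx); err != nil {
		return nil, err
	}
	return publisher, nil
}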
5 changes: 5 additions & 0 deletions .changeset/polite-jeans-knock.md
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Remote Trigger
2 changes: 1 addition & 1 deletion core/capabilities/remote/dispatcher.go
@@ -117,7 +117,7 @@ func (d *dispatcher) receive() {
d.tryRespondWithError(msg.Sender, body, types.Error_VALIDATION_FAILED)
continue
}
- k := key{body.CapabilityId, body.DonId}
+ k := key{body.CapabilityId, body.CapabilityDonId}
d.mu.RLock()
receiver, ok := d.receivers[k]
d.mu.RUnlock()
87 changes: 87 additions & 0 deletions core/capabilities/remote/message_cache.go
@@ -0,0 +1,87 @@
package remote

// MessageCache is a simple store for messages, grouped by event ID and peer ID.
// It is used to collect messages from multiple peers until they are ready for aggregation
// based on quantity and freshness.
type messageCache[EventID comparable, PeerID comparable] struct {
events map[EventID]*eventState[PeerID]
}

type eventState[PeerID comparable] struct {
peerMsgs map[PeerID]*msgState
creationTimestamp int64
wasReady bool
}

type msgState struct {
timestamp int64
payload []byte
}

func NewMessageCache[EventID comparable, PeerID comparable]() *messageCache[EventID, PeerID] {
return &messageCache[EventID, PeerID]{
events: make(map[EventID]*eventState[PeerID]),
}
}

// Insert or overwrite a message for <eventID> sent by <peerID>. Returns the creation timestamp of the event.
func (c *messageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, timestamp int64, payload []byte) int64 {
if _, ok := c.events[eventID]; !ok {
c.events[eventID] = &eventState[PeerID]{
peerMsgs: make(map[PeerID]*msgState),
creationTimestamp: timestamp,
}
}
c.events[eventID].peerMsgs[peerID] = &msgState{
timestamp: timestamp,
payload: payload,
}
return c.events[eventID].creationTimestamp
}

// Return true if there are messages from at least <minCount> peers,
// each received at or after <minTimestamp>.
// On success, also return the payloads of <minCount> such messages.
// Ready() will return true at most once per event if <once> is true.
func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, [][]byte) {
ev, ok := c.events[eventID]
if !ok {
return false, nil
}
if ev.wasReady && once {
return false, nil
}
if uint32(len(ev.peerMsgs)) < minCount {
return false, nil
}
countAboveMinTimestamp := uint32(0)
accPayloads := [][]byte{}
for _, msg := range ev.peerMsgs {
if msg.timestamp >= minTimestamp {
countAboveMinTimestamp++
accPayloads = append(accPayloads, msg.payload)
if countAboveMinTimestamp >= minCount {
ev.wasReady = true
return true, accPayloads
}
}
}
return false, nil
}

func (c *messageCache[EventID, PeerID]) Delete(eventID EventID) {
delete(c.events, eventID)
}

// Delete all events created before <cutoffTimestamp> and return the number of events deleted.
// Scans all keys, which might be slow for large caches.
func (c *messageCache[EventID, PeerID]) DeleteOlderThan(cutoffTimestamp int64) int {
nDeleted := 0
for id, event := range c.events {
if event.creationTimestamp < cutoffTimestamp {
c.Delete(id)
nDeleted++
}
}
return nDeleted
}
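
The publisher introduced later in this commit uses this cache to gate trigger registrations on a 2F+1 quorum within a freshness window. Below is a short test-style sketch of that check; it is not part of the commit, and the F = 1 quorum of 3 and the 30-second window are illustrative values only.

package remote_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
)

// TestMessageCache_QuorumFreshnessSketch mirrors how trigger_publisher.go uses
// the cache: require messages from 2F+1 peers, each no older than the
// registration expiry window. F = 1 and 30s are illustrative assumptions.
func TestMessageCache_QuorumFreshnessSketch(t *testing.T) {
	cache := remote.NewMessageCache[string, string]()
	now := time.Now().UnixMilli()
	window := int64(30_000) // hypothetical RegistrationExpiryMs

	cache.Insert("workflow-1", "peerA", now-40_000, []byte("regA")) // older than the window
	cache.Insert("workflow-1", "peerB", now-5_000, []byte("regB"))
	cache.Insert("workflow-1", "peerC", now-1_000, []byte("regC"))

	// minCount = 2F+1 = 3: peerA's stale registration does not count.
	ready, _ := cache.Ready("workflow-1", 3, now-window, false)
	require.False(t, ready)

	// Once peerA re-registers inside the window, the quorum is met.
	cache.Insert("workflow-1", "peerA", now, []byte("regA"))
	ready, payloads := cache.Ready("workflow-1", 3, now-window, false)
	require.True(t, ready)
	require.Len(t, payloads, 3)
}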
61 changes: 61 additions & 0 deletions core/capabilities/remote/message_cache_test.go
@@ -0,0 +1,61 @@
package remote_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
)

const (
eventId1 = "event1"
eventId2 = "event2"
peerId1 = "peer1"
peerId2 = "peer2"
payloadA = "payloadA"
)

func TestMessageCache_InsertReady(t *testing.T) {
cache := remote.NewMessageCache[string, string]()

// not ready with one message
ts := cache.Insert(eventId1, peerId1, 100, []byte(payloadA))
require.Equal(t, int64(100), ts)
ready, _ := cache.Ready(eventId1, 2, 100, true)
require.False(t, ready)

// not ready with two messages but only one fresh enough
ts = cache.Insert(eventId1, peerId2, 200, []byte(payloadA))
require.Equal(t, int64(100), ts)
ready, _ = cache.Ready(eventId1, 2, 150, true)
require.False(t, ready)

// ready with two messages (once only)
ready, messages := cache.Ready(eventId1, 2, 100, true)
require.True(t, ready)
require.Equal(t, []byte(payloadA), messages[0])
require.Equal(t, []byte(payloadA), messages[1])

// not ready again for the same event ID
ready, _ = cache.Ready(eventId1, 2, 100, true)
require.False(t, ready)
}

func TestMessageCache_DeleteOlderThan(t *testing.T) {
cache := remote.NewMessageCache[string, string]()

ts := cache.Insert(eventId1, peerId1, 100, []byte(payloadA))
require.Equal(t, int64(100), ts)
ts = cache.Insert(eventId2, peerId2, 200, []byte(payloadA))
require.Equal(t, int64(200), ts)

deleted := cache.DeleteOlderThan(150)
require.Equal(t, 1, deleted)

deleted = cache.DeleteOlderThan(150)
require.Equal(t, 0, deleted)

deleted = cache.DeleteOlderThan(201)
require.Equal(t, 1, deleted)
}
6 changes: 3 additions & 3 deletions core/capabilities/remote/target.go
@@ -54,9 +54,9 @@ func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- common
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
- CapabilityId: c.capInfo.ID,
- DonId: c.donInfo.ID,
- Payload: []byte{0x01, 0x02, 0x03},
+ CapabilityId: c.capInfo.ID,
+ CapabilityDonId: c.donInfo.ID,
+ Payload: []byte{0x01, 0x02, 0x03},
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
221 changes: 221 additions & 0 deletions core/capabilities/remote/trigger_publisher.go
@@ -0,0 +1,221 @@
package remote

import (
"context"
sync "sync"
"time"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// TriggerPublisher manages all external users of a local trigger capability.
// Its responsibilities are:
// 1. Manage trigger registrations from external nodes (receive, store, aggregate, expire).
// 2. Send out events produced by an underlying, concrete trigger implementation.
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config types.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo types.DON
workflowDONs map[string]types.DON
dispatcher types.Dispatcher
messageCache *messageCache[registrationKey, p2ptypes.PeerID]
registrations map[registrationKey]*pubRegState
mu sync.RWMutex // protects messageCache and registrations
stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
}

type registrationKey struct {
callerDonId string
workflowId string
}

type pubRegState struct {
callback chan<- commoncap.CapabilityResponse
request commoncap.CapabilityRequest
}

var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, workflowDONs map[string]types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
underlying: underlying,
capInfo: capInfo,
capDonInfo: capDonInfo,
workflowDONs: workflowDONs,
dispatcher: dispatcher,
messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](),
registrations: make(map[registrationKey]*pubRegState),
stopCh: make(services.StopChan),
lggr: lggr,
}
}

func (p *triggerPublisher) Start(ctx context.Context) error {
p.wg.Add(1)
go p.registrationCleanupLoop()
p.lggr.Info("TriggerPublisher started")
return nil
}

func (p *triggerPublisher) Receive(msg *types.MessageBody) {
sender := ToPeerID(msg.Sender)
if msg.Method == types.MethodRegisterTrigger {
req, err := pb.UnmarshalCapabilityRequest(msg.Payload)
if err != nil {
p.lggr.Errorw("failed to unmarshal capability request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callerDon, ok := p.workflowDONs[msg.CallerDonId]
if !ok {
p.lggr.Errorw("received a message from unsupported workflow DON", "capabilityId", p.capInfo.ID, "callerDonId", msg.CallerDonId)
return
}
p.lggr.Debugw("received trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "sender", sender)
key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID}
nowMs := time.Now().UnixMilli()
p.mu.Lock()
p.messageCache.Insert(key, sender, nowMs, msg.Payload)
// NOTE: require 2F+1 by default, introduce different strategies later (KS-76)
minRequired := uint32(2*callerDon.F + 1)
ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false)
p.mu.Unlock()
if !ready {
p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired)
return
}
agg := NewDefaultModeAggregator(uint32(callerDon.F + 1))
aggregated, err := agg.Aggregate("", payloads)
if err != nil {
p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
return
}
unmarshaled, err := pb.UnmarshalCapabilityRequest(aggregated)
if err != nil {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
p.mu.Lock()
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
cancel()
if err == nil {
p.registrations[key] = &pubRegState{
callback: callbackCh,
request: unmarshaled,
}
p.wg.Add(1)
go p.triggerEventLoop(callbackCh, key)
p.lggr.Debugw("updated trigger registration", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID)
} else {
p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err)
}
p.mu.Unlock()
} else {
p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender)
}
}

func (p *triggerPublisher) registrationCleanupLoop() {
defer p.wg.Done()
ticker := time.NewTicker(time.Duration(p.config.RegistrationExpiryMs) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-p.stopCh:
return
case <-ticker.C:
now := time.Now().UnixMilli()
p.mu.RLock()
for key, req := range p.registrations {
callerDon := p.workflowDONs[key.callerDonId]
ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-int64(p.config.RegistrationExpiryMs), false)
if !ready {
p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId)
ctx, cancel := p.stopCh.NewCtx()
err := p.underlying.UnregisterTrigger(ctx, req.request)
cancel()
p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err)
// after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel
close(req.callback)
delete(p.registrations, key)
p.messageCache.Delete(key)
}
}
p.mu.RUnlock()
}
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.CapabilityResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
case <-p.stopCh:
return
case response, ok := <-callbackCh:
if !ok {
p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
return
}
p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId)
marshaled, err := pb.MarshalCapabilityResponse(response)
if err != nil {
p.lggr.Debugw("can't marshal trigger event", "err", err)
break
}
msg := &types.MessageBody{
CapabilityId: p.capInfo.ID,
CapabilityDonId: p.capDonInfo.ID,
CallerDonId: key.callerDonId,
Method: types.MethodTriggerEvent,
Payload: marshaled,
Metadata: &types.MessageBody_TriggerEventMetadata{
TriggerEventMetadata: &types.TriggerEventMetadata{
// NOTE: optionally introduce batching across workflows as an optimization
WorkflowIds: []string{key.workflowId},
},
},
}
// NOTE: send to all nodes by default, introduce different strategies later (KS-76)
for _, peerID := range p.workflowDONs[key.callerDonId].Members {
err = p.dispatcher.Send(peerID, msg)
if err != nil {
p.lggr.Errorw("failed to send trigger event", "capabilityId", p.capInfo.ID, "peerID", peerID, "err", err)
}
}
}
}
}

func (p *triggerPublisher) Close() error {
close(p.stopCh)
p.wg.Wait()
p.lggr.Info("TriggerPublisher closed")
return nil
}

func (p *triggerPublisher) Ready() error {
return nil
}

func (p *triggerPublisher) HealthReport() map[string]error {
return nil
}

func (p *triggerPublisher) Name() string {
return "TriggerPublisher"
}
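
The corresponding TriggerSubscriber added by this commit is not shown in this excerpt, so the following is only a sketch of the registration message that Receive() above expects from a caller DON. The helper name registerRemoteTrigger is hypothetical, and pb.MarshalCapabilityRequest and commoncap.RequestMetadata are assumed to be the marshalling counterparts of the calls used in the publisher code.

package example

import (
	commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
	"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
	"github.com/smartcontractkit/chainlink/v2/core/logger"
)

// registerRemoteTrigger is a hypothetical caller-DON helper that sketches the
// wire format only; the real TriggerSubscriber is part of this commit but not
// shown in this excerpt.
func registerRemoteTrigger(
	capInfo commoncap.CapabilityInfo,
	capDonInfo types.DON,
	myWorkflowDonID string,
	workflowID string,
	dispatcher types.Dispatcher,
	lggr logger.Logger,
) error {
	payload, err := pb.MarshalCapabilityRequest(commoncap.CapabilityRequest{
		Metadata: commoncap.RequestMetadata{WorkflowID: workflowID},
	})
	if err != nil {
		return err
	}
	msg := &types.MessageBody{
		CapabilityId:    capInfo.ID,      // remote trigger capability to register with
		CapabilityDonId: capDonInfo.ID,   // DON hosting the trigger, used for dispatcher routing
		CallerDonId:     myWorkflowDonID, // this node's workflow DON
		Method:          types.MethodRegisterTrigger,
		Payload:         payload,
	}
	// Send to every member of the capability DON; each publisher aggregates
	// registrations until 2F+1 matching ones arrive within the expiry window.
	for _, peerID := range capDonInfo.Members {
		if err := dispatcher.Send(peerID, msg); err != nil {
			lggr.Errorw("failed to send trigger registration", "peerID", peerID, "err", err)
		}
	}
	return nil
}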