diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index d904cb447a2..34242ee6fbd 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -136,18 +136,21 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err) } - // Special treatment for local targets - wrap into a transmission capability - target, isTarget := cp.(capabilities.TargetCapability) - if isTarget { - capInfo, err2 := target.Info(ctx) - if err2 != nil { - return fmt.Errorf("failed to get info of target capability: %w", err2) - } + info, err := cp.Info(ctx) + if err != nil { + return fmt.Errorf("failed to get info of capability with id %s: %w", step.ID, err) + } - // If the DON is nil this is a local target - if capInfo.DON == nil { - cp = transmission.NewLocalTargetCapability(e.logger, *e.donInfo.PeerID(), *e.donInfo.DON, target) - } + // Special treatment for local targets - wrap into a transmission capability + // If the DON is nil, this is a local target. + if info.CapabilityType == capabilities.CapabilityTypeTarget && info.DON == nil { + e.logger.Debugf("wrapping capability %s in local transmission protocol", info.ID) + cp = transmission.NewLocalTargetCapability( + e.logger, + *e.donInfo.PeerID(), + *e.donInfo.DON, + cp.(capabilities.TargetCapability), + ) } // We configure actions, consensus and targets here, and diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index c719a2dc42a..54b92711d17 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -3,6 +3,7 @@ package workflows import ( "context" "errors" + "fmt" "testing" "time" @@ -72,6 +73,7 @@ targets: type testHooks struct { initFailed chan struct{} + initSuccessful chan struct{} executionFinished chan string } @@ -79,6 +81,7 @@ type testHooks struct { func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...func(c *Config)) (*Engine, *testHooks) { peerID := p2ptypes.PeerID{} initFailed := make(chan struct{}) + initSuccessful := make(chan struct{}) executionFinished := make(chan string, 100) clock := clockwork.NewFakeClock() cfg := Config{ @@ -92,7 +95,9 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...fun maxRetries: 1, retryMs: 100, afterInit: func(success bool) { - if !success { + if success { + close(initSuccessful) + } else { close(initFailed) } }, @@ -107,7 +112,7 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...fun } eng, err := NewEngine(cfg) require.NoError(t, err) - return eng, &testHooks{initFailed: initFailed, executionFinished: executionFinished} + return eng, &testHooks{initSuccessful: initSuccessful, initFailed: initFailed, executionFinished: executionFinished} } // getExecutionId returns the execution id of the workflow that is @@ -651,3 +656,95 @@ func TestEngine_TimesOutOldExecutions(t *testing.T) { require.NoError(t, err) assert.Equal(t, store.StatusTimeout, gotEx.Status) } + +const ( + delayedWorkflow = ` +triggers: + - id: "mercury-trigger@1.0.0" + config: + feedlist: + - "0x1111111111111111111100000000000000000000000000000000000000000000" # ETHUSD + - "0x2222222222222222222200000000000000000000000000000000000000000000" # LINKUSD + - "0x3333333333333333333300000000000000000000000000000000000000000000" # BTCUSD + +consensus: + - id: "offchain_reporting@1.0.0" + ref: "evm_median" + inputs: + observations: + - "$(trigger.outputs)" + config: + aggregation_method: "data_feeds_2_0" + aggregation_config: + "0x1111111111111111111100000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x2222222222222222222200000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + "0x3333333333333333333300000000000000000000000000000000000000000000": + deviation: "0.001" + heartbeat: "30m" + encoder: "EVM" + encoder_config: + abi: "mercury_reports bytes[]" + +targets: + - id: "write_polygon-testnet-mumbai@1.0.0" + inputs: + report: "$(evm_median.outputs.report)" + config: + address: "0x3F3554832c636721F1fD1822Ccca0354576741Ef" + params: ["$(report)"] + abi: "receive(report bytes)" + deltaStage: 2s + schedule: allAtOnce +` +) + +func TestEngine_WrapsTargets(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensus())) + require.NoError(t, reg.Add(ctx, mockTarget())) + + clock := clockwork.NewFakeClock() + dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clock) + + eng, hooks := newTestEngine( + t, + reg, + delayedWorkflow, + func(c *Config) { + c.Store = dbstore + c.clock = clock + }, + ) + err := eng.Start(ctx) + require.NoError(t, err) + + <-hooks.initSuccessful + + err = eng.workflow.walkDo(workflows.KeywordTrigger, func(s *step) error { + if s.Ref == workflows.KeywordTrigger { + return nil + } + + info, err2 := s.capability.Info(ctx) + require.NoError(t, err2) + + if info.CapabilityType == capabilities.CapabilityTypeTarget { + assert.Equal(t, "*transmission.LocalTargetCapability", fmt.Sprintf("%T", s.capability)) + } else { + assert.NotEqual(t, "*transmission.LocalTargetCapability", fmt.Sprintf("%T", s.capability)) + } + + return nil + }) + require.NoError(t, err) +}