Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 16, 2024
1 parent ce12d6a commit 4bf0bf7
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 39 deletions.
1 change: 1 addition & 0 deletions core/services/relay/evm/mercury/mocks/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ func (m *MockTask) TaskTimeout() (time.Duration, bool) { return 0, false }
func (m *MockTask) TaskRetries() uint32 { return 0 }
func (m *MockTask) TaskMinBackoff() time.Duration { return 0 }
func (m *MockTask) TaskMaxBackoff() time.Duration { return 0 }
func (m *MockTask) TaskStreamID() *int32 { return nil }
9 changes: 4 additions & 5 deletions core/services/streams/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (

type mockRegistry struct{}

func (m *mockRegistry) Get(streamID StreamID) (strm Stream, exists bool) { return }
func (m *mockRegistry) Register(streamID StreamID, spec pipeline.Spec, rrs ResultRunSaver) error {
func (m *mockRegistry) Get(streamID StreamID) (p Pipeline, exists bool) { return }
func (m *mockRegistry) Register(jb job.Job, rrs ResultRunSaver) error {
return nil
}
func (m *mockRegistry) Unregister(streamID StreamID) {}
func (m *mockRegistry) Unregister(int32) {}

type mockDelegateConfig struct{}

Expand Down Expand Up @@ -49,8 +49,7 @@ func Test_Delegate(t *testing.T) {

strmSrv := srvs[1].(*StreamService)
assert.Equal(t, registry, strmSrv.registry)
assert.Equal(t, StreamID(42), strmSrv.id)
assert.Equal(t, jb.PipelineSpec, strmSrv.spec)
assert.Equal(t, jb, strmSrv.jb)
assert.NotNil(t, strmSrv.lggr)
assert.Equal(t, srvs[0], strmSrv.rrs)
})
Expand Down
10 changes: 7 additions & 3 deletions core/services/streams/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package streams
import (
"context"
"fmt"
"sync"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
Expand All @@ -25,7 +24,6 @@ type Pipeline interface {
}

type multiStreamPipeline struct {
sync.RWMutex
lggr logger.Logger
spec pipeline.Spec
runner Runner
Expand Down Expand Up @@ -76,7 +74,13 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
},
})

return &multiStreamPipeline{sync.RWMutex{}, lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType), spec, runner, rrs, streamIDs, vars}, nil
return &multiStreamPipeline{
lggr.Named("MultiStreamPipeline").With("spec.ID", spec.ID, "jobID", spec.JobID, "jobName", spec.JobName, "jobType", spec.JobType),
spec,
runner,
rrs,
streamIDs,
vars}, nil
}

func (s *multiStreamPipeline) Run(ctx context.Context) (run *pipeline.Run, trrs pipeline.TaskRunResults, err error) {
Expand Down
2 changes: 0 additions & 2 deletions core/services/streams/stream_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
)

// TODO: Rename, this is actually a PipelineRegistry (? is it ?)

// alias for easier refactoring
type StreamID = llo.StreamID

Expand Down
79 changes: 50 additions & 29 deletions core/services/streams/stream_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,51 @@ import (
"testing"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockStream struct {
var _ Pipeline = &mockPipeline{}

type mockPipeline struct {
run *pipeline.Run
trrs pipeline.TaskRunResults
err error
}

func (m *mockStream) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
func (m *mockPipeline) Run(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
return m.run, m.trrs, m.err
}

func (m *mockPipeline) StreamIDs() []StreamID {
return nil
}

func Test_Registry(t *testing.T) {
lggr := logger.TestLogger(t)
runner := &mockRunner{}

t.Run("Get", func(t *testing.T) {
sr := newRegistry(lggr, runner)

sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}}
sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}}
sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}}
sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}}

v, exists := sr.Get(1)
assert.True(t, exists)
assert.Equal(t, sr.streams[1], v)
assert.Equal(t, sr.pipelines[1], v)

v, exists = sr.Get(2)
assert.True(t, exists)
assert.Equal(t, sr.streams[2], v)
assert.Equal(t, sr.pipelines[2], v)

v, exists = sr.Get(3)
assert.True(t, exists)
assert.Equal(t, sr.streams[3], v)
assert.Equal(t, sr.pipelines[3], v)

v, exists = sr.Get(4)
assert.Nil(t, v)
Expand All @@ -51,56 +58,70 @@ func Test_Registry(t *testing.T) {
t.Run("Register", func(t *testing.T) {
sr := newRegistry(lggr, runner)

t.Run("registers new stream", func(t *testing.T) {
assert.Len(t, sr.streams, 0)
err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
t.Run("registers new pipeline with multiple stream IDs", func(t *testing.T) {
assert.Len(t, sr.pipelines, 0)
// err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: "source"}}, nil)
// TODO: what if the dag is unparseable?
// err := sr.Register(1, pipeline.Spec{ID: 32, DotDagSource: "source"}, nil)
err := sr.Register(job.Job{PipelineSpec: &pipeline.Spec{ID: 32, DotDagSource: `
result1 [type=memo value="900.0022"];
multiply2 [type=multiply times=1 streamID=1 index=0]; // force conversion to decimal
result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
result2_parse [type=jsonparse path="result" streamID=2 index=1];
result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
result3_parse [type=jsonparse path="result"];
multiply3 [type=multiply times=1 streamID=3 index=2]; // force conversion to decimal
result1 -> multiply2;
result2 -> result2_parse;
result3 -> result3_parse -> multiply3;
`}}, nil)
require.NoError(t, err)
assert.Len(t, sr.streams, 1)
assert.Len(t, sr.pipelines, 1)

v, exists := sr.Get(1)
require.True(t, exists)
strm := v.(*stream)
assert.Equal(t, StreamID(1), strm.id)
assert.Equal(t, int32(32), strm.spec.ID)
msp := v.(*multiStreamPipeline)
assert.Equal(t, "foo", msp.StreamIDs())
assert.Equal(t, int32(32), msp.spec.ID)
})

t.Run("errors when attempt to re-register a stream with an existing ID", func(t *testing.T) {
assert.Len(t, sr.streams, 1)
assert.Len(t, sr.pipelines, 1)
err := sr.Register(1, pipeline.Spec{ID: 33, DotDagSource: "source"}, nil)

Check failure on line 90 in core/services/streams/stream_registry_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

too many arguments in call to sr.Register

Check failure on line 90 in core/services/streams/stream_registry_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

too many arguments in call to sr.Register
require.Error(t, err)
assert.Len(t, sr.streams, 1)
assert.Len(t, sr.pipelines, 1)
assert.EqualError(t, err, "stream already registered for id: 1")

v, exists := sr.Get(1)
require.True(t, exists)
strm := v.(*stream)
assert.Equal(t, StreamID(1), strm.id)
assert.Equal(t, int32(32), strm.spec.ID)
msp := v.(*multiStreamPipeline)
assert.Equal(t, StreamID(1), msp.id)

Check failure on line 98 in core/services/streams/stream_registry_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

msp.id undefined (type *multiStreamPipeline has no field or method id)

Check failure on line 98 in core/services/streams/stream_registry_test.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

msp.id undefined (type *multiStreamPipeline has no field or method id)
assert.Equal(t, int32(32), msp.spec.ID)
})
})
t.Run("Unregister", func(t *testing.T) {
sr := newRegistry(lggr, runner)

sr.streams[1] = &mockStream{run: &pipeline.Run{ID: 1}}
sr.streams[2] = &mockStream{run: &pipeline.Run{ID: 2}}
sr.streams[3] = &mockStream{run: &pipeline.Run{ID: 3}}
sr.pipelines[1] = &mockPipeline{run: &pipeline.Run{ID: 1}}
sr.pipelines[2] = &mockPipeline{run: &pipeline.Run{ID: 2}}
sr.pipelines[3] = &mockPipeline{run: &pipeline.Run{ID: 3}}

t.Run("unregisters a stream", func(t *testing.T) {
assert.Len(t, sr.streams, 3)
assert.Len(t, sr.pipelines, 3)

sr.Unregister(1)

assert.Len(t, sr.streams, 2)
_, exists := sr.streams[1]
assert.Len(t, sr.pipelines, 2)
_, exists := sr.pipelines[1]
assert.False(t, exists)
})
t.Run("no effect when unregistering a non-existent stream", func(t *testing.T) {
assert.Len(t, sr.streams, 2)
assert.Len(t, sr.pipelines, 2)

sr.Unregister(1)

assert.Len(t, sr.streams, 2)
_, exists := sr.streams[1]
assert.Len(t, sr.pipelines, 2)
_, exists := sr.pipelines[1]
assert.False(t, exists)
})
})
Expand Down

0 comments on commit 4bf0bf7

Please sign in to comment.