Skip to content

Commit

Permalink
Add benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 18, 2024
1 parent 97fce69 commit b47f956
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 10 deletions.
102 changes: 93 additions & 9 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ import (
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"gopkg.in/guregu/null.v4"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-data-streams/llo"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/null"
clnull "github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/streams"
Expand All @@ -42,7 +45,7 @@ func makePipelineWithMultipleStreamResults(streamIDs []streams.StreamID, results
}
trrs := make([]pipeline.TaskRunResult, len(streamIDs))
for i, res := range results {
trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: null.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}}
trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: clnull.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}}
}
return &mockPipeline{
run: &pipeline.Run{},
Expand Down Expand Up @@ -154,7 +157,9 @@ func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration {
func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour }
func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 }
func (m *mockPipelineConfig) ReaperThreshold() time.Duration { return 0 }
func (m *mockPipelineConfig) VerboseLogging() bool { return true }

// func (m *mockPipelineConfig) VerboseLogging() bool { return true }
func (m *mockPipelineConfig) VerboseLogging() bool { return false }

type mockBridgeConfig struct{}

Expand All @@ -165,23 +170,23 @@ func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration {
return 0
}

func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCalls int) {
callcount := 0
func createBridge(t testing.TB, name string, val string, borm bridges.ORM, maxCalls int64) {
callcount := atomic.NewInt64(0)
bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {
callcount++
if callcount > maxCalls {
n := callcount.Inc()
if maxCalls > 0 && n > maxCalls {
panic("too many calls to bridge" + name)
}
_, herr := io.ReadAll(req.Body)
if herr != nil {
t.Fatal(herr)
panic(herr)
}

res.WriteHeader(http.StatusOK)
resp := fmt.Sprintf(`{"result": %s}`, val)
_, herr = res.Write([]byte(resp))
if herr != nil {
t.Fatal(herr)
panic(herr)
}
}))
t.Cleanup(bridge.Close)
Expand Down Expand Up @@ -338,3 +343,82 @@ result3 -> result3_parse -> multiply3;
t.Fatalf("Observation failed: %v", err)
}
}

func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)
db := pgtest.NewSqlxDB(t)
bridgesORM := bridges.NewORM(db)

n := uint32(t.N)

Check failure on line 353 in core/services/llo/observation_context_test.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion int -> uint32 (gosec)

createBridge(t, "foo-bridge", `123.456`, bridgesORM, 0)
createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 0)

c := clhttptest.NewTestLocalOnlyHTTPClient()
runner := pipeline.NewRunner(
nil,
bridgesORM,
&mockPipelineConfig{},
&mockBridgeConfig{},
nil,
nil,
nil,
lggr,
c,
c,
)

r := streams.NewRegistry(lggr, runner)

for i := uint32(0); i < n; i++ {
jobStreamID := streams.StreamID(i)

Check failure on line 375 in core/services/llo/observation_context_test.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)

jb := job.Job{
ID: int32(i),

Check failure on line 378 in core/services/llo/observation_context_test.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion uint32 -> int32 (gosec)
Name: null.StringFrom(fmt.Sprintf("job-%d", i)),
Type: job.Stream,
StreamID: &jobStreamID,
PipelineSpec: &pipeline.Spec{
ID: int32(i * 100),

Check failure on line 383 in core/services/llo/observation_context_test.go

View workflow job for this annotation

GitHub Actions / lint

G115: integer overflow conversion uint32 -> int32 (gosec)
DotDagSource: fmt.Sprintf(`
// Benchmark Price
result1 [type=memo value="900.0022"];
multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal
result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"];
result2_parse [type=jsonparse path="result" streamID=%d index=1];
result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"];
result3_parse [type=jsonparse path="result"];
multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal
result1 -> multiply2;
result2 -> result2_parse;
result3 -> result3_parse -> multiply3;
`, i+n, i+2*n, i+3*n),
},
}
err := r.Register(jb, nil)
require.NoError(t, err)
}

telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
opts := llo.DSOpts(nil)

// concurrency stress test
g, ctx := errgroup.WithContext(ctx)
for i := uint32(0); i < n; i++ {
for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} {
g.Go(func() error {
// ignore errors, only care about races
oc.Observe(ctx, strmID, opts)

Check failure on line 416 in core/services/llo/observation_context_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `oc.Observe` is not checked (errcheck)
return nil
})
}
}
if err := g.Wait(); err != nil {
t.Fatalf("Observation failed: %v", err)
}
}
4 changes: 3 additions & 1 deletion core/services/pipeline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,9 @@ func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) {

func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars) TaskRunResults {
l := r.lggr.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName)
l.Debug("Initiating tasks for pipeline run of spec")
if r.config.VerboseLogging() {
l.Debug("Initiating tasks for pipeline run of spec")
}

scheduler := newScheduler(pipeline, run, vars, l)
go scheduler.Run()
Expand Down
3 changes: 3 additions & 0 deletions core/services/streams/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R
return nil, errors.New("job has no pipeline spec")
}
spec := *jb.PipelineSpec
spec.JobID = jb.ID
spec.JobName = jb.Name.ValueOrZero()
spec.JobType = string(jb.Type)
if spec.Pipeline == nil {
pipeline, err := spec.ParsePipeline()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
Expand Down

0 comments on commit b47f956

Please sign in to comment.