From b47f956f4d65beeaf00a676b6faed2421303741a Mon Sep 17 00:00:00 2001 From: Sam Davies Date: Wed, 18 Dec 2024 10:05:14 -0500 Subject: [PATCH] Add benchmark --- core/services/llo/observation_context_test.go | 102 ++++++++++++++++-- core/services/pipeline/runner.go | 4 +- core/services/streams/pipeline.go | 3 + go.mod | 1 + 4 files changed, 100 insertions(+), 10 deletions(-) diff --git a/core/services/llo/observation_context_test.go b/core/services/llo/observation_context_test.go index b96383a5796..882661b79af 100644 --- a/core/services/llo/observation_context_test.go +++ b/core/services/llo/observation_context_test.go @@ -14,8 +14,11 @@ import ( "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "gopkg.in/guregu/null.v4" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-data-streams/llo" @@ -23,7 +26,7 @@ import ( clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/null" + clnull "github.com/smartcontractkit/chainlink/v2/core/null" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/streams" @@ -42,7 +45,7 @@ func makePipelineWithMultipleStreamResults(streamIDs []streams.StreamID, results } trrs := make([]pipeline.TaskRunResult, len(streamIDs)) for i, res := range results { - trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: null.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}} + trrs[i] = pipeline.TaskRunResult{Task: &pipeline.MemoTask{BaseTask: pipeline.BaseTask{StreamID: clnull.Uint32From(streamIDs[i])}}, Result: pipeline.Result{Value: res}} } return &mockPipeline{ run: &pipeline.Run{}, @@ -154,7 +157,9 @@ func (m *mockPipelineConfig) DefaultHTTPTimeout() commonconfig.Duration { func (m *mockPipelineConfig) MaxRunDuration() time.Duration { return 1 * time.Hour } func (m *mockPipelineConfig) ReaperInterval() time.Duration { return 0 } func (m *mockPipelineConfig) ReaperThreshold() time.Duration { return 0 } -func (m *mockPipelineConfig) VerboseLogging() bool { return true } + +// func (m *mockPipelineConfig) VerboseLogging() bool { return true } +func (m *mockPipelineConfig) VerboseLogging() bool { return false } type mockBridgeConfig struct{} @@ -165,23 +170,23 @@ func (m *mockBridgeConfig) BridgeCacheTTL() time.Duration { return 0 } -func createBridge(t *testing.T, name string, val string, borm bridges.ORM, maxCalls int) { - callcount := 0 +func createBridge(t testing.TB, name string, val string, borm bridges.ORM, maxCalls int64) { + callcount := atomic.NewInt64(0) bridge := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - callcount++ - if callcount > maxCalls { + n := callcount.Inc() + if maxCalls > 0 && n > maxCalls { panic("too many calls to bridge" + name) } _, herr := io.ReadAll(req.Body) if herr != nil { - t.Fatal(herr) + panic(herr) } res.WriteHeader(http.StatusOK) resp := fmt.Sprintf(`{"result": %s}`, val) _, herr = res.Write([]byte(resp)) if herr != nil { - t.Fatal(herr) + panic(herr) } })) t.Cleanup(bridge.Close) @@ -338,3 +343,82 @@ result3 -> result3_parse -> multiply3; t.Fatalf("Observation failed: %v", err) } } + +func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + bridgesORM := bridges.NewORM(db) + + n := uint32(t.N) + + createBridge(t, "foo-bridge", `123.456`, bridgesORM, 0) + createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 0) + + c := clhttptest.NewTestLocalOnlyHTTPClient() + runner := pipeline.NewRunner( + nil, + bridgesORM, + &mockPipelineConfig{}, + &mockBridgeConfig{}, + nil, + nil, + nil, + lggr, + c, + c, + ) + + r := streams.NewRegistry(lggr, runner) + + for i := uint32(0); i < n; i++ { + jobStreamID := streams.StreamID(i) + + jb := job.Job{ + ID: int32(i), + Name: null.StringFrom(fmt.Sprintf("job-%d", i)), + Type: job.Stream, + StreamID: &jobStreamID, + PipelineSpec: &pipeline.Spec{ + ID: int32(i * 100), + DotDagSource: fmt.Sprintf(` +// Benchmark Price +result1 [type=memo value="900.0022"]; +multiply2 [type=multiply times=1 streamID=%d index=0]; // force conversion to decimal + +result2 [type=bridge name="foo-bridge" requestData="{\"data\":{\"data\":\"foo\"}}"]; +result2_parse [type=jsonparse path="result" streamID=%d index=1]; + +result3 [type=bridge name="bar-bridge" requestData="{\"data\":{\"data\":\"bar\"}}"]; +result3_parse [type=jsonparse path="result"]; +multiply3 [type=multiply times=1 streamID=%d index=2]; // force conversion to decimal + +result1 -> multiply2; +result2 -> result2_parse; +result3 -> result3_parse -> multiply3; +`, i+n, i+2*n, i+3*n), + }, + } + err := r.Register(jb, nil) + require.NoError(t, err) + } + + telem := &mockTelemeter{} + oc := newObservationContext(r, telem) + opts := llo.DSOpts(nil) + + // concurrency stress test + g, ctx := errgroup.WithContext(ctx) + for i := uint32(0); i < n; i++ { + for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} { + g.Go(func() error { + // ignore errors, only care about races + oc.Observe(ctx, strmID, opts) + return nil + }) + } + } + if err := g.Wait(); err != nil { + t.Fatalf("Observation failed: %v", err) + } +} diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index 2194cb8be46..30c7842914d 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -377,7 +377,9 @@ func (r *runner) InitializePipeline(spec Spec) (pipeline *Pipeline, err error) { func (r *runner) run(ctx context.Context, pipeline *Pipeline, run *Run, vars Vars) TaskRunResults { l := r.lggr.With("run.ID", run.ID, "executionID", uuid.New(), "specID", run.PipelineSpecID, "jobID", run.PipelineSpec.JobID, "jobName", run.PipelineSpec.JobName) - l.Debug("Initiating tasks for pipeline run of spec") + if r.config.VerboseLogging() { + l.Debug("Initiating tasks for pipeline run of spec") + } scheduler := newScheduler(pipeline, run, vars, l) go scheduler.Run() diff --git a/core/services/streams/pipeline.go b/core/services/streams/pipeline.go index 9fa6c25d36d..de52fed26e5 100644 --- a/core/services/streams/pipeline.go +++ b/core/services/streams/pipeline.go @@ -44,6 +44,9 @@ func newMultiStreamPipeline(lggr logger.Logger, jb job.Job, runner Runner, rrs R return nil, errors.New("job has no pipeline spec") } spec := *jb.PipelineSpec + spec.JobID = jb.ID + spec.JobName = jb.Name.ValueOrZero() + spec.JobType = string(jb.Type) if spec.Pipeline == nil { pipeline, err := spec.ParsePipeline() if err != nil { diff --git a/go.mod b/go.mod index 1767da64153..94621156009 100644 --- a/go.mod +++ b/go.mod @@ -108,6 +108,7 @@ require ( go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 + go.uber.org/atomic v1.11.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.31.0