Skip to content

Commit

Permalink
Fix lint
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Dec 19, 2024
1 parent 9120cd9 commit 504102e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 23 deletions.
18 changes: 11 additions & 7 deletions core/services/llo/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/big"
"sync"
"testing"
Expand Down Expand Up @@ -229,7 +230,11 @@ func BenchmarkObserve(b *testing.B) {
db := pgtest.NewSqlxDB(b)
bridgesORM := bridges.NewORM(db)

n := uint32(b.N)
if b.N > math.MaxInt32 {
b.Fatalf("N is too large: %d", b.N)
}

n := uint32(b.N) //nolint:gosec // G115 // overflow impossible

createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0)
createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0)
Expand All @@ -250,15 +255,14 @@ func BenchmarkObserve(b *testing.B) {

r := streams.NewRegistry(lggr, runner)
for i := uint32(0); i < n; i++ {
jobStreamID := streams.StreamID(i)

i := i
jb := job.Job{
ID: int32(i),
ID: int32(i), //nolint:gosec // G115 // overflow impossible
Name: null.StringFrom(fmt.Sprintf("job-%d", i)),
Type: job.Stream,
StreamID: &jobStreamID,
StreamID: &i,
PipelineSpec: &pipeline.Spec{
ID: int32(i * 100),
ID: int32(i * 100), //nolint:gosec // G115 // overflow impossible
DotDagSource: fmt.Sprintf(`
// Benchmark Price
result1 [type=memo value="900.0022"];
Expand All @@ -284,7 +288,7 @@ result3 -> result3_parse -> multiply3;
ds := newDataSource(lggr, r, NullTelemeter)
vals := make(map[llotypes.StreamID]llo.StreamValue)
for i := uint32(0); i < 4*n; i++ {
vals[llotypes.StreamID(i)] = nil
vals[i] = nil
}

b.ResetTimer()
Expand Down
35 changes: 19 additions & 16 deletions core/services/llo/observation_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand/v2"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -274,16 +275,19 @@ result3 -> result3_parse -> multiply3;
})
}

func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(t *testing.B) {
ctx := tests.Context(t)
lggr := logger.TestLogger(t)
db := pgtest.NewSqlxDB(t)
func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStressTest_manyStreams(b *testing.B) {
ctx := tests.Context(b)
lggr := logger.TestLogger(b)
db := pgtest.NewSqlxDB(b)
bridgesORM := bridges.NewORM(db)

n := uint32(t.N)
if b.N > math.MaxInt32 {
b.Fatalf("N is too large: %d", b.N)
}
n := uint32(b.N) //nolint:gosec // G115 // overflow impossible

createBridge(t, "foo-bridge", `123.456`, bridgesORM, 0)
createBridge(t, "bar-bridge", `"124.456"`, bridgesORM, 0)
createBridge(b, "foo-bridge", `123.456`, bridgesORM, 0)
createBridge(b, "bar-bridge", `"124.456"`, bridgesORM, 0)

c := clhttptest.NewTestLocalOnlyHTTPClient()
runner := pipeline.NewRunner(
Expand All @@ -302,15 +306,14 @@ func BenchmarkObservationContext_Observe_integrationRealPipeline_concurrencyStre
r := streams.NewRegistry(lggr, runner)

for i := uint32(0); i < n; i++ {
jobStreamID := streams.StreamID(i)

i := i
jb := job.Job{
ID: int32(i),
ID: int32(i), //nolint:gosec // G115 // overflow impossible
Name: null.StringFrom(fmt.Sprintf("job-%d", i)),
Type: job.Stream,
StreamID: &jobStreamID,
StreamID: &i,
PipelineSpec: &pipeline.Spec{
ID: int32(i * 100),
ID: int32(i * 100), //nolint:gosec // G115 // overflow impossible
DotDagSource: fmt.Sprintf(`
// Benchmark Price
result1 [type=memo value="900.0022"];
Expand All @@ -330,26 +333,26 @@ result3 -> result3_parse -> multiply3;
},
}
err := r.Register(jb, nil)
require.NoError(t, err)
require.NoError(b, err)
}

telem := &mockTelemeter{}
oc := newObservationContext(r, telem)
opts := llo.DSOpts(nil)

// concurrency stress test
t.ResetTimer()
b.ResetTimer()
g, ctx := errgroup.WithContext(ctx)
for i := uint32(0); i < n; i++ {
for _, strmID := range []uint32{i, i + n, i + 2*n, i + 3*n} {
g.Go(func() error {
// ignore errors, only care about races
oc.Observe(ctx, strmID, opts)
oc.Observe(ctx, strmID, opts) //nolint:errcheck // ignore error
return nil
})
}
}
if err := g.Wait(); err != nil {
t.Fatalf("Observation failed: %v", err)
b.Fatalf("Observation failed: %v", err)
}
}

0 comments on commit 504102e

Please sign in to comment.