Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate.Limiter to RunConfiguration #65

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions loadgen/generic_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"go.temporal.io/sdk/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type GenericExecutor struct {
Expand All @@ -25,6 +26,7 @@ type genericRun struct {
info ScenarioInfo
config RunConfiguration
logger *zap.SugaredLogger
limiter *rate.Limiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this repeated here when it's stored on config? In fact, it seems this field is read but never written - I'm not sure anything's actually happening yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤦 You're totally right. This is a leftover from my initial draft hardcoded implementation.

I've fixed it, now the construction like this works as expected:

loadgen.MustRegisterScenario(loadgen.Scenario{
	Description: "Each iteration executes a single workflow with a noop activity.",
	Executor: loadgen.KitchenSinkExecutor{
		DefaultConfiguration: loadgen.RunConfiguration{
			Limiter: rate.NewLimiter(rate.Every(time.Second * 5), 1),
		},
		TestInput: &kitchensink.TestInput{
			WorkflowInput: &kitchensink.WorkflowInput{
				InitialActions: []*kitchensink.ActionSet{
					kitchensink.NoOpSingleActivityActionSet(),
				},
			},
		},
	},
})

// Timer capturing E2E execution of each scenario run iteration.
executeTimer client.MetricsTimer
}
Expand Down Expand Up @@ -104,8 +106,16 @@ func (g *genericRun) Run(ctx context.Context) error {
currentlyRunning++
run := g.info.NewRun(i + 1)
go func() {
startTime := time.Now()
err := g.executor.Execute(ctx, run)
var runStartTime time.Time
err := func() error {
if g.limiter != nil {
if innerErr := g.limiter.Wait(ctx); innerErr != nil {
return innerErr
}
}
runStartTime = time.Now()
return g.executor.Execute(ctx, run)
}()
// Only log/wrap/send to channel if context is not done
if ctx.Err() == nil {
if err != nil {
Expand All @@ -116,7 +126,7 @@ func (g *genericRun) Run(ctx context.Context) error {
case <-ctx.Done():
case doneCh <- err:
// Record/log here, not if it was cut short by context complete
g.executeTimer.Record(time.Since(startTime))
g.executeTimer.Record(time.Since(runStartTime))
}
}
}()
Expand Down
13 changes: 11 additions & 2 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package loadgen
import (
"context"
"fmt"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/temporalio/omes/loadgen/kitchensink"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type Scenario struct {
Expand Down Expand Up @@ -110,6 +111,9 @@ func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int {
const DefaultIterations = 10
const DefaultMaxConcurrent = 10

// DefaultRateLimiter is unlimited
var DefaultRateLimiter = rate.NewLimiter(rate.Inf, 0)

Comment on lines +114 to +116
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// DefaultRateLimiter is unlimited
var DefaultRateLimiter = rate.NewLimiter(rate.Inf, 0)

Seems a nil limiter is supported at runtime

type RunConfiguration struct {
// Number of iterations to run of this scenario (mutually exclusive with Duration).
Iterations int
Expand All @@ -119,6 +123,8 @@ type RunConfiguration struct {
// Maximum number of instances of the Execute method to run concurrently.
// Default is DefaultMaxConcurrent.
MaxConcurrent int
// Rate limiter to be Wait-ed before each iteration.
Limiter *rate.Limiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Limiter *rate.Limiter
Limiter interface { Wait(context.Context) error }

Quite a bit more flexible

}

func (r *RunConfiguration) ApplyDefaults() {
Expand All @@ -128,6 +134,9 @@ func (r *RunConfiguration) ApplyDefaults() {
if r.MaxConcurrent == 0 {
r.MaxConcurrent = DefaultMaxConcurrent
}
if r.Limiter == nil {
r.Limiter = DefaultRateLimiter
}
Comment on lines +137 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if r.Limiter == nil {
r.Limiter = DefaultRateLimiter
}

Seems a nil rate limiter is supported at runtime

}

// Run represents an individual scenario run (many may be in a single instance (of possibly many) of a scenario).
Expand Down