Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate.Limiter to RunConfiguration #65

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions loadgen/generic_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (g *GenericExecutor) newRun(info ScenarioInfo) (*genericRun, error) {
if run.config.MaxConcurrent == 0 {
run.config.MaxConcurrent = g.DefaultConfiguration.MaxConcurrent
}
if run.config.Limiter == nil {
run.config.Limiter = g.DefaultConfiguration.Limiter
}
Comment on lines +57 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if run.config.Limiter == nil {
run.config.Limiter = g.DefaultConfiguration.Limiter
}

Seems a nil limiter is supported at runtime

run.config.ApplyDefaults()
if run.config.Iterations > 0 && run.config.Duration > 0 {
return nil, fmt.Errorf("invalid scenario: iterations and duration are mutually exclusive")
Expand Down Expand Up @@ -104,8 +107,16 @@ func (g *genericRun) Run(ctx context.Context) error {
currentlyRunning++
run := g.info.NewRun(i + 1)
go func() {
startTime := time.Now()
err := g.executor.Execute(ctx, run)
var runStartTime time.Time
err := func() error {
if g.config.Limiter != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be worked into the part above this goroutine where we do waiting? Seeing max-concurrent, we have traditionally waited in the synchronous loop before instantiating the run.

if innerErr := g.config.Limiter.Wait(ctx); innerErr != nil {
return innerErr
}
}
runStartTime = time.Now()
return g.executor.Execute(ctx, run)
}()
Comment on lines +110 to +119
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var runStartTime time.Time
err := func() error {
if g.config.Limiter != nil {
if innerErr := g.config.Limiter.Wait(ctx); innerErr != nil {
return innerErr
}
}
runStartTime = time.Now()
return g.executor.Execute(ctx, run)
}()
runStartTime := time.Now()
var err error
if g.config.Limiter != nil {
err = g.config.Limiter.Wait(ctx)
}
if err == nil {
err = g.executor.Execute(ctx, run)
}

Doesn't seem to be much benefit in the inner function. I don't think we have to overly worry about err reuse.

// Only log/wrap/send to channel if context is not done
if ctx.Err() == nil {
if err != nil {
Expand All @@ -116,7 +127,7 @@ func (g *genericRun) Run(ctx context.Context) error {
case <-ctx.Done():
case doneCh <- err:
// Record/log here, not if it was cut short by context complete
g.executeTimer.Record(time.Since(startTime))
g.executeTimer.Record(time.Since(runStartTime))
}
}
}()
Expand Down
13 changes: 11 additions & 2 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package loadgen
import (
"context"
"fmt"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/temporalio/omes/loadgen/kitchensink"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/sdk/client"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

type Scenario struct {
Expand Down Expand Up @@ -110,6 +111,9 @@ func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int {
const DefaultIterations = 10
const DefaultMaxConcurrent = 10

// DefaultRateLimiter is unlimited
var DefaultRateLimiter = rate.NewLimiter(rate.Inf, 0)

Comment on lines +114 to +116
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// DefaultRateLimiter is unlimited
var DefaultRateLimiter = rate.NewLimiter(rate.Inf, 0)

Seems a nil limiter is supported at runtime

type RunConfiguration struct {
// Number of iterations to run of this scenario (mutually exclusive with Duration).
Iterations int
Expand All @@ -119,6 +123,8 @@ type RunConfiguration struct {
// Maximum number of instances of the Execute method to run concurrently.
// Default is DefaultMaxConcurrent.
MaxConcurrent int
// Rate limiter to be Wait-ed before each iteration.
Limiter *rate.Limiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Limiter *rate.Limiter
Limiter interface { Wait(context.Context) error }

Quite a bit more flexible

}

func (r *RunConfiguration) ApplyDefaults() {
Expand All @@ -128,6 +134,9 @@ func (r *RunConfiguration) ApplyDefaults() {
if r.MaxConcurrent == 0 {
r.MaxConcurrent = DefaultMaxConcurrent
}
if r.Limiter == nil {
r.Limiter = DefaultRateLimiter
}
Comment on lines +137 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if r.Limiter == nil {
r.Limiter = DefaultRateLimiter
}

Seems a nil rate limiter is supported at runtime

}

// Run represents an individual scenario run (many may be in a single instance (of possibly many) of a scenario).
Expand Down
Loading