Skip to content

Commit

Permalink
cmd: Add apmtelemetrygen binary (#100)
Browse files Browse the repository at this point in the history
Adds a new `apmtelemetrygen` binary that allows sending a known amount
of data (in iterations) to the APM server. This is useful for testing
exact matches on data volume or checking for correctness in the number
of documents and their contents.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored May 30, 2024
1 parent 09430e1 commit 340fae1
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .go-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.20.7
1.22.3
3 changes: 3 additions & 0 deletions Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ COPY ./cmd/apmsoak/scenarios.yml /opt/apm-perf/scenarios.yml
# Copy files for apmbench
COPY --from=builder /opt/apm-perf/dist/apmbench /usr/bin/apmbench

# Copy files for apmtelemetrygen
COPY --from=builder /opt/apm-perf/dist/apmtelemetrygen /usr/bin/apmtelemetrygen

# Default to apmsoak, override to use apmbench
CMD [ "/usr/bin/apmsoak" ]
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ build:
mkdir -p $(DIST_DIR)
go build -ldflags "$(LDFLAGS)" -o $(DIST_DIR)/apmsoak cmd/apmsoak/*.go
go build -ldflags "$(LDFLAGS)" -o $(DIST_DIR)/apmbench cmd/apmbench/*.go
go build -ldflags "$(LDFLAGS)" -o $(DIST_DIR)/apmtelemetrygen cmd/apmtelemetrygen/*.go

.PHONY: test
test: go.mod
Expand Down
190 changes: 190 additions & 0 deletions cmd/apmtelemetrygen/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package main

import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"os"
"os/signal"
"runtime"
"sort"
"strings"
"syscall"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/elastic/apm-perf/internal/loadgen"
"github.com/elastic/apm-perf/internal/version"
)

const envVarPrefix = "ELASTIC_APM_"

func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

// Register root command in cobra
var rootCmd = &cobra.Command{
Use: "apmtelemetrygen",
TraverseChildren: true,
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
var err error
cmd.Flags().VisitAll(func(flag *pflag.Flag) {
optionName := strings.ToUpper(flag.Name)
optionName = strings.ReplaceAll(optionName, "-", "_")
envVar := envVarPrefix + optionName
if val, ok := os.LookupEnv(envVar); !flag.Changed && ok {
if flagErr := flag.Value.Set(val); flagErr != nil {
err = fmt.Errorf("invalid environment variable %s: %w", envVar, flagErr)
}
}
})
return err
},
}
rootCmd.AddCommand(newRunCmd())
rootCmd.AddCommand(&cobra.Command{
Use: "version",
Short: "Show current version info",
Run: func(cmd *cobra.Command, _ []string) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s %s", version.CommitSha(), version.BuildTime())
fmt.Fprintf(cmd.OutOrStdout(), "%s version %s (%s/%s) [%s]\n",
rootCmd.Name(), version.Version, runtime.GOOS, runtime.GOARCH,
buf.String(),
)
},
})

// Execute commands
if err := rootCmd.ExecuteContext(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
fmt.Println(err)
}
}
}

type headersFlag map[string]string

func (f headersFlag) String() string {
keys := make([]string, 0, len(f))
for k := range f {
keys = append(keys, k)
}
sort.Strings(keys)
for i, k := range keys {
keys[i] = fmt.Sprintf("%s=%s", k, f[k])
}
return strings.Join(keys, ",")
}

func (f headersFlag) Set(s string) error {
k, v, ok := strings.Cut(s, "=")
if !ok {
return fmt.Errorf("expected k=v, got %q", s)
}
f[k] = v
return nil
}

func (f headersFlag) Type() string { return "k=v" }

func newRunCmd() *cobra.Command {
options := &runOptions{Headers: make(map[string]string)}
cmd := cobra.Command{
Use: "run",
Short: "Runs the load generator for APM telemetry data",
RunE: func(cmd *cobra.Command, args []string) error {
logger := getLogger(options.Loglevel)
config, err := options.toEventHandlerParams(logger)
if err != nil {
logger.Fatal("Failed to parse flags", zap.Error(err))
}

lg, err := loadgen.NewEventHandler(config)
if err != nil {
logger.Fatal("Failed to create event handler", zap.Error(err))
}

for i := 0; i < options.Iterations; i++ {
if _, err := lg.SendBatches(cmd.Context()); err != nil {
if !options.IgnoreErrors {
return err
}
logger.Error("Failed to send batches", zap.Error(err))
}
}
return nil
},
}
cmd.Flags().Var(headersFlag(options.Headers), "header", "Extra headers to send. Can be specified multiple times")
cmd.Flags().StringVar(&options.ServerURL, "server-url", "", "Server URL (default http://127.0.0.1:8200)")
cmd.Flags().StringVar(&options.SecretToken, "secret-token", "", "Secret token for APM Server. Managed intake service doesn't support secret token")
cmd.Flags().StringVar(&options.APIKey, "api-key", "", "API key to use for authentication")
cmd.Flags().StringVar(&options.Loglevel, "log-level", "info", "Specify the log level to use when running this command. Supported values: debug, info, warn, error")
cmd.Flags().StringVar(&options.Protocol, "protocol", "apm/http", "Specify the protocol to use when sending events. Supported values: apm/http, otlp/http")
cmd.Flags().StringVar(&options.Datatype, "data-type", "any", "Specify the data type to use when sending events. Supported values: any, logs, metrics, traces")
cmd.Flags().StringVar(&options.EventRate, "event-rate", "0/s", "Must be in the format <number of events>/<time>. <time> is parsed")
cmd.Flags().IntVar(&options.Iterations, "iterations", 1, "The number of times to replay the canned data for")
cmd.Flags().BoolVar(&options.IgnoreErrors, "ignore-errors", false, "Ignore HTTP errors while sending events")
return &cmd
}

type runOptions struct {
Headers map[string]string
ServerURL string
SecretToken string
APIKey string
Loglevel string
Protocol string
Datatype string
EventRate string
Iterations int
IgnoreErrors bool
}

func (opts *runOptions) toEventHandlerParams(logger *zap.Logger) (loadgen.EventHandlerParams, error) {
burst, interval, err := loadgen.ParseEventRate(opts.EventRate)
if err != nil {
return loadgen.EventHandlerParams{}, err
}

return loadgen.EventHandlerParams{
Logger: logger,
Path: "apm*.ndjson",
URL: opts.ServerURL,
Token: opts.SecretToken,
APIKey: opts.APIKey,
Headers: opts.Headers,
IgnoreErrors: opts.IgnoreErrors,
Protocol: opts.Protocol,
Datatype: opts.Datatype,
Limiter: loadgen.GetNewLimiter(burst, interval),
Rand: rand.New(rand.NewSource(time.Now().UnixNano())),

RewriteIDs: true,
RewriteTimestamps: true,
}, nil
}

func getLogger(logLevel string) *zap.Logger {
level, err := zapcore.ParseLevel(logLevel)
if err != nil {
level = zap.InfoLevel
}

return zap.New(ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(), os.Stdout, level,
), zap.AddCaller())
}
32 changes: 32 additions & 0 deletions internal/loadgen/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
package loadgen

import (
"fmt"
"strconv"
"strings"
"time"

"golang.org/x/time/rate"
Expand All @@ -23,3 +26,32 @@ func GetNewLimiter(burst int, interval time.Duration) *rate.Limiter {
eps := float64(burst) / interval.Seconds()
return rate.NewLimiter(rate.Limit(eps), burst)
}

// ParseEventRate takes a string in the format of "burst/duration" and returns the burst,
// and the duration, and an error if any. Burst is the number of events and duration is
// the time period in which these events occur. If the string is not in the expected
// format, an error is returned.
func ParseEventRate(eventRate string) (int, time.Duration, error) {
before, after, ok := strings.Cut(eventRate, "/")
if !ok || before == "" || after == "" {
return 0, 0, fmt.Errorf("invalid rate %q, expected format burst/duration", eventRate)
}

burst, err := strconv.Atoi(before)
if err != nil {
return 0, 0, fmt.Errorf("invalid burst %s in event rate: %w", before, err)
}

if !(after[0] >= '0' && after[0] <= '9') {
after = "1" + after
}
interval, err := time.ParseDuration(after)
if err != nil {
return 0, 0, fmt.Errorf("invalid interval %q in event rate: %w", after, err)
}
if interval <= 0 {
return 0, 0, fmt.Errorf("invalid interval %q, must be positive", after)
}

return burst, interval, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package soaktest
package loadgen

import (
"testing"
"time"
)

func Test_getEventRate(t *testing.T) {
func TestParseEventRate(t *testing.T) {
tests := []struct {
name string
eventRate string
Expand Down Expand Up @@ -63,16 +63,16 @@ func Test_getEventRate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1, err := getEventRate(tt.eventRate)
got, got1, err := ParseEventRate(tt.eventRate)
if (err != nil) != tt.wantErr {
t.Errorf("getEventRate() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("ParseEventRate() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.burst {
t.Errorf("getEventRate() got = %v, want %v", got, tt.burst)
t.Errorf("ParseEventRate() got = %v, want %v", got, tt.burst)
}
if got1 != tt.rate {
t.Errorf("getEventRate() got1 = %v, want %v", got1, tt.rate)
t.Errorf("ParseEventRate() got1 = %v, want %v", got1, tt.rate)
}
})
}
Expand Down
29 changes: 1 addition & 28 deletions internal/soaktest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -162,7 +160,7 @@ func getHandlerParams(runnerConfig *RunnerConfig, config ScenarioConfig) (loadge
config.APIKey = runnerConfig.APIKeys[config.ProjectID]
}

burst, interval, err := getEventRate(config.EventRate)
burst, interval, err := loadgen.ParseEventRate(config.EventRate)
if err != nil {
return params, err
}
Expand Down Expand Up @@ -207,28 +205,3 @@ func getHandlerParams(runnerConfig *RunnerConfig, config ScenarioConfig) (loadge

return params, nil
}

func getEventRate(eventRate string) (int, time.Duration, error) {
before, after, ok := strings.Cut(eventRate, "/")
if !ok || before == "" || after == "" {
return 0, 0, fmt.Errorf("invalid rate %q, expected format burst/duration", eventRate)
}

burst, err := strconv.Atoi(before)
if err != nil {
return 0, 0, fmt.Errorf("invalid burst %s in event rate: %w", before, err)
}

if !(after[0] >= '0' && after[0] <= '9') {
after = "1" + after
}
interval, err := time.ParseDuration(after)
if err != nil {
return 0, 0, fmt.Errorf("invalid interval %q in event rate: %w", after, err)
}
if interval <= 0 {
return 0, 0, fmt.Errorf("invalid interval %q, must be positive", after)
}

return burst, interval, nil
}

0 comments on commit 340fae1

Please sign in to comment.