Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add telemetrygen public package #197

Merged
merged 4 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/telemetrygen/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package telemetrygen

import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
)

func DefaultConfig() Config {
return Config{
// default to run 1 replica of 4 agents (one per type).
AgentReplicas: 1,
// default to expecting a valid TLS certificate.
Secure: true,
// default to 10 events per second
EventRate: RateFlag{Burst: 10, Interval: 1 * time.Second},
// default to rewrite ids and timestamp to have new events recorded at the current time.
RewriteIDs: true,
RewriteTimestamps: true,
}
}

type Config struct {
// number of agents replicas to use, each replica launches 4 agents, one for each type
AgentReplicas int

ServerURL *url.URL
APIKey string
Headers map[string]string
Secure bool
EventRate RateFlag
IgnoreErrors bool

RewriteIDs bool
RewriteTimestamps bool
RewriteServiceNames bool
RewriteServiceNodeNames bool
RewriteServiceTargetNames bool
RewriteSpanNames bool
RewriteTransactionNames bool
RewriteTransactionTypes bool
}

func (c Config) Validate() error {
errs := []error{}

if c.ServerURL == nil {
errs = append(errs, fmt.Errorf("ServerURL is required"))
}

return errors.Join(errs...)
}

type RateFlag struct {
Burst int
Interval time.Duration
}

func (f *RateFlag) String() string {
return fmt.Sprintf("%d/%s", f.Burst, f.Interval)
}

func (f *RateFlag) Set(s string) error {
before, after, ok := strings.Cut(s, "/")
if !ok || before == "" || after == "" {
return fmt.Errorf("invalid rate %q, expected format burst/duration", s)
}

burst, err := strconv.Atoi(before)
if err != nil {
return fmt.Errorf("invalid burst %s in event rate: %w", before, err)
}

if !(after[0] >= '0' && after[0] <= '9') {
after = "1" + after
}
interval, err := time.ParseDuration(after)
if err != nil {
return fmt.Errorf("invalid interval %q in event rate: %w", after, err)
}
if interval <= 0 {
return fmt.Errorf("invalid interval %q, must be positive", after)
}

f.Burst = burst
f.Interval = interval
return nil
}
10 changes: 10 additions & 0 deletions pkg/telemetrygen/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package telemetrygen provides an easy way to generate
// dummy APM events from a corpus of canned events.
// Is a public package that is expected to be used outside
// of this module, as such backward compatibility rules
// apply.
package telemetrygen
86 changes: 86 additions & 0 deletions pkg/telemetrygen/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package telemetrygen

import (
"context"
cryptorand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"

"go.uber.org/zap"

"github.com/elastic/apm-perf/internal/loadgen"

"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
)

type Generator struct {
Config Config
Logger *zap.Logger
}

func New(cfg Config) (*Generator, error) {
if err := cfg.Validate(); err != nil {
return &Generator{}, fmt.Errorf("cannot create generator, configuration is invalid: %w", err)
}

return &Generator{Config: cfg, Logger: zap.NewNop()}, nil
}

func (g *Generator) RunBlocking(ctx context.Context) error {
limiter := loadgen.GetNewLimiter(g.Config.EventRate.Burst, g.Config.EventRate.Interval)
errg, gCtx := errgroup.WithContext(ctx)

var rngseed int64
err := binary.Read(cryptorand.Reader, binary.LittleEndian, &rngseed)
if err != nil {
return fmt.Errorf("failed to generate seed for math/rand: %w", err)
}
Comment on lines +40 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: in recent go versions the math/rand is already seeded with a random number, can we remove this and maybe use rand/v2 ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I extracted this code from APM Server systemtest/apmsoak. There is a comment there about this seed:

Create a Rand with the same seed for each agent, so we randomise their IDs consistently.

My understanding is that we want to use the same seed across all agents, so we could move to rand/v2 but we would still want to init a shared seed for all the agents. What do you think? I'm not 100% sure my understanding is still valid as that apmsoak package was quite old.


for i := 0; i < g.Config.AgentReplicas; i++ {
for _, expr := range []string{`apm-go*.ndjson`, `apm-nodejs*.ndjson`, `apm-python*.ndjson`, `apm-ruby*.ndjson`} {
expr := expr
errg.Go(func() error {
rng := rand.New(rand.NewSource(rngseed))
return runAgent(gCtx, g.Logger, expr, limiter, rng, g.Config)
})
}
}

return errg.Wait()
}

func runAgent(ctx context.Context, l *zap.Logger, expr string, limiter *rate.Limiter, rng *rand.Rand, cfg Config) error {
handler, err := loadgen.NewEventHandler(loadgen.EventHandlerParams{
Logger: l,
URL: cfg.ServerURL.String(),
Path: expr,
APIKey: cfg.APIKey,
Limiter: limiter,
Rand: rng,
RewriteIDs: cfg.RewriteIDs,
RewriteServiceNames: cfg.RewriteServiceNames,
RewriteServiceNodeNames: cfg.RewriteServiceNodeNames,
RewriteServiceTargetNames: cfg.RewriteServiceTargetNames,
RewriteSpanNames: cfg.RewriteSpanNames,
RewriteTransactionNames: cfg.RewriteTransactionNames,
RewriteTransactionTypes: cfg.RewriteTransactionTypes,
RewriteTimestamps: cfg.RewriteTimestamps,
Headers: cfg.Headers,
Protocol: "apm/http",
})
if err != nil {
return err
}

if _, err := handler.SendBatches(ctx); err != nil {
return err
}

return nil
}
55 changes: 55 additions & 0 deletions pkg/telemetrygen/generator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package telemetrygen_test

import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/apm-perf/pkg/telemetrygen"
)

func TestGeneration(t *testing.T) {
srv, m := newTestServer(t)

cfg := telemetrygen.DefaultConfig()
cfg.Secure = false

u, err := url.Parse(srv.URL)
require.NoError(t, err)
cfg.ServerURL = u

cfg.EventRate.Set("1000/s")
g, err := telemetrygen.New(cfg)
// g.Logger = zap.Must(zap.NewDevelopment())
require.NoError(t, err)

err = g.RunBlocking(context.Background())
require.NoError(t, err)
require.Greater(t, m.Load(), int32(0))
}

func newTestServer(t *testing.T) (*httptest.Server, *atomic.Int32) {
t.Helper()

mux := http.NewServeMux()
requestsReceived := &atomic.Int32{}

mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
requestsReceived.Add(1)
w.Write([]byte("ok"))
})

srv := httptest.NewServer(mux)
t.Cleanup(srv.Close)

return srv, requestsReceived
}
Loading