Skip to content

Commit

Permalink
Separate otel collector from apmbench
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Oct 27, 2023
1 parent 3f6e5c0 commit e6a8da1
Show file tree
Hide file tree
Showing 20 changed files with 664 additions and 1,532 deletions.
3 changes: 2 additions & 1 deletion cmd/apmbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

APMBench allows running a set of benchmarks against APM supported endpoints. It
also allows collection of OTEL metrics from the servers implementing APM endpoints
and report it as benchmarking results.
and report it as benchmarking results. This is possible by using otel collector
with a custom [in-memory exporter](../otelinmemexporter).
8 changes: 3 additions & 5 deletions cmd/apmbench/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ var cfg struct {
Benchtime time.Duration
RunRE *regexp.Regexp
// Sorted list of agents count to be used for benchmarking
AgentsList []int
CollectorConfigYaml string
ServerMode bool
AgentsList []int
BenchmarkTelemetryEndpoint string
}

func init() {
Expand Down Expand Up @@ -58,6 +57,5 @@ func init() {
return nil
},
)
flag.StringVar(&cfg.CollectorConfigYaml, "collector-config-yaml", "", "configuration for otel collector")
flag.BoolVar(&cfg.ServerMode, "server-mode", false, "continue running otel collector post benchmark run")
flag.StringVar(&cfg.BenchmarkTelemetryEndpoint, "benchmark-telemetry-endpoint", "", "Telemetry endpoint that exposed benchmark telemetry data with reset capabilities")
}
75 changes: 12 additions & 63 deletions cmd/apmbench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,11 @@
package main

import (
"context"
"errors"
"flag"
"log"
"os"
"os/signal"
"sync"
"testing"
"time"

"go.uber.org/zap"

"github.com/elastic/apm-perf/internal/otelcollector"
)

func main() {
Expand All @@ -28,61 +20,26 @@ func main() {
log.Fatalf("failed to setup logger: %v", err)
}

// Create otel collector
collectorCfg := otelcollector.DefaultConfig()
if cfg.CollectorConfigYaml != "" {
err := collectorCfg.LoadConfigFromYamlFile(cfg.CollectorConfigYaml)
// Run benchmarks
telemetry := telemetry{endpoint: cfg.BenchmarkTelemetryEndpoint}
extraMetrics := func(b *testing.B) {
m, err := telemetry.GetAll()
if err != nil {
logger.Fatal("failed to load collector config", zap.Error(err))
logger.Warn("failed to retrive benchmark metrics", zap.Error(err))
return
}
}
collector, err := otelcollector.New(collectorCfg, logger)
if err != nil {
logger.Fatal("failed to create a new collector", zap.Error(err))
}
logger.Info("loaded collector configuration", zap.Object("config", &collectorCfg))

// Start otel collector
var wg sync.WaitGroup
defer wg.Wait()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()

logger.Info("starting otel collector...")
if err := collector.Run(ctx); err != nil {
logger.Fatal("failed to run collector", zap.Error(err))
for unit, val := range m {
b.ReportMetric(val, unit)
}
}(ctx)

// Wait for otel collector to be ready
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := collector.Wait(ctx); err != nil {
logger.Fatal("failed to start collector", zap.Error(err))
}

// Run benchmarks
extraMetrics := func(b *testing.B) error {
var errs []error
for _, cfg := range collectorCfg.InMemoryStoreConfig {
m, err := collector.GetAggregatedMetric(cfg)
if err != nil {
errs = append(errs, err)
continue
}
b.ReportMetric(m, cfg.Alias)
resetStoreFunc := func() {
if err := telemetry.Reset(); err != nil {
logger.Warn("failed to reset store, benchmark report may be corrupted", zap.Error(err))
}
if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}
if err := Run(
extraMetrics,
collector.Reset,
resetStoreFunc,
Benchmark1000Transactions,
BenchmarkOTLPTraces,
BenchmarkAgentAll,
Expand All @@ -95,14 +52,6 @@ func main() {
logger.Fatal("failed to run benchmarks", zap.Error(err))
}
logger.Info("finished running benchmarks")

// If server-mode is enabled then keep the otel collector running
if cfg.ServerMode {
logger.Info("continuing to serve OTEL collector endpoints")
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
}
}

func init() {
Expand Down
9 changes: 4 additions & 5 deletions cmd/apmbench/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type result struct {

// Run runs all the given BenchmarkFunc.
func Run(
extraMetrics func(*testing.B) error,
extraMetrics func(*testing.B),
resetStore func(),
fns ...BenchmarkFunc,
) error {
Expand Down Expand Up @@ -77,7 +77,8 @@ func Run(
for _, b := range benchmarks {
name := fullBenchmarkName(b.name, agents)
for i := 0; i < int(cfg.Count); i++ {
result := runOne(extraMetrics, resetStore, b.fn)
resetStore() // reset the metric store before starting any benchmark
result := runOne(extraMetrics, b.fn)
// testing.Benchmark discards all output so the only thing we can
// retrive is the benchmark status and result.
if result.skipped {
Expand All @@ -103,11 +104,9 @@ func Run(
}

func runOne(
extraMetrics func(*testing.B) error,
resetStore func(),
extraMetrics func(*testing.B),
fn BenchmarkFunc,
) (result result) {
defer resetStore()
limiter := loadgen.GetNewLimiter(
loadgencfg.Config.EventRate.Burst,
loadgencfg.Config.EventRate.Interval,
Expand Down
48 changes: 48 additions & 0 deletions cmd/apmbench/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package main

import (
"encoding/json"
"errors"
"fmt"
"net/http"
)

type telemetry struct {
endpoint string
}

func (t telemetry) GetAll() (map[string]float64, error) {
resp, err := http.Get(t.endpoint + "/")
if err != nil {
return nil, fmt.Errorf("failed to get telemetry data: %w", err)
}
defer resp.Body.Close()

switch resp.StatusCode / 100 {
case 2:
m := make(map[string]float64)
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, fmt.Errorf("failed to decode response body for getting telemetry data: %w", err)
}
return m, nil
default:
return nil, fmt.Errorf("unsuccessful response from benchmark telemetry server: %d", resp.StatusCode)
}
}

func (t telemetry) Reset() error {
resp, err := http.Post(t.endpoint+"/reset", "application/json", nil)
if err != nil {
return errors.New("failed to reset telemetry")
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return errors.New("failed to reset telemetry")
}
return nil
}
31 changes: 31 additions & 0 deletions cmd/otelinmemexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# OTEL in-memory exporter

Aggregates collected metrics as per the configured aggregation type and exposes them via a HTTP endpoint. Current supported aggregation types are `last`, `sum`, and `rate`.

## Usage

The exporter is built to aid collection of metrics for benchmarking using [apmbench](../apmbench). It can be used to build OpenTelemetry collector using [OpenTelemetry Collector Builder (ocb)](https://pkg.go.dev/go.opentelemetry.io/collector/cmd/builder#section-readme). Example configurartion file for the builder:

```yaml
dist:
module: opentelemetry-collector
name: otel
description: Test otel collector with in-memory exporter.
output_path: ./generated
otelcol_version: 0.88.0

exporters:
- gomod: github.com/elastic/apm-perf/cmd/otelinmemexporter v0.0.0-00010101000000-000000000000

processors:
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.88.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.88.0

receivers:
- gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0

```

The above configuration file can be used to generate code for OpenTelemetry Collector using `builder --skip-compilation --config=ocb-config.yaml`.

NOTE: The in-memory exporter should be used for a single benchmark at a time to avoid conflicts in collected metrics.
35 changes: 35 additions & 0 deletions cmd/otelinmemexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package otelinmemexporter contains code for creating an in-memory OTEL exporter.
package otelinmemexporter

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
)

type serverConfig struct {
Endpoint string `mapstructure:"endpoint"`
}

type Config struct {
Aggregations []AggregationConfig `mapstructure:"aggregations"`
Server serverConfig `mapstructure:"server"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the exporter configuration is valid
func (cfg *Config) Validate() error {
if _, err := validateAggregationConfig(cfg.Aggregations); err != nil {
return fmt.Errorf("failed to validate aggregation config: %w", err)
}
if cfg.Server.Endpoint == "" {
return errors.New("failed to validate server config: address cannot be empty")
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package inmemexporter contains code for creating an in-memory OTEL exporter.
package inmemexporter
// Package otelinmemexporter contains code for creating an in-memory OTEL exporter.
package otelinmemexporter

import (
"context"
Expand All @@ -15,8 +15,6 @@ import (

const componentID = "inmem"

type Config struct{}

type inMemExporter struct {
cfg Config
store *Store
Expand Down
60 changes: 60 additions & 0 deletions cmd/otelinmemexporter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package otelinmemexporter

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func NewFactory() exporter.Factory {
return exporter.NewFactory(
componentID,
createDefaultConfig,
exporter.WithMetrics(
createMetricsExporter,
component.StabilityLevelDevelopment,
),
)
}

func createDefaultConfig() component.Config {
return &Config{
Server: serverConfig{Endpoint: ":8081"},
}
}

func createMetricsExporter(
ctx context.Context,
settings exporter.CreateSettings,
rawCfg component.Config,
) (exporter.Metrics, error) {
cfg := rawCfg.(*Config)
logger := settings.TelemetrySettings.Logger

// create in memory metrics store
store, err := NewStore(cfg.Aggregations, logger)
if err != nil {
return nil, fmt.Errorf("failed to create in-memory metrics store: %w", err)
}
// Start http server
newServer(store, cfg.Server.Endpoint, logger).Start()

exp := new(*cfg, store, logger)
return exporterhelper.NewMetricsExporter(
ctx, settings, cfg,
exp.consumeMetrics,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
)
}
Loading

0 comments on commit e6a8da1

Please sign in to comment.