Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Moxy tool to apm-perf #158

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions cmd/moxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package main

import (
"flag"
"fmt"
"net/http"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/elastic/apm-perf/internal/proxy"
)

func main() {
logLevel := zap.LevelFlag(
"loglevel", zapcore.InfoLevel,
"set log level to one of: DEBUG, INFO (default), WARN, ERROR, DPANIC, PANIC, FATAL",
)
username := flag.String("username", "elastic", "authentication username to mimic ES")
password := flag.String("password", "", "authentication username to mimic ES")
port := flag.Int("port", 9200, "http port to listen on")
flag.Parse()
zapcfg := zap.NewProductionConfig()
zapcfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
zapcfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
zapcfg.Encoding = "console"
zapcfg.Level = zap.NewAtomicLevelAt(*logLevel)
logger, err := zapcfg.Build()
if err != nil {
panic(err)
}
defer logger.Sync()
options := []proxy.StubESOption{
proxy.StubESWithLogger(logger),
}
if *username != "" && *password != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a mock. I'd set username and password to a default. Then maybe add an explicit check to ensure both are set. It makes no sense to continue of either are empty. Unless, you want to have auth optional which seems to be the case here. But this weird to me, since it is a mock:

if *username == "" {
  // fatal error
}
if *password == "" {
  // fatal error
}
options = append(options, proxy.StubESWithAuth(*username, *password))

If you want auth to be optional, maybe add a comment why? Maybe for performance testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auth there is just for an extra check that nobody sends unsolicited traffic when it's exposed publicly, otherwise it's optional.

options = append(options, proxy.StubESWithAuth(*username, *password))
}
s := http.Server{
Addr: fmt.Sprintf(":%d", *port),
Handler: proxy.NewHandlerStubES(options...),
}
if err := s.ListenAndServe(); err != nil {
logger.Fatal("listen error", zap.Error(err))
}
}
182 changes: 182 additions & 0 deletions internal/proxy/stub_es.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package proxy contains the code to build http proxy for apm.
package proxy

import (
"bufio"
"bytes"
"encoding/base64"
"fmt"
"io"
"net/http"
"strings"
"sync"

"github.com/klauspost/compress/gzip"
"github.com/klauspost/compress/zstd"
"go.uber.org/zap"
)

const (
defaultLicense = `{"license":{"uid":"cc49813b-6b8e-2138-fbb8-243ae2b3deed","type":"enterprise","status":"active"}}`
defaultInfo = `{
"name": "instance-0000000001",
"cluster_name": "eca3b3c3bbee4816bb92f82184e328dd",
"cluster_uuid": "cc49813b-6b8e-2138-fbb8-243ae2b3deed",
"version": {
"number": "8.15.1",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "253e8544a65ad44581194068936f2a5d57c2c051",
"build_date": "2024-09-02T22:04:47.310170297Z",
"build_snapshot": false,
"lucene_version": "9.11.1",
"minimum_wire_compatibility_version": "7.17.0",
"minimum_index_compatibility_version": "7.0.0"
},
"tagline": "You Know, for Search"
}`
)

var memPool = sync.Pool{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting use of sync pool with buffering. Maybe add a comment or a link to this trick?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm of the opinion to use []byte rather than bytes.Buffer. Either use sync.Pool or bytes.Buffer. Don't merge them:

  1. sync.Pool is already an abstraction. Use the lowest-level "primitive" which is []byte, rather than dealing with a higher-level abstraction like bytes.Buffer.
  2. With []byte you can simple reset the slice length. bytes.Buffer has more internal state to manage.
  3. bytes.Buffer is more efficient to stream data, like appending, since it avoids frequent reallocations. However, in pooling you'd rather want a static "container" that move in-and-out of the pool.
  4. You don't have need to use bytes.Buffer API and all the abstraction overhead that comes with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed separately bytes.Buffer and sync.Pool can compliment each other when reducing GC footprint for frequent allocations where the intermediate memory buffer size is unknown.

New: func() interface{} {
return new(bytes.Buffer)
},
}

type stubES struct {
logger *zap.Logger
auth string
info, license []byte
unknownPathCallback http.HandlerFunc
}

func NewHandlerStubES(options ...StubESOption) http.Handler {
h := new(stubES)
options = append([]StubESOption{
StubESWithLogger(zap.NewNop()),
StubESWithInfo(defaultInfo),
StubESWithLicense(defaultLicense),
StubESWithUnknownPathCallback(func(w http.ResponseWriter, req *http.Request) {
h.logger.Error("unknown path", zap.String("path", req.URL.Path))
}),
}, options...)
for _, opt := range options {
opt(h)
}
return h
}

func (h stubES) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("X-Elastic-Product", "Elasticsearch")
auth, _ := strings.CutPrefix(req.Header.Get("Authorization"), "Basic ")
if len(h.auth) > 0 && string(auth) != h.auth {
h.logger.Error(
"authentication failed",
zap.String("actual", auth),
zap.String("expected", h.auth),
)
w.WriteHeader(http.StatusUnauthorized)
return
}
switch req.URL.Path {
case "/":
_, _ = w.Write(h.info)
return
case "/_license":
_, _ = w.Write(h.license)
return
case "/_bulk":
first := true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move first := true to line 123.

var body io.Reader
switch req.Header.Get("Content-Encoding") {
case "gzip":
r, err := gzip.NewReader(req.Body)
if err != nil {
h.logger.Error("gzip reader err", zap.Error(err))
http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
return
}
defer r.Close()
body = r
case "zstd":
r, err := zstd.NewReader(req.Body)
if err != nil {
h.logger.Error("zstd reader err", zap.Error(err))
http.Error(w, fmt.Sprintf("reader error: %v", err), http.StatusInternalServerError)
return
}
defer r.Close()
body = r
default:
body = req.Body
}

jsonw := memPool.Get().(*bytes.Buffer)
defer func() {
jsonw.Reset()
memPool.Put(jsonw)
}()

_, _ = jsonw.Write([]byte(`{"items":[`))
scanner := bufio.NewScanner(body)
for scanner.Scan() {
// Action is always "create", skip decoding.
if !scanner.Scan() {
h.logger.Error("unexpected payload")
http.Error(w, "expected source", http.StatusBadRequest)
return
}
if first {
first = false
} else {
_ = jsonw.WriteByte(',')
}
jsonw.Write([]byte(`{"create":{"status":201}}`))
}
if err := scanner.Err(); err != nil {
h.logger.Error("scanner error", zap.Error(err))
http.Error(w, fmt.Sprintf("scanner error: %v", err), http.StatusBadRequest)
} else {
jsonw.Write([]byte(`]}`))
_, _ = w.Write(jsonw.Bytes())
}
default:
h.unknownPathCallback(w, req)
}
}

type StubESOption func(*stubES)

func StubESWithLogger(logger *zap.Logger) StubESOption {
return func(h *stubES) {
h.logger = logger
}
}

func StubESWithAuth(username, password string) StubESOption {
return func(h *stubES) {
h.auth = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", username, password)))
}
}

func StubESWithInfo(info string) StubESOption {
return func(h *stubES) {
h.info = []byte(info)
}
}

func StubESWithLicense(license string) StubESOption {
return func(h *stubES) {
h.license = []byte(license)
}
}

func StubESWithUnknownPathCallback(callback http.HandlerFunc) StubESOption {
return func(h *stubES) {
h.unknownPathCallback = callback
}
}
160 changes: 160 additions & 0 deletions internal/proxy/stub_es_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package proxy

import (
"bytes"
"compress/gzip"
"encoding/base64"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/klauspost/compress/zstd"
"github.com/stretchr/testify/assert"
)

func TestHandlerStubES(t *testing.T) {
tests := map[string]struct {
method string
path string
payload io.Reader
headers http.Header
options []StubESOption
expectedCode int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rename this to expectedStatusCode

expectedBody string
}{
"path /": {
method: "GET",
path: "/",
expectedCode: http.StatusOK,
expectedBody: defaultInfo,
},
"path /_license": {
method: "GET",
path: "/_license",
expectedCode: http.StatusOK,
expectedBody: defaultLicense,
},
"path /_bulk fail auth": {
method: "POST",
path: "/_bulk",
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedCode: http.StatusUnauthorized,
},
"path /_bulk empty payload": {
method: "POST",
path: "/_bulk",
headers: http.Header{
"Authorization": []string{
"Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")),
},
},
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedBody: `{"items":[]}`,
expectedCode: http.StatusOK,
},
"path /_bulk invalid payload": {
method: "POST",
path: "/_bulk",
payload: strings.NewReader(`
....
....
....
`),
headers: http.Header{
"Authorization": []string{
"Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")),
},
},
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedBody: "expected source\n",
expectedCode: http.StatusBadRequest,
},
"path /_bulk valid payload": {
method: "POST",
path: "/_bulk",
payload: strings.NewReader(`
{ "create" : { "_index" : "test", "_id" : "1" } }
{ "create" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }

`),
headers: http.Header{
"Authorization": []string{
"Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")),
},
},
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedBody: `{"items":[{"create":{"status":201}},{"create":{"status":201}},{"create":{"status":201}}]}`,
expectedCode: http.StatusOK,
},
"path /_bulk valid gzip payload": {
method: "POST",
path: "/_bulk",
payload: compress(strings.NewReader(`
{ "create" : { "_index" : "test", "_id" : "1" } }
{ "create" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }

`), func(w io.Writer) io.WriteCloser {
return gzip.NewWriter(w)
}),
headers: http.Header{
"Authorization": []string{
"Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")),
},
"Content-Encoding": []string{"gzip"},
},
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedBody: `{"items":[{"create":{"status":201}},{"create":{"status":201}},{"create":{"status":201}}]}`,
expectedCode: http.StatusOK,
},
"path /_bulk valid zstd payload": {
method: "POST",
path: "/_bulk",
payload: compress(strings.NewReader(`
{ "create" : { "_index" : "test", "_id" : "1" } }
{ "create" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }

`), func(w io.Writer) io.WriteCloser {
wc, _ := zstd.NewWriter(w)
return wc
}),
headers: http.Header{
"Authorization": []string{
"Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")),
},
"Content-Encoding": []string{"zstd"},
},
options: []StubESOption{StubESWithAuth("user", "pass")},
expectedBody: `{"items":[{"create":{"status":201}},{"create":{"status":201}},{"create":{"status":201}}]}`,
expectedCode: http.StatusOK,
},
}
for tname, tcase := range tests {
t.Run(tname, func(t *testing.T) {
req := httptest.NewRequest(tcase.method, tcase.path, tcase.payload)
req.Header = tcase.headers
rec := httptest.NewRecorder()
h := NewHandlerStubES(tcase.options...)
h.ServeHTTP(rec, req)
assert.Equal(t, "Elasticsearch", rec.Header().Get("X-Elastic-Product"))
assert.Equal(t, tcase.expectedCode, rec.Code)
assert.Equal(t, tcase.expectedBody, rec.Body.String())
})
}
}

func compress(from io.Reader, f func(out io.Writer) io.WriteCloser) io.Reader {
var b bytes.Buffer
enc := f(&b)
defer enc.Close()
_, _ = io.Copy(enc, from)
return &b
}