Skip to content

Commit

Permalink
Remove apm-data decoder pooling
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop committed Aug 13, 2024
1 parent ab5a9d8 commit 606258d
Show file tree
Hide file tree
Showing 11 changed files with 9 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/http"
"net/textproto"
"strings"
"sync"
"time"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
Expand All @@ -33,55 +32,9 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

var (
errorRootPool = sync.Pool{
New: func() interface{} {
return &errorRoot{}
},
}
metadataRootPool = sync.Pool{
New: func() interface{} {
return &metadataRoot{}
},
}
transactionRootPool = sync.Pool{
New: func() interface{} {
return &transactionRoot{}
},
}
)

func fetchErrorRoot() *errorRoot {
return errorRootPool.Get().(*errorRoot)
}

func releaseErrorRoot(root *errorRoot) {
root.Reset()
errorRootPool.Put(root)
}

func fetchMetadataRoot() *metadataRoot {
return metadataRootPool.Get().(*metadataRoot)
}

func releaseMetadataRoot(m *metadataRoot) {
m.Reset()
metadataRootPool.Put(m)
}

func fetchTransactionRoot() *transactionRoot {
return transactionRootPool.Get().(*transactionRoot)
}

func releaseTransactionRoot(m *transactionRoot) {
m.Reset()
transactionRootPool.Put(m)
}

// DecodeNestedMetadata decodes metadata from d, updating out.
func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error {
root := fetchMetadataRoot()
defer releaseMetadataRoot(root)
root := &metadataRoot{}
if err := d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
}
Expand All @@ -96,8 +49,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error {
//
// DecodeNestedError should be used when the stream in the decoder contains the `error` key
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchErrorRoot()
defer releaseErrorRoot(root)
root := &errorRoot{}
if err := d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
}
Expand All @@ -115,8 +67,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode
//
// DecodeNestedTransaction should be used when the decoder contains the `transaction` key
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchTransactionRoot()
defer releaseTransactionRoot(root)
root := &transactionRoot{}
if err := d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetErrorOnRelease(t *testing.T) {
inp := `{"e":{"id":"tr-a"}}`
root := fetchErrorRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseErrorRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := modelpb.FromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,6 @@ func metadataExceptions(keys ...string) func(key string) bool {
}
}

func TestMetadataResetModelOnRelease(t *testing.T) {
inp := `{"m":{"se":{"n":"service-a"}}}`
m := fetchMetadataRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m))
require.True(t, m.IsSet())
releaseMetadataRoot(m)
assert.False(t, m.IsSet())
}

func TestDecodeNestedMetadata(t *testing.T) {
t.Run("decode", func(t *testing.T) {
var out modelpb.APMEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetTransactionOnRelease(t *testing.T) {
inp := `{"x":{"n":"tr-a"}}`
tr := fetchTransactionRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(tr))
require.True(t, tr.IsSet())
releaseTransactionRoot(tr)
assert.False(t, tr.IsSet())
}

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := modelpb.FromTime(time.Now())
Expand Down
106 changes: 6 additions & 100 deletions copy/apm-data/input/elasticapm/internal/modeldecoder/v2/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/elastic/apm-data/input/elasticapm/internal/decoder"
Expand All @@ -41,39 +40,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
errorRootPool = sync.Pool{
New: func() interface{} {
return &errorRoot{}
},
}
metadataRootPool = sync.Pool{
New: func() interface{} {
return &metadataRoot{}
},
}
metricsetRootPool = sync.Pool{
New: func() interface{} {
return &metricsetRoot{}
},
}
spanRootPool = sync.Pool{
New: func() interface{} {
return &spanRoot{}
},
}
transactionRootPool = sync.Pool{
New: func() interface{} {
return &transactionRoot{}
},
}
logRootPool = sync.Pool{
New: func() interface{} {
return &logRoot{}
},
}
)

var compressionStrategyText = map[string]modelpb.CompressionStrategy{
"exact_match": modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH,
"same_kind": modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND,
Expand All @@ -93,60 +59,6 @@ var (
reForServiceTargetExpr = regexp.MustCompile(`^([a-z0-9]+)(?:/(\w+))?$`)
)

func fetchErrorRoot() *errorRoot {
return errorRootPool.Get().(*errorRoot)
}

func releaseErrorRoot(root *errorRoot) {
root.Reset()
errorRootPool.Put(root)
}

func fetchMetadataRoot() *metadataRoot {
return metadataRootPool.Get().(*metadataRoot)
}

func releaseMetadataRoot(root *metadataRoot) {
root.Reset()
metadataRootPool.Put(root)
}

func fetchMetricsetRoot() *metricsetRoot {
return metricsetRootPool.Get().(*metricsetRoot)
}

func releaseMetricsetRoot(root *metricsetRoot) {
root.Reset()
metricsetRootPool.Put(root)
}

func fetchSpanRoot() *spanRoot {
return spanRootPool.Get().(*spanRoot)
}

func releaseSpanRoot(root *spanRoot) {
root.Reset()
spanRootPool.Put(root)
}

func fetchTransactionRoot() *transactionRoot {
return transactionRootPool.Get().(*transactionRoot)
}

func releaseTransactionRoot(root *transactionRoot) {
root.Reset()
transactionRootPool.Put(root)
}

func fetchLogRoot() *logRoot {
return logRootPool.Get().(*logRoot)
}

func releaseLogRoot(root *logRoot) {
root.Reset()
logRootPool.Put(root)
}

// DecodeMetadata decodes metadata from d, updating out.
//
// DecodeMetadata should be used when the the stream in the decoder does not contain the
Expand All @@ -166,8 +78,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error {
//
// DecodeNestedError should be used when the stream in the decoder contains the `error` key
func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchErrorRoot()
defer releaseErrorRoot(root)
root := &errorRoot{}
err := d.Decode(root)
if err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand All @@ -185,8 +96,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode
//
// DecodeNestedMetricset should be used when the stream in the decoder contains the `metricset` key
func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchMetricsetRoot()
defer releaseMetricsetRoot(root)
root := &metricsetRoot{}
var err error
if err = d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand All @@ -205,8 +115,7 @@ func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch *
//
// DecodeNestedSpan should be used when the stream in the decoder contains the `span` key
func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchSpanRoot()
defer releaseSpanRoot(root)
root := &spanRoot{}
var err error
if err = d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand All @@ -224,8 +133,7 @@ func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, batch *model
//
// DecodeNestedTransaction should be used when the stream in the decoder contains the `transaction` key
func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchTransactionRoot()
defer releaseTransactionRoot(root)
root := &transactionRoot{}
var err error
if err = d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand All @@ -243,8 +151,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch
//
// DecodeNestedLog should be used when the stream in the decoder contains the `log` key
func DecodeNestedLog(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error {
root := fetchLogRoot()
defer releaseLogRoot(root)
root := &logRoot{}
var err error
if err = d.Decode(root); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand All @@ -263,8 +170,7 @@ func DecodeNestedLog(d decoder.Decoder, input *modeldecoder.Input, batch *modelp
}

func decodeMetadata(decFn func(d decoder.Decoder, m *metadataRoot) error, d decoder.Decoder, out *modelpb.APMEvent) error {
m := fetchMetadataRoot()
defer releaseMetadataRoot(m)
m := &metadataRoot{}
var err error
if err = decFn(d, m); err != nil && err != io.EOF {
return modeldecoder.NewDecoderErrFromJSONIter(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetErrorOnRelease(t *testing.T) {
inp := `{"error":{"id":"tr-a"}}`
root := fetchErrorRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseErrorRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedError(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := modelpb.FromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetLogOnRelease(t *testing.T) {
input := `{"log":{"message":"something happened"}}`
root := fetchLogRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(input)).Decode(root))
require.True(t, root.IsSet())
releaseLogRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedLog(t *testing.T) {
t.Run("decode", func(t *testing.T) {
t.Run("withTimestamp", func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,6 @@ func initializedInputMetadata(values *modeldecodertest.Values) (metadata, *model
return input, &out
}

func TestResetMetadataOnRelease(t *testing.T) {
inp := `{"metadata":{"service":{"name":"service-a"}}}`
m := fetchMetadataRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m))
require.True(t, m.IsSet())
releaseMetadataRoot(m)
assert.False(t, m.IsSet())
}

func TestDecodeMetadata(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetMetricsetOnRelease(t *testing.T) {
inp := `{"metricset":{"samples":{"a.b.":{"value":2048}}}}`
root := fetchMetricsetRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseMetricsetRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedMetricset(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := modelpb.FromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetSpanOnRelease(t *testing.T) {
inp := `{"span":{"name":"tr-a"}}`
root := fetchSpanRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseSpanRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedSpan(t *testing.T) {
t.Run("decode", func(t *testing.T) {
defaultVal := modeldecodertest.DefaultValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ import (
"github.com/elastic/apm-data/model/modelpb"
)

func TestResetTransactionOnRelease(t *testing.T) {
inp := `{"transaction":{"name":"tr-a"}}`
root := fetchTransactionRoot()
require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root))
require.True(t, root.IsSet())
releaseTransactionRoot(root)
assert.False(t, root.IsSet())
}

func TestDecodeNestedTransaction(t *testing.T) {
t.Run("decode", func(t *testing.T) {
now := modelpb.FromTime(time.Now())
Expand Down

0 comments on commit 606258d

Please sign in to comment.