From 606258df95f7ada98ced8886f5eced1d7ab2f224 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Tue, 13 Aug 2024 08:52:56 +0800 Subject: [PATCH] Remove apm-data decoder pooling Signed-off-by: Marc Lopez Rubio --- .../internal/modeldecoder/rumv3/decoder.go | 55 +-------- .../internal/modeldecoder/rumv3/error_test.go | 9 -- .../modeldecoder/rumv3/metadata_test.go | 9 -- .../modeldecoder/rumv3/transaction_test.go | 9 -- .../internal/modeldecoder/v2/decoder.go | 106 +----------------- .../internal/modeldecoder/v2/error_test.go | 9 -- .../internal/modeldecoder/v2/log_test.go | 9 -- .../internal/modeldecoder/v2/metadata_test.go | 9 -- .../modeldecoder/v2/metricset_test.go | 9 -- .../internal/modeldecoder/v2/span_test.go | 9 -- .../modeldecoder/v2/transaction_test.go | 9 -- 11 files changed, 9 insertions(+), 233 deletions(-) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/decoder.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/decoder.go index 36fde6fe2dc..934e29e907a 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/decoder.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/decoder.go @@ -23,7 +23,6 @@ import ( "net/http" "net/textproto" "strings" - "sync" "time" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" @@ -33,55 +32,9 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -var ( - errorRootPool = sync.Pool{ - New: func() interface{} { - return &errorRoot{} - }, - } - metadataRootPool = sync.Pool{ - New: func() interface{} { - return &metadataRoot{} - }, - } - transactionRootPool = sync.Pool{ - New: func() interface{} { - return &transactionRoot{} - }, - } -) - -func fetchErrorRoot() *errorRoot { - return errorRootPool.Get().(*errorRoot) -} - -func releaseErrorRoot(root *errorRoot) { - root.Reset() - errorRootPool.Put(root) -} - -func fetchMetadataRoot() *metadataRoot { - return metadataRootPool.Get().(*metadataRoot) -} - -func releaseMetadataRoot(m *metadataRoot) { - m.Reset() - metadataRootPool.Put(m) -} - -func fetchTransactionRoot() *transactionRoot { - return transactionRootPool.Get().(*transactionRoot) -} - -func releaseTransactionRoot(m *transactionRoot) { - m.Reset() - transactionRootPool.Put(m) -} - // DecodeNestedMetadata decodes metadata from d, updating out. func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error { - root := fetchMetadataRoot() - defer releaseMetadataRoot(root) + root := &metadataRoot{} if err := d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) } @@ -96,8 +49,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error { // // DecodeNestedError should be used when the stream in the decoder contains the `error` key func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchErrorRoot() - defer releaseErrorRoot(root) + root := &errorRoot{} if err := d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) } @@ -115,8 +67,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode // // DecodeNestedTransaction should be used when the decoder contains the `transaction` key func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchTransactionRoot() - defer releaseTransactionRoot(root) + root := &transactionRoot{} if err := d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) } diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/error_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/error_test.go index 6fd11c4b184..4e23ead8ec3 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/error_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/error_test.go @@ -35,15 +35,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetErrorOnRelease(t *testing.T) { - inp := `{"e":{"id":"tr-a"}}` - root := fetchErrorRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) - require.True(t, root.IsSet()) - releaseErrorRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { now := modelpb.FromTime(time.Now()) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/metadata_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/metadata_test.go index 3ce0c91336b..0b1be3bce1f 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/metadata_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/metadata_test.go @@ -115,15 +115,6 @@ func metadataExceptions(keys ...string) func(key string) bool { } } -func TestMetadataResetModelOnRelease(t *testing.T) { - inp := `{"m":{"se":{"n":"service-a"}}}` - m := fetchMetadataRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m)) - require.True(t, m.IsSet()) - releaseMetadataRoot(m) - assert.False(t, m.IsSet()) -} - func TestDecodeNestedMetadata(t *testing.T) { t.Run("decode", func(t *testing.T) { var out modelpb.APMEvent diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go index 721fa357b8f..76cfe004b01 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/rumv3/transaction_test.go @@ -35,15 +35,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetTransactionOnRelease(t *testing.T) { - inp := `{"x":{"n":"tr-a"}}` - tr := fetchTransactionRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(tr)) - require.True(t, tr.IsSet()) - releaseTransactionRoot(tr) - assert.False(t, tr.IsSet()) -} - func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { now := modelpb.FromTime(time.Now()) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/decoder.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/decoder.go index 901e20112a1..bea35b2c6bf 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/decoder.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/decoder.go @@ -26,7 +26,6 @@ import ( "regexp" "strconv" "strings" - "sync" "time" "github.com/elastic/apm-data/input/elasticapm/internal/decoder" @@ -41,39 +40,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" ) -var ( - errorRootPool = sync.Pool{ - New: func() interface{} { - return &errorRoot{} - }, - } - metadataRootPool = sync.Pool{ - New: func() interface{} { - return &metadataRoot{} - }, - } - metricsetRootPool = sync.Pool{ - New: func() interface{} { - return &metricsetRoot{} - }, - } - spanRootPool = sync.Pool{ - New: func() interface{} { - return &spanRoot{} - }, - } - transactionRootPool = sync.Pool{ - New: func() interface{} { - return &transactionRoot{} - }, - } - logRootPool = sync.Pool{ - New: func() interface{} { - return &logRoot{} - }, - } -) - var compressionStrategyText = map[string]modelpb.CompressionStrategy{ "exact_match": modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH, "same_kind": modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND, @@ -93,60 +59,6 @@ var ( reForServiceTargetExpr = regexp.MustCompile(`^([a-z0-9]+)(?:/(\w+))?$`) ) -func fetchErrorRoot() *errorRoot { - return errorRootPool.Get().(*errorRoot) -} - -func releaseErrorRoot(root *errorRoot) { - root.Reset() - errorRootPool.Put(root) -} - -func fetchMetadataRoot() *metadataRoot { - return metadataRootPool.Get().(*metadataRoot) -} - -func releaseMetadataRoot(root *metadataRoot) { - root.Reset() - metadataRootPool.Put(root) -} - -func fetchMetricsetRoot() *metricsetRoot { - return metricsetRootPool.Get().(*metricsetRoot) -} - -func releaseMetricsetRoot(root *metricsetRoot) { - root.Reset() - metricsetRootPool.Put(root) -} - -func fetchSpanRoot() *spanRoot { - return spanRootPool.Get().(*spanRoot) -} - -func releaseSpanRoot(root *spanRoot) { - root.Reset() - spanRootPool.Put(root) -} - -func fetchTransactionRoot() *transactionRoot { - return transactionRootPool.Get().(*transactionRoot) -} - -func releaseTransactionRoot(root *transactionRoot) { - root.Reset() - transactionRootPool.Put(root) -} - -func fetchLogRoot() *logRoot { - return logRootPool.Get().(*logRoot) -} - -func releaseLogRoot(root *logRoot) { - root.Reset() - logRootPool.Put(root) -} - // DecodeMetadata decodes metadata from d, updating out. // // DecodeMetadata should be used when the the stream in the decoder does not contain the @@ -166,8 +78,7 @@ func DecodeNestedMetadata(d decoder.Decoder, out *modelpb.APMEvent) error { // // DecodeNestedError should be used when the stream in the decoder contains the `error` key func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchErrorRoot() - defer releaseErrorRoot(root) + root := &errorRoot{} err := d.Decode(root) if err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) @@ -185,8 +96,7 @@ func DecodeNestedError(d decoder.Decoder, input *modeldecoder.Input, batch *mode // // DecodeNestedMetricset should be used when the stream in the decoder contains the `metricset` key func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchMetricsetRoot() - defer releaseMetricsetRoot(root) + root := &metricsetRoot{} var err error if err = d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) @@ -205,8 +115,7 @@ func DecodeNestedMetricset(d decoder.Decoder, input *modeldecoder.Input, batch * // // DecodeNestedSpan should be used when the stream in the decoder contains the `span` key func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchSpanRoot() - defer releaseSpanRoot(root) + root := &spanRoot{} var err error if err = d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) @@ -224,8 +133,7 @@ func DecodeNestedSpan(d decoder.Decoder, input *modeldecoder.Input, batch *model // // DecodeNestedTransaction should be used when the stream in the decoder contains the `transaction` key func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchTransactionRoot() - defer releaseTransactionRoot(root) + root := &transactionRoot{} var err error if err = d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) @@ -243,8 +151,7 @@ func DecodeNestedTransaction(d decoder.Decoder, input *modeldecoder.Input, batch // // DecodeNestedLog should be used when the stream in the decoder contains the `log` key func DecodeNestedLog(d decoder.Decoder, input *modeldecoder.Input, batch *modelpb.Batch) error { - root := fetchLogRoot() - defer releaseLogRoot(root) + root := &logRoot{} var err error if err = d.Decode(root); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) @@ -263,8 +170,7 @@ func DecodeNestedLog(d decoder.Decoder, input *modeldecoder.Input, batch *modelp } func decodeMetadata(decFn func(d decoder.Decoder, m *metadataRoot) error, d decoder.Decoder, out *modelpb.APMEvent) error { - m := fetchMetadataRoot() - defer releaseMetadataRoot(m) + m := &metadataRoot{} var err error if err = decFn(d, m); err != nil && err != io.EOF { return modeldecoder.NewDecoderErrFromJSONIter(err) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/error_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/error_test.go index bb07874a715..a65273b1380 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/error_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/error_test.go @@ -35,15 +35,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetErrorOnRelease(t *testing.T) { - inp := `{"error":{"id":"tr-a"}}` - root := fetchErrorRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) - require.True(t, root.IsSet()) - releaseErrorRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedError(t *testing.T) { t.Run("decode", func(t *testing.T) { now := modelpb.FromTime(time.Now()) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/log_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/log_test.go index 382f805a54f..0c312663c27 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/log_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/log_test.go @@ -30,15 +30,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetLogOnRelease(t *testing.T) { - input := `{"log":{"message":"something happened"}}` - root := fetchLogRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(input)).Decode(root)) - require.True(t, root.IsSet()) - releaseLogRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedLog(t *testing.T) { t.Run("decode", func(t *testing.T) { t.Run("withTimestamp", func(t *testing.T) { diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metadata_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metadata_test.go index 820e8d42c85..ae24fbd36b2 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metadata_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metadata_test.go @@ -204,15 +204,6 @@ func initializedInputMetadata(values *modeldecodertest.Values) (metadata, *model return input, &out } -func TestResetMetadataOnRelease(t *testing.T) { - inp := `{"metadata":{"service":{"name":"service-a"}}}` - m := fetchMetadataRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(m)) - require.True(t, m.IsSet()) - releaseMetadataRoot(m) - assert.False(t, m.IsSet()) -} - func TestDecodeMetadata(t *testing.T) { for _, tc := range []struct { name string diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metricset_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metricset_test.go index aefdd8b9ec2..bfd4717df67 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metricset_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/metricset_test.go @@ -34,15 +34,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetMetricsetOnRelease(t *testing.T) { - inp := `{"metricset":{"samples":{"a.b.":{"value":2048}}}}` - root := fetchMetricsetRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) - require.True(t, root.IsSet()) - releaseMetricsetRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedMetricset(t *testing.T) { t.Run("decode", func(t *testing.T) { now := modelpb.FromTime(time.Now()) diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/span_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/span_test.go index 463df398c12..57df65d7a69 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/span_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/span_test.go @@ -38,15 +38,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetSpanOnRelease(t *testing.T) { - inp := `{"span":{"name":"tr-a"}}` - root := fetchSpanRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) - require.True(t, root.IsSet()) - releaseSpanRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedSpan(t *testing.T) { t.Run("decode", func(t *testing.T) { defaultVal := modeldecodertest.DefaultValues() diff --git a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/transaction_test.go b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/transaction_test.go index e1e4859acc9..beae2da459d 100644 --- a/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/transaction_test.go +++ b/copy/apm-data/input/elasticapm/internal/modeldecoder/v2/transaction_test.go @@ -40,15 +40,6 @@ import ( "github.com/elastic/apm-data/model/modelpb" ) -func TestResetTransactionOnRelease(t *testing.T) { - inp := `{"transaction":{"name":"tr-a"}}` - root := fetchTransactionRoot() - require.NoError(t, decoder.NewJSONDecoder(strings.NewReader(inp)).Decode(root)) - require.True(t, root.IsSet()) - releaseTransactionRoot(root) - assert.False(t, root.IsSet()) -} - func TestDecodeNestedTransaction(t *testing.T) { t.Run("decode", func(t *testing.T) { now := modelpb.FromTime(time.Now())