Skip to content

Commit

Permalink
processor: Avoid leaking labels between batches (#8081)
Browse files Browse the repository at this point in the history
Fixes a bug introduced in #6682 by performing a copy of the `baseEvent`
before each of the events is decoded. The event was being shallow copied
and it was reusing the same maps between batches.

The linked PR changed the behavior (although not in a very obviously)
and was instead merging the labels onto the passed `APMEvent`. This
patch reintroduces the previous behavior in a more obvious manner.

Signed-off-by: Marc Lopez Rubio <[email protected]>
Co-authored-by: stuart nelson <[email protected]>
  • Loading branch information
marclop and stuartnelson3 authored May 11, 2022
1 parent f23d051 commit b782e9e
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,10 @@
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true",
"wrapped_reporter": "true"
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8"
},
"message": "Request method 'POST' not supported",
"numeric_labels": {
"code": 200,
"segment": 5
},
"observer": {
Expand Down Expand Up @@ -355,13 +352,9 @@
},
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true",
"wrapped_reporter": "true"
"group": "experimental"
},
"numeric_labels": {
"code": 200,
"segment": 5
},
"observer": {
Expand Down Expand Up @@ -558,11 +551,9 @@
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true",
"wrapped_reporter": "true"
},
"numeric_labels": {
"code": 200,
"segment": 5
},
"observer": {
Expand Down Expand Up @@ -696,9 +687,7 @@
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true",
"wrapped_reporter": "true"
"success": "true"
},
"metricset.name": "span_breakdown",
"numeric_labels": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,10 @@
]
},
"labels": {
"some": "abc",
"success": "true",
"tag1": "one"
},
"metricset.name": "app",
"numeric_labels": {
"code": 200,
"tag2": 2
},
"observer": {
Expand Down Expand Up @@ -141,13 +138,10 @@
]
},
"labels": {
"some": "abc",
"success": "true",
"tag1": "one"
},
"metricset.name": "app",
"numeric_labels": {
"code": 200,
"tag2": 2
},
"observer": {
Expand Down Expand Up @@ -198,13 +192,10 @@
]
},
"labels": {
"some": "abc",
"success": "true",
"tag1": "one"
},
"metricset.name": "app",
"numeric_labels": {
"code": 200,
"tag2": 2
},
"observer": {
Expand Down Expand Up @@ -267,8 +258,6 @@
]
},
"labels": {
"some": "abc",
"success": "true",
"tag1": "one"
},
"latency_distribution": {
Expand All @@ -285,7 +274,6 @@
},
"metricset.name": "app",
"numeric_labels": {
"code": 200,
"tag2": 2
},
"observer": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -201,8 +200,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -466,8 +464,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -613,8 +610,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -819,8 +815,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -960,8 +955,7 @@
}
},
"labels": {
"tag1": "value1",
"tag4": "true"
"tag1": "label1"
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,11 @@
}
},
"labels": {
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"tag1": "one",
"tag4": "false",
"wrapped_reporter": "true"
},
"numeric_labels": {
"tag2": 12,
"tag3": 12.45
"tag2": 2
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -426,14 +423,11 @@
}
},
"labels": {
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"tag1": "one",
"tag4": "false",
"wrapped_reporter": "true"
},
"numeric_labels": {
"tag2": 12,
"tag3": 12.45
"tag2": 2
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -579,14 +573,11 @@
}
},
"labels": {
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"tag1": "one",
"tag4": "false",
"wrapped_reporter": "true"
},
"numeric_labels": {
"tag2": 12,
"tag3": 12.45
"tag2": 2
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down Expand Up @@ -752,14 +743,11 @@
}
},
"labels": {
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"tag1": "one",
"tag4": "false",
"wrapped_reporter": "true"
},
"numeric_labels": {
"tag2": 12,
"tag3": 12.45
"tag2": 2
},
"observer": {
"ephemeral_id": "00000000-0000-0000-0000-000000000000",
Expand Down
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ https://github.com/elastic/apm-server/compare/8.2\...main[View commits]

[float]
==== Bug fixes
- Fix a bug that caused some of the decoded events to have incorrect labels {pull}8081[8081]

[float]
==== Intake API Changes
Expand Down
17 changes: 16 additions & 1 deletion processor/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ func (p *Processor) readBatch(
// required for backwards compatibility - sending empty lines was permitted in previous versions
continue
}
input := modeldecoder.Input{Base: baseEvent}
// We copy the event for each iteration of the batch, as to avoid
// shallow copies of Labels and NumericLabels.
input := modeldecoder.Input{Base: copyEvent(baseEvent)}
switch eventType := p.identifyEventType(body); string(eventType) {
case errorEventType:
err = v2.DecodeNestedError(reader, &input, batch)
Expand Down Expand Up @@ -307,3 +309,16 @@ func (sr *streamReader) wrapError(err error) error {
}
return err
}

// copyEvent returns a shallow copy of the APMEvent with a deep copy of the
// labels and numeric labels.
func copyEvent(e model.APMEvent) model.APMEvent {
var out = e
if out.Labels != nil {
out.Labels = out.Labels.Clone()
}
if out.NumericLabels != nil {
out.NumericLabels = out.NumericLabels.Clone()
}
return out
}
39 changes: 39 additions & 0 deletions processor/stream/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"net"
"path/filepath"
"strings"
"testing"
"testing/iotest"
"time"
Expand Down Expand Up @@ -268,6 +269,44 @@ func TestRUMV3(t *testing.T) {
}
}

func TestLabelLeak(t *testing.T) {
payload := `{"metadata": {"service": {"name": "testsvc", "environment": "staging", "version": null, "agent": {"name": "python", "version": "6.9.1"}, "language": {"name": "python", "version": "3.10.4"}, "runtime": {"name": "CPython", "version": "3.10.4"}, "framework": {"name": "flask", "version": "2.1.1"}}, "process": {"pid": 2112739, "ppid": 2112738, "argv": ["/home/stuart/workspace/sdh/581/venv/lib/python3.10/site-packages/flask/__main__.py", "run"], "title": null}, "system": {"hostname": "slaptop", "architecture": "x86_64", "platform": "linux"}, "labels": {"ci_commit": "unknown", "numeric": 1}}}
{"transaction": {"id": "88dee29a6571b948", "trace_id": "ba7f5d18ac4c7f39d1ff070c79b2bea5", "name": "GET /withlabels", "type": "request", "duration": 1.6199999999999999, "result": "HTTP 2xx", "timestamp": 1652185276804681, "outcome": "success", "sampled": true, "span_count": {"started": 0, "dropped": 0}, "sample_rate": 1.0, "context": {"request": {"env": {"REMOTE_ADDR": "127.0.0.1", "SERVER_NAME": "127.0.0.1", "SERVER_PORT": "5000"}, "method": "GET", "socket": {"remote_address": "127.0.0.1"}, "cookies": {}, "headers": {"host": "localhost:5000", "user-agent": "curl/7.81.0", "accept": "*/*", "app-os": "Android", "content-type": "application/json; charset=utf-8", "content-length": "29"}, "url": {"full": "http://localhost:5000/withlabels?second_with_labels", "protocol": "http:", "hostname": "localhost", "pathname": "/withlabels", "port": "5000", "search": "?second_with_labels"}}, "response": {"status_code": 200, "headers": {"Content-Type": "application/json", "Content-Length": "14"}}, "tags": {"appOs": "Android", "email_set": "[email protected]", "time_set": 1652185276}}}}
{"transaction": {"id": "ba5c6d6c1ab44bd1", "trace_id": "88c0a00431531a80c5ca9a41fe115f41", "name": "GET /nolabels", "type": "request", "duration": 0.652, "result": "HTTP 2xx", "timestamp": 1652185278813952, "outcome": "success", "sampled": true, "span_count": {"started": 0, "dropped": 0}, "sample_rate": 1.0, "context": {"request": {"env": {"REMOTE_ADDR": "127.0.0.1", "SERVER_NAME": "127.0.0.1", "SERVER_PORT": "5000"}, "method": "GET", "socket": {"remote_address": "127.0.0.1"}, "cookies": {}, "headers": {"host": "localhost:5000", "user-agent": "curl/7.81.0", "accept": "*/*"}, "url": {"full": "http://localhost:5000/nolabels?third_no_label", "protocol": "http:", "hostname": "localhost", "pathname": "/nolabels", "port": "5000", "search": "?third_no_label"}}, "response": {"status_code": 200, "headers": {"Content-Type": "text/html; charset=utf-8", "Content-Length": "14"}}, "tags": {}}}}`

baseEvent := model.APMEvent{
Host: model.Host{IP: []net.IP{net.ParseIP("192.0.0.1")}},
}

var processed *model.Batch
batchProcessor := model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error {
processed = b
return nil
})

p := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}, make(chan struct{}, 1))
var actualResult Result
err := p.HandleStream(context.Background(), baseEvent, strings.NewReader(payload), 10, batchProcessor, &actualResult)
require.NoError(t, err)

txs := *processed
assert.Len(t, txs, 2)
// Assert first tx
assert.Equal(t, model.NumericLabels{
"time_set": {Value: 1652185276},
"numeric": {Value: 1},
}, txs[0].NumericLabels)
assert.Equal(t, model.Labels{
"appOs": {Value: "Android"},
"email_set": {Value: "[email protected]"},
"ci_commit": {Value: "unknown"},
}, txs[0].Labels)

// Assert second tx
assert.Equal(t, model.NumericLabels{"numeric": {Value: 1}}, txs[1].NumericLabels)
assert.Equal(t, model.Labels{"ci_commit": {Value: "unknown"}}, txs[1].Labels)
}

func makeApproveEventsBatchProcessor(t *testing.T, name string, count *int) model.BatchProcessor {
return model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
events := b.Transform(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true"
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8"
},
"numeric_labels": {
"code": 200,
"segment": 5
},
"parent": {
Expand Down Expand Up @@ -334,12 +332,9 @@
},
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true"
"group": "experimental"
},
"numeric_labels": {
"code": 200,
"segment": 5
},
"parent": {
Expand Down Expand Up @@ -523,11 +518,9 @@
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true"
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8"
},
"numeric_labels": {
"code": 200,
"segment": 5
},
"parent": {
Expand Down Expand Up @@ -649,7 +642,6 @@
"labels": {
"ab_testing": "true",
"group": "experimental",
"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8",
"success": "true"
},
"numeric_labels": {
Expand Down
Loading

0 comments on commit b782e9e

Please sign in to comment.