Skip to content

Commit

Permalink
Merge branch 'main' into apm-tracing-config-from-ea
Browse files Browse the repository at this point in the history
  • Loading branch information
kyungeunni committed Jul 2, 2024
2 parents 4880854 + 8655204 commit 9d77264
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 21 deletions.
8 changes: 4 additions & 4 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected]

--------------------------------------------------------------------------------
Dependency : github.com/elastic/beats/v7
Version: v7.0.0-alpha2.0.20240628125655-5323dc625833
Version: v7.0.0-alpha2.0.20240701143943-cb0900496952
Licence type (autodetected): Elastic
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/[email protected].20240628125655-5323dc625833/LICENSE.txt:
Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/[email protected].20240701143943-cb0900496952/LICENSE.txt:

Source code in this repository is variously licensed under the Apache License
Version 2.0, an Apache compatible license, or the Elastic License. Outside of
Expand Down Expand Up @@ -2342,11 +2342,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------------------
Dependency : github.com/gofrs/flock
Version: v0.9.0
Version: v0.11.0
Licence type (autodetected): BSD-3-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/gofrs/flock@v0.9.0/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/gofrs/flock@v0.11.0/LICENSE:

Copyright (c) 2018-2024, The Gofrs
Copyright (c) 2015-2020, Tim Heckman
Expand Down
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ x-logging: &default-logging
max-size: "1g"
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0-af44b0a5-SNAPSHOT
image: docker.elastic.co/elasticsearch/elasticsearch:8.15.0-5d6d1bc4-SNAPSHOT
ports:
- 9200:9200
healthcheck:
Expand Down Expand Up @@ -41,7 +41,7 @@ services:
logging: *default-logging

kibana:
image: docker.elastic.co/kibana/kibana:8.15.0-af44b0a5-SNAPSHOT
image: docker.elastic.co/kibana/kibana:8.15.0-5d6d1bc4-SNAPSHOT
ports:
- 5601:5601
healthcheck:
Expand All @@ -60,7 +60,7 @@ services:
logging: *default-logging

metricbeat:
image: docker.elastic.co/beats/metricbeat:8.15.0-af44b0a5-SNAPSHOT
image: docker.elastic.co/beats/metricbeat:8.15.0-5d6d1bc4-SNAPSHOT
environment:
ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'
ELASTICSEARCH_USERNAME: "${KIBANA_ES_USER:-admin}"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/elastic/go-sysinfo v1.14.0
github.com/elastic/go-ucfg v0.8.8
github.com/go-sourcemap/sourcemap v2.1.4+incompatible
github.com/gofrs/flock v0.9.0
github.com/gofrs/flock v0.11.0
github.com/gofrs/uuid v4.4.0+incompatible
github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-sourcemap/sourcemap v2.1.4+incompatible h1:a+iTbH5auLKxaNwQFg0B+TCYl6lbukKPc7b5x0n1s6Q=
github.com/go-sourcemap/sourcemap v2.1.4+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg=
github.com/gofrs/flock v0.9.0 h1:QqEH0zKHPdEyY4YbJLleD9Il4ft7h6hn3gECO6Ss4rQ=
github.com/gofrs/flock v0.9.0/go.mod h1:O+L78Axre/Bc0Ya3RlNiGP+Rt0tFHWjtHTQ+B2uPZw8=
github.com/gofrs/flock v0.11.0 h1:AGFQxrpWd8ezw60AvLWIPbxMydNfF8564pwH3FCty0g=
github.com/gofrs/flock v0.11.0/go.mod h1:FirDy1Ing0mI2+kB6wk+vyyAH+e6xiE+EYA0jnzV9jc=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.4.1 h1:1Yx4Myt7BxzvUr5ldGSbwYiZG6t9wGBZ+8/fX3Wvtq0=
Expand Down
2 changes: 1 addition & 1 deletion testing/infra/k8s/base/stack/apm-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: ApmServer
metadata:
name: apm-server
spec:
version: 8.15.0-af44b0a5-SNAPSHOT
version: 8.15.0-5d6d1bc4-SNAPSHOT
count: 1
http:
tls:
Expand Down
2 changes: 1 addition & 1 deletion testing/infra/k8s/base/stack/elasticsearch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Elasticsearch
metadata:
name: elasticsearch
spec:
version: 8.15.0-af44b0a5-SNAPSHOT
version: 8.15.0-5d6d1bc4-SNAPSHOT
auth:
fileRealm:
- secretName: elasticsearch-admin
Expand Down
2 changes: 1 addition & 1 deletion testing/infra/k8s/base/stack/kibana.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kind: Kibana
metadata:
name: kibana
spec:
version: 8.15.0-af44b0a5-SNAPSHOT
version: 8.15.0-5d6d1bc4-SNAPSHOT
count: 1
elasticsearchRef:
name: elasticsearch
Expand Down
24 changes: 18 additions & 6 deletions x-pack/apm-server/sampling/eventstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package eventstorage

import (
"bytes"
"errors"
"fmt"
"sync/atomic"
Expand Down Expand Up @@ -162,12 +163,17 @@ func (rw *ReadWriter) IsTraceSampled(traceID string) (bool, error) {
// WriteTraceEvent may return before the write is committed to storage.
// Call Flush to ensure the write is committed.
func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.APMEvent, opts WriterOpts) error {
key := append(append([]byte(traceID), ':'), id...)
data, err := rw.s.codec.EncodeEvent(event)
if err != nil {
return err
}
return rw.writeEntry(badger.NewEntry(key[:], data).WithMeta(entryMetaTraceEvent), opts)
var buf bytes.Buffer
buf.Grow(len(traceID) + 1 + len(id))
buf.WriteString(traceID)
buf.WriteByte(':')
buf.WriteString(id)
key := buf.Bytes()
return rw.writeEntry(badger.NewEntry(key, data).WithMeta(entryMetaTraceEvent), opts)
}

func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
Expand Down Expand Up @@ -242,7 +248,13 @@ func estimateSize(e *badger.Entry) int64 {

// DeleteTraceEvent deletes the trace event from storage.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
key := append(append([]byte(traceID), ':'), id...)
var buf bytes.Buffer
buf.Grow(len(traceID) + 1 + len(id))
buf.WriteString(traceID)
buf.WriteByte(':')
buf.WriteString(id)
key := buf.Bytes()

err := rw.txn.Delete(key)
// If the transaction is already too big to accommodate the new entry, flush
// the existing transaction and set the entry on a new one, otherwise,
Expand Down Expand Up @@ -288,16 +300,16 @@ func (rw *ReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error
}
switch item.UserMeta() {
case entryMetaTraceEvent:
var event modelpb.APMEvent
event := modelpb.APMEventFromVTPool()
if err := item.Value(func(data []byte) error {
if err := rw.s.codec.DecodeEvent(data, &event); err != nil {
if err := rw.s.codec.DecodeEvent(data, event); err != nil {
return fmt.Errorf("codec failed to decode event: %w", err)
}
return nil
}); err != nil {
return err
}
*out = append(*out, &event)
*out = append(*out, event)
default:
// Unknown entry meta: ignore.
continue
Expand Down
9 changes: 8 additions & 1 deletion x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (p *Processor) Run() error {
}
})
g.Go(func() error {
var events modelpb.Batch
// TODO(axw) pace the publishing over the flush interval?
// Alternatively we can rely on backpressure from the reporter,
// removing the artificial one second timeout from publisher code
Expand Down Expand Up @@ -510,7 +511,8 @@ func (p *Processor) Run() error {
"received error writing sampled trace: %s", err,
)
}
var events modelpb.Batch

events = events[:0]
if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
p.rateLimitedLogger.Warnf(
"received error reading trace events: %s", err,
Expand Down Expand Up @@ -543,6 +545,11 @@ func (p *Processor) Run() error {
if err := p.config.BatchProcessor.ProcessBatch(gracefulContext, &events); err != nil {
p.logger.With(logp.Error(err)).Warn("failed to report events")
}

for i := range events {
events[i].ReturnToVTPool()
events[i] = nil // not required but ensure that there is no ref to the freed event
}
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestProcessRemoteTailSampling(t *testing.T) {
select {
case <-ctx.Done():
return ctx.Err()
case reported <- *batch:
case reported <- batch.Clone():
return nil
}
})
Expand Down

0 comments on commit 9d77264

Please sign in to comment.