From 2558f92c0a9c34bba58df7bfa601cdf61f4bc912 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 24 Jun 2024 11:53:51 +0100 Subject: [PATCH] TBS: Optimize for ReadTraceEvents miss (#13464) When a sampling decision is received from a remote apm-server, if the trace is not stored locally, prefetching values in the iterator is causing mmap vlog to be read for no value, resulting in high memory usage and disk read IO. Add an initial pass with PrefetchValues=false to optimize for a miss in ReadTraceEvents. e.g. if there are 10 apm-servers all running TBS and synced via ES, and all transactions/spans from a trace are always sent to 1 apm-server (i.e., a single trace never stored across multiple apm-servers), this PR should cut disk read IO related to sampling decision handling to 1/10 of before. --- .../sampling/eventstorage/storage.go | 16 +++++ .../eventstorage/storage_bench_test.go | 70 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go index 816f22a2d1b..84caed3302b 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage.go +++ b/x-pack/apm-server/sampling/eventstorage/storage.go @@ -263,7 +263,23 @@ func (rw *ReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':') opts.Prefix = rw.readKeyBuf + // 1st pass: check whether there exist keys matching the prefix. + // Do not prefetch values so that the check is done in-memory. + // This is to optimize for cases when it is a miss. + opts.PrefetchValues = false iter := rw.txn.NewIterator(opts) + iter.Rewind() + if !iter.Valid() { + iter.Close() + return nil + } + iter.Close() + + // 2nd pass: this is only done when there exist keys matching the prefix. + // Fetch the events with PrefetchValues for performance. + // This is to optimize for cases when it is a hit. 
+ opts.PrefetchValues = true + iter = rw.txn.NewIterator(opts) defer iter.Close() for iter.Rewind(); iter.Valid(); iter.Next() { item := iter.Item() diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go index 33a42208800..3c5bafcf17a 100644 --- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go +++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go @@ -161,6 +161,76 @@ func BenchmarkReadEvents(b *testing.B) { } } +func BenchmarkReadEventsHit(b *testing.B) { + // This test may take longer to run because setup time >> run time + // It may be possible that the next estimated b.N is a very large number due to short run time + // And causes next iteration setup to take a very long time. + const txnCountInTrace = 5 + + test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) { + for _, hit := range []bool{false, true} { + b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) { + db := newBadgerDB(b, badgerOptions) + store := eventstorage.New(db, codec) + readWriter := store.NewReadWriter() + defer readWriter.Close() + wOpts := eventstorage.WriterOpts{ + TTL: time.Hour, + StorageLimitInBytes: 0, + } + + traceIDs := make([]string, b.N) + + for i := 0; i < b.N; i++ { + traceID := uuid.Must(uuid.NewV4()).String() + traceIDs[i] = traceID + for j := 0; j < txnCountInTrace; j++ { + transactionID := uuid.Must(uuid.NewV4()).String() + var transaction *modelpb.APMEvent + if bigTX { + transaction = makeTransaction(transactionID, traceID) + } else { + transaction = &modelpb.APMEvent{ + Transaction: &modelpb.Transaction{ + Id: transactionID, + }, + } + } + if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil { + b.Fatal(err) + } + } + } + if err := readWriter.Flush(); err != nil { + b.Fatal(err) + } + + b.ResetTimer() + var batch modelpb.Batch + for i := 0; i < b.N; i++ { + batch = batch[:0] + + traceID := traceIDs[i] + if !hit 
{ + // replace the last char to generate a random non-existent traceID + traceID = traceID[:len(traceID)-1] + "-" + } + + if err := readWriter.ReadTraceEvents(traceID, &batch); err != nil { + b.Fatal(err) + } + } + }) + } + } + + for _, bigTX := range []bool{true, false} { + b.Run(fmt.Sprintf("bigTX=%v", bigTX), func(b *testing.B) { + test(b, eventstorage.ProtobufCodec{}, bigTX) + }) + } +} + func BenchmarkIsTraceSampled(b *testing.B) { sampledTraceUUID := uuid.Must(uuid.NewV4()) unsampledTraceUUID := uuid.Must(uuid.NewV4())