feat: compute tbs entry size before writing and flush to avoid limit error #12120

Merged: kruskall merged 26 commits into main from feat/discarded-txn on Jan 18, 2024
Changes from all commits
Commits
26 commits
1d22c37
feat: compute tbs entry size before writing and flush to avoid limit …
kruskall Nov 29, 2023
bb819bc
Merge branch 'main' into feat/discarded-txn
kruskall Dec 5, 2023
a16a4ce
fix: move pendingSize to storage and make it atomic
kruskall Dec 5, 2023
613e58f
refactor: remove default pending size
kruskall Dec 5, 2023
958ffc5
fix: track readwriters pending size and resolve race conditions
kruskall Dec 5, 2023
c19c88e
lint: remove unused method
kruskall Dec 5, 2023
9c9cca2
fix: add base transaction size and add more comments
kruskall Dec 5, 2023
40c17a2
test: fix storage limit test
kruskall Dec 5, 2023
25f0a4c
lint: remove unused ErrLimitReached
kruskall Dec 5, 2023
52767f8
fix: do not add entrySize twice
kruskall Dec 6, 2023
de0230b
Merge branch 'main' into feat/discarded-txn
kruskall Jan 4, 2024
ac8f074
docs: add pendingSize comment
kruskall Jan 4, 2024
d6fece5
lint: fix linter issues
kruskall Jan 4, 2024
af8a170
fix: respect storage limit
kruskall Jan 5, 2024
5842784
fix: handle 0 storage limit (unlimited)
kruskall Jan 5, 2024
57c734b
Merge branch 'main' into feat/discarded-txn
kruskall Jan 5, 2024
bc0188e
fix: flush what we have before discarding the transaction
kruskall Jan 5, 2024
94aaa7e
fix: do not discard txn twice
kruskall Jan 11, 2024
c0cd5ff
test: fix error message typo
kruskall Jan 11, 2024
0848890
fix: update pendingSize after flush and refactor for clarity
kruskall Jan 11, 2024
4a351af
Merge branch 'main' into feat/discarded-txn
kruskall Jan 11, 2024
5519367
Merge branch 'main' into feat/discarded-txn
kruskall Jan 17, 2024
a71b50d
docs: update test comment
kruskall Jan 17, 2024
7113324
Merge branch 'main' into feat/discarded-txn
kruskall Jan 17, 2024
22b613f
docs: readd db.size comment
kruskall Jan 17, 2024
6bb59ca
Merge branch 'main' into feat/discarded-txn
kruskall Jan 18, 2024
8 changes: 4 additions & 4 deletions x-pack/apm-server/sampling/eventstorage/sharded.go
@@ -42,10 +42,10 @@ func (s *ShardedReadWriter) Close() {
}

// Flush flushes all sharded storage readWriters.
func (s *ShardedReadWriter) Flush(limit int64) error {
func (s *ShardedReadWriter) Flush() error {
var result error
for i := range s.readWriters {
if err := s.readWriters[i].Flush(limit); err != nil {
if err := s.readWriters[i].Flush(); err != nil {
result = multierror.Append(result, err)
}
}
@@ -99,10 +99,10 @@ func (rw *lockedReadWriter) Close() {
rw.rw.Close()
}

func (rw *lockedReadWriter) Flush(limit int64) error {
func (rw *lockedReadWriter) Flush() error {
rw.mu.Lock()
defer rw.mu.Unlock()
return rw.rw.Flush(limit)
return rw.rw.Flush()
}

func (rw *lockedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
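Since the storage-limit check now happens per entry inside writeEntry (see storage.go below), Flush on every writer layer drops its limit parameter. A minimal caller-side sketch of the new signature; the in-repo import path and the in-memory badger DB are assumptions made purely for illustration:

package main

import (
	"log"

	"github.com/dgraph-io/badger/v2"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

func main() {
	// In-memory DB purely for the example; the real processor opens an on-disk DB.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	store := eventstorage.New(db, eventstorage.ProtobufCodec{})
	rw := store.NewShardedReadWriter()
	defer rw.Close()

	// Before this change callers passed the storage limit here, e.g.
	// rw.Flush(storageLimitInBytes). Now Flush only commits pending writes;
	// the limit is enforced when each entry is written.
	if err := rw.Flush(); err != nil {
		log.Fatal(err)
	}
}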
115 changes: 73 additions & 42 deletions x-pack/apm-server/sampling/eventstorage/storage.go
@@ -7,6 +7,7 @@ package eventstorage
import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/dgraph-io/badger/v2"
@@ -20,6 +21,10 @@ const (
entryMetaTraceSampled = 's'
entryMetaTraceUnsampled = 'u'
entryMetaTraceEvent = 'e'

// Initial transaction size
// len(txnKey) + 10
baseTransactionSize = 10 + 11
)

var (
@@ -35,8 +40,10 @@ var (
// Storage provides storage for sampled transactions and spans,
// and for recording trace sampling decisions.
type Storage struct {
db *badger.DB
codec Codec
db *badger.DB
// pendingSize tracks the total size of pending writes across ReadWriters
pendingSize *atomic.Int64
codec Codec
}

// Codec provides methods for encoding and decoding events.
@@ -47,7 +54,7 @@ type Codec interface {

// New returns a new Storage using db and codec.
func New(db *badger.DB, codec Codec) *Storage {
return &Storage{db: db, codec: codec}
return &Storage{db: db, pendingSize: &atomic.Int64{}, codec: codec}
}

// NewShardedReadWriter returns a new ShardedReadWriter, for sharded
@@ -64,23 +71,12 @@ func (s *Storage) NewShardedReadWriter() *ShardedReadWriter {
//
// The returned ReadWriter must be closed when it is no longer needed.
func (s *Storage) NewReadWriter() *ReadWriter {
+	s.pendingSize.Add(baseTransactionSize)
	return &ReadWriter{
-		s:   s,
-		txn: s.db.NewTransaction(true),
-	}
-}
-
-func (s *Storage) limitReached(limit int64) (int64, bool) {
-	if limit == 0 {
-		return 0, false
-	}
+		s:           s,
+		txn:         s.db.NewTransaction(true),
+		pendingSize: baseTransactionSize,
+	}
-	// The badger database has an async size reconciliation, with a 1 minute
-	// ticker that keeps the lsm and vlog sizes updated in an in-memory map.
-	// It's OK to call call s.db.Size() on the hot path, since the memory
-	// lookup is cheap.
-	lsm, vlog := s.db.Size()
-	current := lsm + vlog
-	return current, current >= limit

// WriterOpts provides configuration options for writes to storage
@@ -105,6 +101,8 @@ type ReadWriter struct {
// be unmodified until the end of a transaction.
readKeyBuf []byte
pendingWrites int
// pendingSize tracks the size of pending writes in the current ReadWriter
pendingSize int64
}

// Close closes the writer. Any writes that have not been flushed may be lost.
@@ -120,25 +118,14 @@ func (rw *ReadWriter) Close() {
// Flush must be called to ensure writes are committed to storage.
// If Flush is not called before the writer is closed, then writes
// may be lost.
//
// Flush returns ErrLimitReached, or an error that wraps it, when
// the StorageLimiter reports that the combined size of LSM and Vlog
// files exceeds the configured threshold.
func (rw *ReadWriter) Flush(limit int64) error {
func (rw *ReadWriter) Flush() error {
const flushErrFmt = "failed to flush pending writes: %w"
if current, limitReached := rw.s.limitReached(limit); limitReached {
// Discard the txn and re-create it if the soft limit has been reached.
rw.txn.Discard()
rw.txn = rw.s.db.NewTransaction(true)
rw.pendingWrites = 0
return fmt.Errorf(
flushErrFmt+" (current: %d, limit: %d)",
ErrLimitReached, current, limit,
)
}
err := rw.txn.Commit()
rw.txn = rw.s.db.NewTransaction(true)
rw.s.pendingSize.Add(-rw.pendingSize)
rw.pendingWrites = 0
rw.pendingSize = baseTransactionSize
rw.s.pendingSize.Add(baseTransactionSize)
if err != nil {
return fmt.Errorf(flushErrFmt, err)
}
@@ -185,30 +172,74 @@ func (rw *ReadWriter) WriteTraceEvent(traceID string, id string, event *modelpb.

func (rw *ReadWriter) writeEntry(e *badger.Entry, opts WriterOpts) error {
rw.pendingWrites++
err := rw.txn.SetEntry(e.WithTTL(opts.TTL))
// Attempt to flush if there are 200 or more uncommitted writes.
// This ensures calls to ReadTraceEvents are not slowed down;
// ReadTraceEvents uses an iterator, which must sort all keys
// of uncommitted writes.
// The 200 value yielded a good balance between read and write speed:
// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
entrySize := estimateSize(e)
// The badger database has an async size reconciliation, with a 1 minute
// ticker that keeps the lsm and vlog sizes updated in an in-memory map.
// It's OK to call s.db.Size() on the hot path, since the memory
// lookup is cheap.
lsm, vlog := rw.s.db.Size()

// There are multiple ReadWriters writing to the same storage, so add
// the entry size and check the new total to avoid TOCTOU issues.
pendingSize := rw.s.pendingSize.Add(entrySize)
rw.pendingSize += entrySize

if current := pendingSize + lsm + vlog; opts.StorageLimitInBytes != 0 && current >= opts.StorageLimitInBytes {
// flush what we currently have and discard the current entry
if err := rw.Flush(); err != nil {
return err
}
return fmt.Errorf("%w (current: %d, limit: %d)", ErrLimitReached, current, opts.StorageLimitInBytes)
}

if rw.pendingWrites >= 200 {
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
// Attempt to flush if there are 200 or more uncommitted writes.
// This ensures calls to ReadTraceEvents are not slowed down;
// ReadTraceEvents uses an iterator, which must sort all keys
// of uncommitted writes.
// The 200 value yielded a good balance between read and write speed:
// https://github.com/elastic/apm-server/pull/8407#issuecomment-1162994643
if err := rw.Flush(); err != nil {
return err
}

// The current ReadWriter flushed the transaction and reset its pendingSize, so add
// the entrySize again.
rw.pendingSize += entrySize
rw.s.pendingSize.Add(entrySize)
}

err := rw.txn.SetEntry(e.WithTTL(opts.TTL))

// If the transaction is already too big to accommodate the new entry, flush
// the existing transaction and set the entry on a new one; otherwise,
// return early.
if err != badger.ErrTxnTooBig {
return err
}
if err := rw.Flush(opts.StorageLimitInBytes); err != nil {
if err := rw.Flush(); err != nil {
return err
}
rw.pendingSize += entrySize
rw.s.pendingSize.Add(entrySize)
return rw.txn.SetEntry(e)
}

func estimateSize(e *badger.Entry) int64 {
// See badger WithValueThreshold option
// The storage usage of an entry depends on its size
//
// if len(e.Value) < threshold {
// return len(e.Key) + len(e.Value) + 2 // Meta, UserMeta
// }
// return len(e.Key) + 12 + 2 // 12 for ValuePointer, 2 for metas.
//
// Make a good estimate by reserving more space
estimate := len(e.Key) + len(e.Value) + 12 + 2
// Extra bytes for the version in key.
return int64(estimate) + 10
}

// DeleteTraceEvent deletes the trace event from storage.
func (rw *ReadWriter) DeleteTraceEvent(traceID, id string) error {
key := append(append([]byte(traceID), ':'), id...)
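Outside the diff, the accounting introduced above can be summarized with a small, self-contained model: every writer shares one atomic counter of pending bytes, each writer also remembers its own share so a flush can subtract it, and a write is rejected when pending bytes plus the reported database size reach the limit. This is a simplified sketch of the idea, not the apm-server implementation; the writer type, the dbSize stand-in, and the in-memory flush are assumptions made for the example.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// baseSize mirrors baseTransactionSize above: a fixed reservation per transaction.
const baseSize = 10 + 11

var errLimitReached = errors.New("configured storage limit reached")

// estimateEntrySize mirrors estimateSize above: key + value + 12 bytes for a
// potential value pointer + 2 bytes of metadata + 10 bytes for the key version.
func estimateEntrySize(key, value []byte) int64 {
	return int64(len(key)+len(value)+12+2) + 10
}

// writer models a single ReadWriter: it shares an atomic pending counter with
// all other writers and remembers its own share so flush can subtract it.
type writer struct {
	shared  *atomic.Int64 // plays the role of Storage.pendingSize
	pending int64         // plays the role of ReadWriter.pendingSize
	dbSize  func() int64  // stand-in for badger's db.Size() (lsm + vlog)
}

func newWriter(shared *atomic.Int64, dbSize func() int64) *writer {
	shared.Add(baseSize)
	return &writer{shared: shared, pending: baseSize, dbSize: dbSize}
}

// write adds the entry's estimated size before checking the limit: adding first
// and testing the returned total avoids the TOCTOU race between concurrent writers.
func (w *writer) write(key, value []byte, limit int64) error {
	size := estimateEntrySize(key, value)
	total := w.shared.Add(size)
	w.pending += size
	if current := total + w.dbSize(); limit != 0 && current >= limit {
		// Flush what is already pending, then reject this entry.
		w.flush()
		return fmt.Errorf("%w (current: %d, limit: %d)", errLimitReached, current, limit)
	}
	return nil
}

// flush stands in for committing the transaction: it drops this writer's share
// from the shared counter and re-reserves the base size for the next transaction.
func (w *writer) flush() {
	w.shared.Add(-w.pending + baseSize)
	w.pending = baseSize
}

func main() {
	shared := &atomic.Int64{}
	w := newWriter(shared, func() int64 { return 0 }) // pretend the DB is empty on disk
	for i := 0; ; i++ {
		key := []byte(fmt.Sprintf("trace:%04d", i))
		if err := w.write(key, make([]byte, 100), 2048); err != nil {
			fmt.Printf("write %d rejected: %v\n", i, err)
			break
		}
	}
}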
@@ -48,7 +48,7 @@ func BenchmarkWriteTransaction(b *testing.B) {
b.Fatal(err)
}
}
assert.NoError(b, readWriter.Flush(wOpts.StorageLimitInBytes))
assert.NoError(b, readWriter.Flush())
}

type testCase struct {
13 changes: 3 additions & 10 deletions x-pack/apm-server/sampling/eventstorage/storage_test.go
@@ -5,7 +5,6 @@
package eventstorage_test

import (
"fmt"
"testing"
"time"

@@ -77,7 +76,7 @@ func testWriteEvents(t *testing.T, numSpans int) {
)

// Flush in order for the writes to be visible to other readers.
assert.NoError(t, readWriter.Flush(wOpts.StorageLimitInBytes))
assert.NoError(t, readWriter.Flush())

var recorded modelpb.Batch
assert.NoError(t, db.View(func(txn *badger.Txn) error {
@@ -135,7 +134,7 @@ func TestWriteTraceSampled(t *testing.T) {
assert.True(t, isSampled)

// Flush in order for the writes to be visible to other readers.
assert.NoError(t, readWriter.Flush(wOpts.StorageLimitInBytes))
assert.NoError(t, readWriter.Flush())

sampled := make(map[string]bool)
assert.NoError(t, db.View(func(txn *badger.Txn) error {
@@ -287,7 +286,6 @@ func TestStorageLimit(t *testing.T) {
db := newBadgerDB(t, opts)
db.Close()
db = newBadgerDB(t, opts)
lsm, vlog := db.Size()

store := eventstorage.New(db, eventstorage.ProtobufCodec{})
readWriter := store.NewReadWriter()
@@ -298,13 +296,8 @@
transaction := modelpb.APMEvent{Transaction: &modelpb.Transaction{Id: transactionID}}
err := readWriter.WriteTraceEvent(traceID, transactionID, &transaction, eventstorage.WriterOpts{
TTL: time.Minute,
StorageLimitInBytes: 1, // ignored in the write, because there's no implicit flush
StorageLimitInBytes: 1,
})
assert.NoError(t, err)
err = readWriter.Flush(1)
assert.EqualError(t, err, fmt.Sprintf(
"failed to flush pending writes: configured storage limit reached (current: %d, limit: 1)", lsm+vlog,
))
assert.ErrorIs(t, err, eventstorage.ErrLimitReached)

// Assert the stored write has been discarded.
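The test now asserts the sentinel error straight from the write, since the limit error is wrapped into WriteTraceEvent's return value rather than surfacing from a later Flush. A hedged caller-side sketch of that pattern; the writeOrDrop helper and the import paths are assumptions, not part of the PR:

package example

import (
	"errors"
	"fmt"
	"time"

	"github.com/elastic/apm-data/model/modelpb"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

// writeOrDrop writes a trace event and treats a storage-limit rejection as a
// non-fatal condition: pending writes were already flushed before the entry
// was rejected, so the event can simply be dropped or counted as discarded.
func writeOrDrop(rw *eventstorage.ReadWriter, traceID, txID string, event *modelpb.APMEvent, limit int64) error {
	err := rw.WriteTraceEvent(traceID, txID, event, eventstorage.WriterOpts{
		TTL:                 time.Minute,
		StorageLimitInBytes: limit,
	})
	if errors.Is(err, eventstorage.ErrLimitReached) {
		fmt.Printf("dropping event %s: %v\n", txID, err)
		return nil
	}
	return err
}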
2 changes: 1 addition & 1 deletion x-pack/apm-server/sampling/processor.go
@@ -640,5 +640,5 @@ func (s *wrappedRW) DeleteTraceEvent(traceID, id string) error {

// Flush calls ShardedReadWriter.Flush
func (s *wrappedRW) Flush() error {
return s.rw.Flush(s.writerOpts.StorageLimitInBytes)
return s.rw.Flush()
}
22 changes: 12 additions & 10 deletions x-pack/apm-server/sampling/processor_test.go
@@ -69,21 +69,21 @@ func TestProcessAlreadyTailSampled(t *testing.T) {
StorageLimitInBytes: 0,
}
assert.NoError(t, writer.WriteTraceSampled(trace1.Id, true, wOpts))
assert.NoError(t, writer.Flush(wOpts.StorageLimitInBytes))
assert.NoError(t, writer.Flush())
writer.Close()

wOpts.TTL = -1 // expire immediately
storage = eventstorage.New(config.DB, eventstorage.ProtobufCodec{})
writer = storage.NewReadWriter()
assert.NoError(t, writer.WriteTraceSampled(trace2.Id, true, wOpts))
assert.NoError(t, writer.Flush(wOpts.StorageLimitInBytes))
assert.NoError(t, writer.Flush())
writer.Close()

// Badger transactions created globally before committing the above writes
// will not see them due to SSI (Serializable Snapshot Isolation). Flush
// the storage so that new transactions are created for the underlying
// writer shards that can list all the events committed so far.
require.NoError(t, config.Storage.Flush(0))
require.NoError(t, config.Storage.Flush())

processor, err := sampling.NewProcessor(config)
require.NoError(t, err)
@@ -141,7 +141,7 @@

// Stop the processor and flush global storage so we can access the database.
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, config.Storage.Flush(0))
assert.NoError(t, config.Storage.Flush())
reader := storage.NewReadWriter()
defer reader.Close()

@@ -260,7 +260,7 @@ func TestProcessLocalTailSampling(t *testing.T) {

// Stop the processor and flush global storage so we can access the database.
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, config.Storage.Flush(0))
assert.NoError(t, config.Storage.Flush())
storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{})
reader := storage.NewReadWriter()
defer reader.Close()
@@ -325,7 +325,7 @@ func TestProcessLocalTailSamplingUnsampled(t *testing.T) {

// Stop the processor so we can access the database.
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, config.Storage.Flush(0))
assert.NoError(t, config.Storage.Flush())
storage := eventstorage.New(config.DB, eventstorage.ProtobufCodec{})
reader := storage.NewReadWriter()
defer reader.Close()
@@ -479,7 +479,7 @@ func TestProcessRemoteTailSampling(t *testing.T) {

// Stop the processor and flush global storage so we can access the database.
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, config.Storage.Flush(0))
assert.NoError(t, config.Storage.Flush())
assert.Empty(t, published) // remote decisions don't get republished

expectedMonitoring := monitoring.MakeFlatSnapshot()
@@ -704,8 +704,10 @@ func TestStorageLimit(t *testing.T) {
config := newTempdirConfig(t)
// Write 5K span events and close the DB to persist to disk the storage
// size and assert that none are reported immediately.
writeBatch(5000, config, func(b modelpb.Batch) { assert.Empty(t, b) })
assert.NoError(t, config.Storage.Flush(0))
writeBatch(5000, config, func(b modelpb.Batch) {
assert.Empty(t, b, fmt.Sprintf("expected empty but size is %d", len(b)))
})
assert.NoError(t, config.Storage.Flush())
config.Storage.Close()
assert.NoError(t, config.DB.Close())

@@ -787,7 +789,7 @@ func TestGracefulShutdown(t *testing.T) {
assert.NoError(t, processor.ProcessBatch(context.Background(), &batch))
assert.Empty(t, batch)
assert.NoError(t, processor.Stop(context.Background()))
assert.NoError(t, config.Storage.Flush(0))
assert.NoError(t, config.Storage.Flush())

reader := eventstorage.New(config.DB, eventstorage.ProtobufCodec{}).NewReadWriter()

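The SSI comment in TestProcessAlreadyTailSampled above is the reason these tests call config.Storage.Flush() before reading: a badger transaction opened before a commit cannot see that commit. A standalone sketch of that behavior, using an in-memory badger v2 database purely for illustration:

package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v2"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A read transaction opened before the write below: its snapshot predates the commit.
	early := db.NewTransaction(false)
	defer early.Discard()

	if err := db.Update(func(txn *badger.Txn) error {
		return txn.Set([]byte("trace:1"), []byte("sampled"))
	}); err != nil {
		log.Fatal(err)
	}

	if _, err := early.Get([]byte("trace:1")); err == badger.ErrKeyNotFound {
		fmt.Println("transaction opened before the commit cannot see the write")
	}

	// Flushing in the tests effectively gives readers a fresh transaction like this one.
	late := db.NewTransaction(false)
	defer late.Discard()
	if _, err := late.Get([]byte("trace:1")); err == nil {
		fmt.Println("transaction opened after the commit sees the write")
	}
}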