From 4526014b950504331a1f976bac60a7c09c61cbf6 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Tue, 28 Sep 2021 15:18:47 -0400 Subject: [PATCH] Expose config options for search block encoding and page size (#990) * Expose config options for search block encoding and page size Signed-off-by: Martin Disibio * Change search encoding/pagesize defaults Signed-off-by: Martin Disibio * Fix merge Signed-off-by: Martin Disibio * Docs Signed-off-by: Martin Disibio * lint Signed-off-by: Martin Disibio --- docs/tempo/website/configuration/_index.md | 9 ++++ modules/ingester/instance.go | 4 +- modules/storage/config.go | 2 + tempodb/encoding/config.go | 2 + tempodb/search/backend_search_block.go | 34 +++++-------- tempodb/search/backend_search_block_test.go | 50 ++++++++++++++++--- tempodb/search/backend_search_block_writer.go | 6 +-- tempodb/search/block_meta.go | 16 ++---- tempodb/tempodb.go | 12 +++++ 9 files changed, 90 insertions(+), 45 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 7121c3dde6f..6a85fd24c31 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -549,6 +549,15 @@ storage: # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2 [encoding: ] + + # search data encoding/compression. same options as blocks. + # (default: gzip) + [search_encoding: ] + + # number of bytes per search page + # (default: 1MiB) + [search_page_size_bytes: ] + ``` ## Memberlist diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index a12c5e334e0..2fb8c3d1b85 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -283,12 +283,10 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error { var newSearch search.SearchableBlock if oldSearch != nil { - err = search.NewBackendSearchBlock(oldSearch.b, i.local, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID, backend.EncSnappy, 0) + newSearch, err = i.writer.CompleteSearchBlockWithBackend(oldSearch.b, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID, i.localReader, i.localWriter) if err != nil { return err } - - newSearch = search.OpenBackendSearchBlock(i.local, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID) } i.blocksMtx.Lock() diff --git a/modules/storage/config.go b/modules/storage/config.go index c3584dd8556..2d827f180b6 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -42,6 +42,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd + cfg.Trace.Block.SearchEncoding = backend.EncGZIP + cfg.Trace.Block.SearchPageSizeBytes = 1024 * 1024 // 1 MB cfg.Trace.Azure = &azure.Config{} f.StringVar(&cfg.Trace.Azure.StorageAccountName.Value, util.PrefixConfig(prefix, "trace.azure.storage-account-name"), "", "Azure storage account name.") diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index 69fe00bc25e..dadd47a8f3a 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -13,6 +13,8 @@ type BlockConfig struct { BloomFP float64 `yaml:"bloom_filter_false_positive"` BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"` Encoding backend.Encoding `yaml:"encoding"` + SearchEncoding backend.Encoding `yaml:"search_encoding"` + SearchPageSizeBytes int `yaml:"search_page_size_bytes"` } // ValidateConfig returns true if the config is valid diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index 9269174fdea..d7f5926b78a 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -1,17 +1,14 @@ package search import ( - "bytes" "context" "io" "github.com/google/uuid" "github.com/pkg/errors" - tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/backend" - "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -23,13 +20,13 @@ const defaultBackendSearchBlockPageSize = 2 * 1024 * 1024 type BackendSearchBlock struct { id uuid.UUID tenantID string - l *local.Backend + r backend.Reader } // NewBackendSearchBlock iterates through the given WAL search data and writes it to the persistent backend // in a more efficient paged form. Multiple traces are written in the same page to make sure of the flatbuffer // CreateSharedString feature which dedupes strings across the entire buffer. -func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockID uuid.UUID, tenantID string, enc backend.Encoding, pageSizeBytes int) error { +func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, blockID uuid.UUID, tenantID string, enc backend.Encoding, pageSizeBytes int) error { var err error ctx := context.TODO() indexPageSize := 100 * 1024 @@ -47,7 +44,7 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI header := tempofb.NewSearchBlockHeaderMutable() - w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc) + w, err := newBackendSearchBlockWriter(blockID, tenantID, rw, version, enc) if err != nil { return err } @@ -109,14 +106,14 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI if err != nil { return err } - err = l.Write(ctx, "search-index", backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(indexBytes), int64(len(indexBytes)), true) + err = rw.Write(ctx, "search-index", blockID, tenantID, indexBytes, true) if err != nil { return err } // Write header hb := header.ToBytes() - err = l.Write(ctx, "search-header", backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(hb), int64(len(hb)), true) + err = rw.Write(ctx, "search-header", blockID, tenantID, hb, true) if err != nil { return err } @@ -128,15 +125,15 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI Version: version.Version(), Encoding: enc, } - return WriteSearchBlockMeta(ctx, l, blockID, tenantID, sm) + return WriteSearchBlockMeta(ctx, rw, blockID, tenantID, sm) } // OpenBackendSearchBlock opens the search data for an existing block in the given backend. -func OpenBackendSearchBlock(l *local.Backend, blockID uuid.UUID, tenantID string) *BackendSearchBlock { +func OpenBackendSearchBlock(blockID uuid.UUID, tenantID string, r backend.Reader) *BackendSearchBlock { return &BackendSearchBlock{ id: blockID, tenantID: tenantID, - l: l, + r: r, } } @@ -148,7 +145,7 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results indexBuf := []common.Record{{}} entry := &tempofb.SearchEntry{} // Buffer - meta, err := ReadSearchBlockMeta(ctx, s.l, s.id, s.tenantID) + meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID) if err != nil { return err } @@ -160,17 +157,12 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results // Read header // Verify something in the block matches by checking the header - hbr, hbrlen, err := s.l.Read(ctx, "search-header", backend.KeyPathForBlock(s.id, s.tenantID), true) + hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true) if err != nil { return err } - sr.bytesInspected.Add(uint64(hbrlen)) - - hb, err := tempo_io.ReadAllWithEstimate(hbr, hbrlen) - if err != nil { - return err - } + sr.bytesInspected.Add(uint64(len(hb))) header := tempofb.GetRootAsSearchBlockHeader(hb, 0) if !p.MatchesBlock(header) { @@ -183,14 +175,14 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results // Read index bmeta := backend.NewBlockMeta(s.tenantID, s.id, meta.Version, meta.Encoding, "") - cr := backend.NewContextReader(bmeta, "search-index", backend.NewReader(s.l), false) + cr := backend.NewContextReader(bmeta, "search-index", s.r, false) ir, err := vers.NewIndexReader(cr, int(meta.IndexPageSize), int(meta.IndexRecords)) if err != nil { return err } - dcr := backend.NewContextReader(bmeta, "search", backend.NewReader(s.l), false) + dcr := backend.NewContextReader(bmeta, "search", s.r, false) dr, err := vers.NewDataReader(dcr, meta.Encoding) if err != nil { return err diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index 2756c45ab60..95a7b3e264d 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -19,6 +19,8 @@ import ( "github.com/stretchr/testify/require" ) +const testTenantID = "fake" + func genSearchData(traceID []byte, i int) [][]byte { return [][]byte{(&tempofb.SearchEntryMutable{ TraceID: traceID, @@ -46,11 +48,10 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E require.NoError(t, err) blockID := uuid.New() - tenantID := "fake" - err = NewBackendSearchBlock(b1, l, blockID, tenantID, enc, pageSizeBytes) + err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, pageSizeBytes) require.NoError(t, err) - b2 := OpenBackendSearchBlock(l, blockID, tenantID) + b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l)) return b2 } @@ -128,11 +129,10 @@ func TestBackendSearchBlockDedupesWAL(t *testing.T) { require.NoError(t, err) blockID := uuid.New() - tenantID := "fake" - err = NewBackendSearchBlock(b1, l, blockID, tenantID, backend.EncNone, 0) + err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, backend.EncNone, 0) require.NoError(t, err) - b2 := OpenBackendSearchBlock(l, blockID, tenantID) + b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l)) p := NewSearchPipeline(&tempopb.SearchRequest{ Tags: tc.searchTags, @@ -204,3 +204,41 @@ func BenchmarkBackendSearchBlockSearch(b *testing.B) { } } } + +func TestBackendSearchBlockFinalSize(t *testing.T) { + traceCount := 10000 + pageSizesMB := []float32{1} + + f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) + require.NoError(t, err) + + b1, err := NewStreamingSearchBlockForFile(f) + require.NoError(t, err) + + for i := 0; i < traceCount; i++ { + id := make([]byte, 16) + binary.LittleEndian.PutUint32(id, uint32(i)) + require.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i))) + } + + l, err := local.NewBackend(&local.Config{ + Path: t.TempDir(), + }) + require.NoError(t, err) + + blockID := uuid.New() + + for _, enc := range backend.SupportedEncoding { + for _, sz := range pageSizesMB { + + err := NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, int(sz*1024*1024)) + require.NoError(t, err) + + _, len, err := l.Read(context.TODO(), "search", backend.KeyPathForBlock(blockID, testTenantID), false) + require.NoError(t, err) + + fmt.Printf("BackendSearchBlock/%s/%.1fMiB, %d traces = %d bytes, %.2f bytes per trace \n", enc.String(), sz, traceCount, len, float32(len)/float32(traceCount)) + + } + } +} diff --git a/tempodb/search/backend_search_block_writer.go b/tempodb/search/backend_search_block_writer.go index 927749994b3..e29a837bfbd 100644 --- a/tempodb/search/backend_search_block_writer.go +++ b/tempodb/search/backend_search_block_writer.go @@ -18,7 +18,7 @@ type backendSearchBlockWriter struct { // input blockID uuid.UUID tenantID string - w backend.RawWriter + w backend.Writer // vars builder *tempofb.SearchPageBuilder @@ -30,7 +30,7 @@ type backendSearchBlockWriter struct { var _ common.DataWriterGeneric = (*backendSearchBlockWriter)(nil) -func newBackendSearchBlockWriter(blockID uuid.UUID, tenantID string, w backend.RawWriter, v encoding.VersionedEncoding, enc backend.Encoding) (*backendSearchBlockWriter, error) { +func newBackendSearchBlockWriter(blockID uuid.UUID, tenantID string, w backend.Writer, v encoding.VersionedEncoding, enc backend.Encoding) (*backendSearchBlockWriter, error) { finalBuf := &bytes.Buffer{} dw, err := v.NewDataWriter(finalBuf, enc) @@ -78,7 +78,7 @@ func (w *backendSearchBlockWriter) CutPage(ctx context.Context) (int, error) { w.pageBuf = w.finalBuf.Bytes() // Append to backend - w.tracker, err = w.w.Append(ctx, "search", backend.KeyPathForBlock(w.blockID, w.tenantID), w.tracker, w.pageBuf) + w.tracker, err = w.w.Append(ctx, "search", w.blockID, w.tenantID, w.tracker, w.pageBuf) if err != nil { return 0, err } diff --git a/tempodb/search/block_meta.go b/tempodb/search/block_meta.go index e89db64a12a..e153d7baecb 100644 --- a/tempodb/search/block_meta.go +++ b/tempodb/search/block_meta.go @@ -1,12 +1,10 @@ package search import ( - "bytes" "context" "encoding/json" "github.com/google/uuid" - tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/tempodb/backend" ) @@ -19,24 +17,18 @@ type BlockMeta struct { const searchMetaObjectName = "search.meta.json" -func WriteSearchBlockMeta(ctx context.Context, w backend.RawWriter, blockID uuid.UUID, tenantID string, sm *BlockMeta) error { +func WriteSearchBlockMeta(ctx context.Context, w backend.Writer, blockID uuid.UUID, tenantID string, sm *BlockMeta) error { metaBytes, err := json.Marshal(sm) if err != nil { return err } - err = w.Write(ctx, searchMetaObjectName, backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(metaBytes), int64(len(metaBytes)), false) + err = w.Write(ctx, searchMetaObjectName, blockID, tenantID, metaBytes, false) return err } -func ReadSearchBlockMeta(ctx context.Context, r backend.RawReader, blockID uuid.UUID, tenantID string) (*BlockMeta, error) { - metaReader, size, err := r.Read(ctx, searchMetaObjectName, backend.KeyPathForBlock(blockID, tenantID), false) - if err != nil { - return nil, err - } - - defer metaReader.Close() - metaBytes, err := tempo_io.ReadAllWithEstimate(metaReader, size) +func ReadSearchBlockMeta(ctx context.Context, r backend.Reader, blockID uuid.UUID, tenantID string) (*BlockMeta, error) { + metaBytes, err := r.Read(ctx, searchMetaObjectName, blockID, tenantID, false) if err != nil { return nil, err } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 9be627f42dd..b2f0d584d9e 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/pool" + "github.com/grafana/tempo/tempodb/search" "github.com/grafana/tempo/tempodb/wal" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" @@ -69,6 +70,7 @@ type Writer interface { WriteBlock(ctx context.Context, block WriteableBlock) error CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.BackendBlock, error) CompleteBlockWithBackend(ctx context.Context, block *wal.AppendBlock, combiner common.ObjectCombiner, r backend.Reader, w backend.Writer) (*encoding.BackendBlock, error) + CompleteSearchBlockWithBackend(block *search.StreamingSearchBlock, blockID uuid.UUID, tenantID string, r backend.Reader, w backend.Writer) (*search.BackendSearchBlock, error) WAL() *wal.WAL } @@ -260,6 +262,16 @@ func (rw *readerWriter) CompleteBlockWithBackend(ctx context.Context, block *wal return backendBlock, nil } +func (rw *readerWriter) CompleteSearchBlockWithBackend(block *search.StreamingSearchBlock, blockID uuid.UUID, tenantID string, r backend.Reader, w backend.Writer) (*search.BackendSearchBlock, error) { + err := search.NewBackendSearchBlock(block, w, blockID, tenantID, rw.cfg.Block.SearchEncoding, rw.cfg.Block.SearchPageSizeBytes) + if err != nil { + return nil, err + } + + b := search.OpenBackendSearchBlock(blockID, tenantID, r) + return b, nil +} + func (rw *readerWriter) WAL() *wal.WAL { return rw.wal }