Skip to content

Commit

Permalink
Expose config options for search block encoding and page size (grafan…
Browse files Browse the repository at this point in the history
…a#990)

* Expose config options for search block encoding and page size

Signed-off-by: Martin Disibio <[email protected]>

* Change search encoding/pagesize defaults

Signed-off-by: Martin Disibio <[email protected]>

* Fix merge

Signed-off-by: Martin Disibio <[email protected]>

* Docs

Signed-off-by: Martin Disibio <[email protected]>

* lint

Signed-off-by: Martin Disibio <[email protected]>
  • Loading branch information
mdisibio authored Sep 28, 2021
1 parent b2518be commit 4526014
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 45 deletions.
9 changes: 9 additions & 0 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,15 @@ storage:
# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
[encoding: <string>]
# search data encoding/compression. same options as blocks.
# (default: gzip)
[search_encoding: <string>]
# number of bytes per search page
# (default: 1MiB)
[search_page_size_bytes: <int>]
```

## Memberlist
Expand Down
4 changes: 1 addition & 3 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,10 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {

var newSearch search.SearchableBlock
if oldSearch != nil {
err = search.NewBackendSearchBlock(oldSearch.b, i.local, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID, backend.EncSnappy, 0)
newSearch, err = i.writer.CompleteSearchBlockWithBackend(oldSearch.b, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID, i.localReader, i.localWriter)
if err != nil {
return err
}

newSearch = search.OpenBackendSearchBlock(i.local, backendBlock.BlockMeta().BlockID, backendBlock.BlockMeta().TenantID)
}

i.blocksMtx.Lock()
Expand Down
2 changes: 2 additions & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.")
cfg.Trace.Block.Encoding = backend.EncZstd
cfg.Trace.Block.SearchEncoding = backend.EncGZIP
cfg.Trace.Block.SearchPageSizeBytes = 1024 * 1024 // 1 MiB

cfg.Trace.Azure = &azure.Config{}
f.StringVar(&cfg.Trace.Azure.StorageAccountName.Value, util.PrefixConfig(prefix, "trace.azure.storage-account-name"), "", "Azure storage account name.")
Expand Down
2 changes: 2 additions & 0 deletions tempodb/encoding/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type BlockConfig struct {
BloomFP float64 `yaml:"bloom_filter_false_positive"`
BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"`
Encoding backend.Encoding `yaml:"encoding"`
SearchEncoding backend.Encoding `yaml:"search_encoding"`
SearchPageSizeBytes int `yaml:"search_page_size_bytes"`
}

// ValidateConfig returns true if the config is valid
Expand Down
34 changes: 13 additions & 21 deletions tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package search

import (
"bytes"
"context"
"io"

"github.com/google/uuid"
"github.com/pkg/errors"

tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)
Expand All @@ -23,13 +20,13 @@ const defaultBackendSearchBlockPageSize = 2 * 1024 * 1024
type BackendSearchBlock struct {
id uuid.UUID
tenantID string
l *local.Backend
r backend.Reader
}

// NewBackendSearchBlock iterates through the given WAL search data and writes it to the persistent backend
// in a more efficient paged form. Multiple traces are written in the same page to make use of the flatbuffer
// CreateSharedString feature which dedupes strings across the entire buffer.
func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockID uuid.UUID, tenantID string, enc backend.Encoding, pageSizeBytes int) error {
func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, blockID uuid.UUID, tenantID string, enc backend.Encoding, pageSizeBytes int) error {
var err error
ctx := context.TODO()
indexPageSize := 100 * 1024
Expand All @@ -47,7 +44,7 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI

header := tempofb.NewSearchBlockHeaderMutable()

w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc)
w, err := newBackendSearchBlockWriter(blockID, tenantID, rw, version, enc)
if err != nil {
return err
}
Expand Down Expand Up @@ -109,14 +106,14 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
if err != nil {
return err
}
err = l.Write(ctx, "search-index", backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(indexBytes), int64(len(indexBytes)), true)
err = rw.Write(ctx, "search-index", blockID, tenantID, indexBytes, true)
if err != nil {
return err
}

// Write header
hb := header.ToBytes()
err = l.Write(ctx, "search-header", backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(hb), int64(len(hb)), true)
err = rw.Write(ctx, "search-header", blockID, tenantID, hb, true)
if err != nil {
return err
}
Expand All @@ -128,15 +125,15 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI
Version: version.Version(),
Encoding: enc,
}
return WriteSearchBlockMeta(ctx, l, blockID, tenantID, sm)
return WriteSearchBlockMeta(ctx, rw, blockID, tenantID, sm)
}

// OpenBackendSearchBlock opens the search data for an existing block in the given backend.
func OpenBackendSearchBlock(l *local.Backend, blockID uuid.UUID, tenantID string) *BackendSearchBlock {
func OpenBackendSearchBlock(blockID uuid.UUID, tenantID string, r backend.Reader) *BackendSearchBlock {
return &BackendSearchBlock{
id: blockID,
tenantID: tenantID,
l: l,
r: r,
}
}

Expand All @@ -148,7 +145,7 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results
indexBuf := []common.Record{{}}
entry := &tempofb.SearchEntry{} // Buffer

meta, err := ReadSearchBlockMeta(ctx, s.l, s.id, s.tenantID)
meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID)
if err != nil {
return err
}
Expand All @@ -160,17 +157,12 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

// Read header
// Verify something in the block matches by checking the header
hbr, hbrlen, err := s.l.Read(ctx, "search-header", backend.KeyPathForBlock(s.id, s.tenantID), true)
hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
if err != nil {
return err
}

sr.bytesInspected.Add(uint64(hbrlen))

hb, err := tempo_io.ReadAllWithEstimate(hbr, hbrlen)
if err != nil {
return err
}
sr.bytesInspected.Add(uint64(len(hb)))

header := tempofb.GetRootAsSearchBlockHeader(hb, 0)
if !p.MatchesBlock(header) {
Expand All @@ -183,14 +175,14 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

// Read index
bmeta := backend.NewBlockMeta(s.tenantID, s.id, meta.Version, meta.Encoding, "")
cr := backend.NewContextReader(bmeta, "search-index", backend.NewReader(s.l), false)
cr := backend.NewContextReader(bmeta, "search-index", s.r, false)

ir, err := vers.NewIndexReader(cr, int(meta.IndexPageSize), int(meta.IndexRecords))
if err != nil {
return err
}

dcr := backend.NewContextReader(bmeta, "search", backend.NewReader(s.l), false)
dcr := backend.NewContextReader(bmeta, "search", s.r, false)
dr, err := vers.NewDataReader(dcr, meta.Encoding)
if err != nil {
return err
Expand Down
50 changes: 44 additions & 6 deletions tempodb/search/backend_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/stretchr/testify/require"
)

const testTenantID = "fake"

func genSearchData(traceID []byte, i int) [][]byte {
return [][]byte{(&tempofb.SearchEntryMutable{
TraceID: traceID,
Expand Down Expand Up @@ -46,11 +48,10 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E
require.NoError(t, err)

blockID := uuid.New()
tenantID := "fake"
err = NewBackendSearchBlock(b1, l, blockID, tenantID, enc, pageSizeBytes)
err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, pageSizeBytes)
require.NoError(t, err)

b2 := OpenBackendSearchBlock(l, blockID, tenantID)
b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
return b2
}

Expand Down Expand Up @@ -128,11 +129,10 @@ func TestBackendSearchBlockDedupesWAL(t *testing.T) {
require.NoError(t, err)

blockID := uuid.New()
tenantID := "fake"
err = NewBackendSearchBlock(b1, l, blockID, tenantID, backend.EncNone, 0)
err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, backend.EncNone, 0)
require.NoError(t, err)

b2 := OpenBackendSearchBlock(l, blockID, tenantID)
b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))

p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: tc.searchTags,
Expand Down Expand Up @@ -204,3 +204,41 @@ func BenchmarkBackendSearchBlockSearch(b *testing.B) {
}
}
}

// TestBackendSearchBlockFinalSize writes the same generated search data to a
// local backend once for every supported encoding and configured page size,
// and prints the size of the resulting "search" object. It is informational:
// the only assertions are that each step completes without error.
func TestBackendSearchBlockFinalSize(t *testing.T) {
	traceCount := 10000
	pageSizesMB := []float32{1}

	f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
	require.NoError(t, err)

	b1, err := NewStreamingSearchBlockForFile(f)
	require.NoError(t, err)

	for i := 0; i < traceCount; i++ {
		id := make([]byte, 16)
		binary.LittleEndian.PutUint32(id, uint32(i))
		require.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i)))
	}

	l, err := local.NewBackend(&local.Config{
		Path: t.TempDir(),
	})
	require.NoError(t, err)

	blockID := uuid.New()

	for _, enc := range backend.SupportedEncoding {
		for _, sz := range pageSizesMB {
			err := NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, int(sz*1024*1024))
			require.NoError(t, err)

			// Only the reported size is used, but the returned reader still has
			// to be closed so a handle is not leaked on every iteration.
			rc, size, err := l.Read(context.TODO(), "search", backend.KeyPathForBlock(blockID, testTenantID), false)
			require.NoError(t, err)
			require.NoError(t, rc.Close())

			fmt.Printf("BackendSearchBlock/%s/%.1fMiB, %d traces = %d bytes, %.2f bytes per trace \n", enc.String(), sz, traceCount, size, float32(size)/float32(traceCount))
		}
	}
}
6 changes: 3 additions & 3 deletions tempodb/search/backend_search_block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type backendSearchBlockWriter struct {
// input
blockID uuid.UUID
tenantID string
w backend.RawWriter
w backend.Writer

// vars
builder *tempofb.SearchPageBuilder
Expand All @@ -30,7 +30,7 @@ type backendSearchBlockWriter struct {

var _ common.DataWriterGeneric = (*backendSearchBlockWriter)(nil)

func newBackendSearchBlockWriter(blockID uuid.UUID, tenantID string, w backend.RawWriter, v encoding.VersionedEncoding, enc backend.Encoding) (*backendSearchBlockWriter, error) {
func newBackendSearchBlockWriter(blockID uuid.UUID, tenantID string, w backend.Writer, v encoding.VersionedEncoding, enc backend.Encoding) (*backendSearchBlockWriter, error) {
finalBuf := &bytes.Buffer{}

dw, err := v.NewDataWriter(finalBuf, enc)
Expand Down Expand Up @@ -78,7 +78,7 @@ func (w *backendSearchBlockWriter) CutPage(ctx context.Context) (int, error) {
w.pageBuf = w.finalBuf.Bytes()

// Append to backend
w.tracker, err = w.w.Append(ctx, "search", backend.KeyPathForBlock(w.blockID, w.tenantID), w.tracker, w.pageBuf)
w.tracker, err = w.w.Append(ctx, "search", w.blockID, w.tenantID, w.tracker, w.pageBuf)
if err != nil {
return 0, err
}
Expand Down
16 changes: 4 additions & 12 deletions tempodb/search/block_meta.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package search

import (
"bytes"
"context"
"encoding/json"

"github.com/google/uuid"
tempo_io "github.com/grafana/tempo/pkg/io"
"github.com/grafana/tempo/tempodb/backend"
)

Expand All @@ -19,24 +17,18 @@ type BlockMeta struct {

const searchMetaObjectName = "search.meta.json"

func WriteSearchBlockMeta(ctx context.Context, w backend.RawWriter, blockID uuid.UUID, tenantID string, sm *BlockMeta) error {
func WriteSearchBlockMeta(ctx context.Context, w backend.Writer, blockID uuid.UUID, tenantID string, sm *BlockMeta) error {
metaBytes, err := json.Marshal(sm)
if err != nil {
return err
}

err = w.Write(ctx, searchMetaObjectName, backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(metaBytes), int64(len(metaBytes)), false)
err = w.Write(ctx, searchMetaObjectName, blockID, tenantID, metaBytes, false)
return err
}

func ReadSearchBlockMeta(ctx context.Context, r backend.RawReader, blockID uuid.UUID, tenantID string) (*BlockMeta, error) {
metaReader, size, err := r.Read(ctx, searchMetaObjectName, backend.KeyPathForBlock(blockID, tenantID), false)
if err != nil {
return nil, err
}

defer metaReader.Close()
metaBytes, err := tempo_io.ReadAllWithEstimate(metaReader, size)
func ReadSearchBlockMeta(ctx context.Context, r backend.Reader, blockID uuid.UUID, tenantID string) (*BlockMeta, error) {
metaBytes, err := r.Read(ctx, searchMetaObjectName, blockID, tenantID, false)
if err != nil {
return nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/search"
"github.com/grafana/tempo/tempodb/wal"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
Expand Down Expand Up @@ -69,6 +70,7 @@ type Writer interface {
WriteBlock(ctx context.Context, block WriteableBlock) error
CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.BackendBlock, error)
CompleteBlockWithBackend(ctx context.Context, block *wal.AppendBlock, combiner common.ObjectCombiner, r backend.Reader, w backend.Writer) (*encoding.BackendBlock, error)
CompleteSearchBlockWithBackend(block *search.StreamingSearchBlock, blockID uuid.UUID, tenantID string, r backend.Reader, w backend.Writer) (*search.BackendSearchBlock, error)
WAL() *wal.WAL
}

Expand Down Expand Up @@ -260,6 +262,16 @@ func (rw *readerWriter) CompleteBlockWithBackend(ctx context.Context, block *wal
return backendBlock, nil
}

// CompleteSearchBlockWithBackend writes the given streaming search block to w
// via search.NewBackendSearchBlock, using the SearchEncoding and
// SearchPageSizeBytes from this readerWriter's block config, and then returns a
// BackendSearchBlock for the same block ID and tenant opened over r.
// If the write fails, the error is returned and no block is opened.
func (rw *readerWriter) CompleteSearchBlockWithBackend(block *search.StreamingSearchBlock, blockID uuid.UUID, tenantID string, r backend.Reader, w backend.Writer) (*search.BackendSearchBlock, error) {
	blockCfg := rw.cfg.Block

	if err := search.NewBackendSearchBlock(block, w, blockID, tenantID, blockCfg.SearchEncoding, blockCfg.SearchPageSizeBytes); err != nil {
		return nil, err
	}

	return search.OpenBackendSearchBlock(blockID, tenantID, r), nil
}

// WAL returns the *wal.WAL held by this readerWriter.
func (rw *readerWriter) WAL() *wal.WAL {
return rw.wal
}
Expand Down

0 comments on commit 4526014

Please sign in to comment.