Add new compaction max_block_bytes setting (grafana#520)
* Rewrite timeWindowBlockSelector

* Add new size limit for compacted blocks

* Deprecate max_compaction_objects. Update examples

* Update changelog

* MaxBlockSize -> MaxBlockBytes
mdisibio authored Feb 10, 2021
1 parent aa88d27 commit 9264916
Showing 12 changed files with 181 additions and 313 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
 * [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
 * [CHANGE] Ingester cut blocks based on size instead of trace count. Replace ingester `traces_per_block` setting with `max_block_bytes`. This is a **breaking change**. [#474](https://github.com/grafana/tempo/issues/474)
 * [CHANGE] Refactor cache section in tempodb. This is a **breaking change** b/c the cache config section has changed. [#485](https://github.com/grafana/tempo/pull/485)
+* [CHANGE] New compactor setting `max_block_bytes` limits compacted block size by bytes instead of trace count. [#520](https://github.com/grafana/tempo/pull/520)
 * [FEATURE] Added block compression. This is a **breaking change** b/c some configuration fields moved. [#504](https://github.com/grafana/tempo/pull/504)
 * [ENHANCEMENT] Serve config at the "/config" endpoint. [#446](https://github.com/grafana/tempo/pull/446)
 * [ENHANCEMENT] Switch blocklist polling and retention to different concurrency mechanism, add configuration options. [#475](https://github.com/grafana/tempo/issues/475)
13 changes: 7 additions & 6 deletions docs/tempo/website/configuration/_index.md
@@ -86,12 +86,13 @@ Compactors stream blocks from the storage backend, combine them and write them b
 ```
 compactor:
   compaction:
-    block_retention: 336h              # duration to keep blocks
-    compacted_block_retention: 1h      # duration to keep blocks that have been compacted elsewhere
-    compaction_window: 1h              # blocks in this time window will be compacted together
-    chunk_size_bytes: 10485760         # amount of data to buffer from input blocks
-    flush_size_bytes: 31457280         # flush data to backend when buffer is this large
-    max_compaction_objects: 1000000    # maximum traces in a compacted block
+    block_retention: 336h              # Optional. Duration to keep blocks. Default is 14 days (336h).
+    compacted_block_retention: 1h      # Optional. Duration to keep blocks that have been compacted elsewhere
+    compaction_window: 4h              # Optional. Blocks in this time window will be compacted together
+    chunk_size_bytes: 10485760         # Optional. Amount of data to buffer from input blocks. Default is 10 MiB
+    flush_size_bytes: 31457280         # Optional. Flush data to backend when buffer is this large. Default is 30 MiB
+    max_compaction_objects: 6000000    # Optional. Maximum number of traces in a compacted block. Default is 6 million. Deprecated.
+    max_block_bytes: 107374182400      # Optional. Maximum size of a compacted block in bytes. Default is 100 GiB
     retention_concurrency: 10          # Optional. Number of tenants to process in parallel during retention. Default is 10.
   ring:
     kvstore:
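As a quick sanity check on the byte values used in the config above, here is a standalone sketch (illustrative, not code from the repo) showing how the documented defaults decompose into binary units:

```go
package main

import "fmt"

func main() {
	const (
		chunkSize     = 10 * 1024 * 1024         // 10 MiB -> 10485760
		flushSize     = 30 * 1024 * 1024         // 30 MiB -> 31457280
		maxBlockBytes = 100 * 1024 * 1024 * 1024 // 100 GiB -> 107374182400
	)
	fmt.Println(chunkSize, flushSize, maxBlockBytes)
}
```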
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-azure.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
 
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
     flush_size_bytes: 5242880
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-local.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
 
2 changes: 1 addition & 1 deletion example/docker-compose/etc/tempo-s3-minio.yaml
@@ -26,7 +26,7 @@ ingester:
 compactor:
   compaction:
     compaction_window: 1h              # blocks in this time window will be compacted together
-    max_compaction_objects: 1000000    # maximum size of compacted blocks
+    max_block_bytes: 100_000_000       # maximum size of compacted blocks
     block_retention: 1h
     compacted_block_retention: 10m
     flush_size_bytes: 5242880
1 change: 1 addition & 0 deletions modules/compactor/config.go
@@ -31,6 +31,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 
     f.DurationVar(&cfg.Compactor.BlockRetention, util.PrefixConfig(prefix, "compaction.block-retention"), 14*24*time.Hour, "Duration to keep blocks/traces.")
     f.IntVar(&cfg.Compactor.MaxCompactionObjects, util.PrefixConfig(prefix, "compaction.max-objects-per-block"), 6000000, "Maximum number of traces in a compacted block.")
+    f.Uint64Var(&cfg.Compactor.MaxBlockBytes, util.PrefixConfig(prefix, "compaction.max-block-bytes"), 100*1024*1024*1024 /* 100GB */, "Maximum size of a compacted block.")
     f.DurationVar(&cfg.Compactor.MaxCompactionRange, util.PrefixConfig(prefix, "compaction.compaction-window"), 4*time.Hour, "Maximum time window across which to compact blocks.")
     cfg.OverrideRingKey = ring.CompactorRingKey
 }
207 changes: 94 additions & 113 deletions tempodb/compaction_block_selector.go
@@ -58,168 +58,149 @@ func (sbs *simpleBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string)
 // It needs to be reinitialized with updated blocklist.
 
 type timeWindowBlockSelector struct {
-    blocklist            []*backend.BlockMeta
     MinInputBlocks       int
     MaxInputBlocks       int
     MaxCompactionRange   time.Duration // Size of the time window - say 6 hours
     MaxCompactionObjects int           // maximum size of compacted objects
+    MaxBlockBytes        uint64        // maximum block size, estimate
+
+    entries []timeWindowBlockEntry
+}
+
+type timeWindowBlockEntry struct {
+    meta  *backend.BlockMeta
+    group string // Blocks in the same group will be compacted together. Sort order also determines group priority.
+    order string // Individual block priority within the group.
+    hash  string // Hash string used for sharding ownership, preserves backwards compatibility
 }
 
 var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)
 
-func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
+func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, maxBlockBytes uint64, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
     twbs := &timeWindowBlockSelector{
-        blocklist:            append([]*backend.BlockMeta(nil), blocklist...),
         MinInputBlocks:       minInputBlocks,
         MaxInputBlocks:       maxInputBlocks,
         MaxCompactionRange:   maxCompactionRange,
         MaxCompactionObjects: maxCompactionObjects,
+        MaxBlockBytes:        maxBlockBytes,
     }
 
-    activeWindow := twbs.windowForTime(time.Now().Add(-activeWindowDuration))
-
-    // exclude blocks that fall in last window from active -> inactive cut-over
-    // blocks in this window will not be compacted in order to avoid
-    // ownership conflicts where two compactors process the same block
-    // at the same time as it transitions from last active window to first inactive window.
-    var newBlocks []*backend.BlockMeta
-    for _, b := range twbs.blocklist {
-        if twbs.windowForBlock(b) != activeWindow {
-            newBlocks = append(newBlocks, b)
-        }
-    }
-    twbs.blocklist = newBlocks
-
-    // sort by compaction window, level, and then size
-    sort.Slice(twbs.blocklist, func(i, j int) bool {
-        bi := twbs.blocklist[i]
-        bj := twbs.blocklist[j]
-
-        wi := twbs.windowForBlock(bi)
-        wj := twbs.windowForBlock(bj)
-
-        if activeWindow <= wi && activeWindow <= wj {
-            // inside active window. sort by: compaction lvl -> window -> size
-            // we should always choose the smallest two blocks whos compaction lvl and windows match
-            if bi.CompactionLevel != bj.CompactionLevel {
-                return bi.CompactionLevel < bj.CompactionLevel
-            }
-
-            if wi != wj {
-                return wi > wj
-            }
-        } else {
-            // outside active window. sort by: window -> compaction lvl -> size
-            // we should always choose the most recent two blocks that can be compacted
-            if wi != wj {
-                return wi > wj
-            }
-
-            if bi.CompactionLevel != bj.CompactionLevel {
-                return bi.CompactionLevel < bj.CompactionLevel
-            }
-        }
-
-        return bi.TotalObjects < bj.TotalObjects
-    })
+    now := time.Now()
+    currWindow := twbs.windowForTime(now)
+    activeWindow := twbs.windowForTime(now.Add(-activeWindowDuration))
+
+    for _, b := range blocklist {
+        w := twbs.windowForBlock(b)
+
+        // exclude blocks that fall in last window from active -> inactive cut-over
+        // blocks in this window will not be compacted in order to avoid
+        // ownership conflicts where two compactors process the same block
+        // at the same time as it transitions from last active window to first inactive window.
+        if w == activeWindow {
+            continue
+        }
+
+        entry := timeWindowBlockEntry{
+            meta: b,
+        }
+
+        age := currWindow - w
+        if activeWindow <= w {
+            // inside active window.
+            // Group by compaction level and window.
+            // Choose lowest compaction level and most recent windows first.
+            entry.group = fmt.Sprintf("A-%v-%016X", b.CompactionLevel, age)
+
+            // Within group choose smallest blocks first.
+            entry.order = fmt.Sprintf("%016X", entry.meta.TotalObjects)
+
+            entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, b.CompactionLevel, w)
+        } else {
+            // outside active window.
+            // Group by window only. Choose most recent windows first.
+            entry.group = fmt.Sprintf("B-%016X", age)
+
+            // Within group chose lowest compaction lvl and smallest blocks first.
+            entry.order = fmt.Sprintf("%v-%016X", b.CompactionLevel, entry.meta.TotalObjects)
+
+            entry.hash = fmt.Sprintf("%v-%v", b.TenantID, w)
+        }
+
+        twbs.entries = append(twbs.entries, entry)
+    }
+
+    // sort by group then order
+    sort.SliceStable(twbs.entries, func(i, j int) bool {
+        ei := twbs.entries[i]
+        ej := twbs.entries[j]
+
+        if ei.group == ej.group {
+            return ei.order < ej.order
+        }
+        return ei.group < ej.group
+    })
 
     return twbs
 }

 func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*backend.BlockMeta, string) {
-    for len(twbs.blocklist) > 0 {
-        // find everything from cursor forward that belongs to this block
-        cursor := 0
-        currentWindow := twbs.windowForBlock(twbs.blocklist[cursor])
-
-        windowBlocks := make([]*backend.BlockMeta, 0)
-        for cursor < len(twbs.blocklist) {
-            currentBlock := twbs.blocklist[cursor]
-
-            if currentWindow != twbs.windowForBlock(currentBlock) {
-                break
-            }
-            cursor++
-
-            windowBlocks = append(windowBlocks, currentBlock)
-        }
-
-        // did we find enough blocks?
-        if len(windowBlocks) >= twbs.MinInputBlocks {
-            var compactBlocks []*backend.BlockMeta
-
-            // blocks in the currently active window
-            // dangerous to use time.Now()
-            activeWindow := twbs.windowForTime(time.Now().Add(-activeWindowDuration))
-            blockWindow := twbs.windowForBlock(windowBlocks[0])
-
-            hashString := fmt.Sprintf("%v", windowBlocks[0].TenantID)
-            compact := true
-
-            // the active window should be compacted by level
-            if activeWindow <= blockWindow {
-                // search forward for inputBlocks in a row that have the same compaction level
-                // Gather as many as possible while staying within limits
-                for i := 0; i <= len(windowBlocks)-twbs.MinInputBlocks+1; i++ {
-                    for j := i + 1; j <= len(windowBlocks)-1 &&
-                        windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel &&
-                        len(compactBlocks)+1 <= twbs.MaxInputBlocks &&
-                        totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ {
-                        compactBlocks = windowBlocks[i : j+1]
-                    }
-                    if len(compactBlocks) > 0 {
-                        // Found a stripe of blocks
-                        break
-                    }
-                }
-
-                compact = false
-                if len(compactBlocks) >= twbs.MinInputBlocks {
-                    compact = true
-                    hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow)
-                }
-            } else { // all other windows will be compacted using their two smallest blocks
-                compactBlocks = windowBlocks[:twbs.MinInputBlocks]
-                hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow)
-            }
-
-            if totalObjects(compactBlocks) > twbs.MaxCompactionObjects {
-                compact = false
-            }
-
-            if compact {
-                // remove the blocks we are returning so we don't consider them again
-                // this is horribly inefficient as it's written
-                for _, blockToCompact := range compactBlocks {
-                    for i, block := range twbs.blocklist {
-                        if block == blockToCompact {
-                            copy(twbs.blocklist[i:], twbs.blocklist[i+1:])
-                            twbs.blocklist[len(twbs.blocklist)-1] = nil
-                            twbs.blocklist = twbs.blocklist[:len(twbs.blocklist)-1]
-
-                            break
-                        }
-                    }
-                }
-
-                return compactBlocks, hashString
-            }
-        }
-
-        // otherwise update the blocklist
-        twbs.blocklist = twbs.blocklist[cursor:]
+    for len(twbs.entries) > 0 {
+        var chosen []timeWindowBlockEntry
+
+        // find everything from cursor forward that belongs to this group
+        // Gather contiguous blocks while staying within limits
+        i := 0
+        for ; i < len(twbs.entries); i++ {
+            for j := i + 1; j < len(twbs.entries); j++ {
+                stripe := twbs.entries[i : j+1]
+                if twbs.entries[i].group == twbs.entries[j].group &&
+                    len(stripe) <= twbs.MaxInputBlocks &&
+                    totalObjects(stripe) <= twbs.MaxCompactionObjects &&
+                    totalSize(stripe) <= twbs.MaxBlockBytes {
+                    chosen = stripe
+                } else {
+                    break
+                }
+            }
+            if len(chosen) > 0 {
+                // Found a stripe of blocks
+                break
+            }
+        }
+
+        // Remove entries that were checked so they are not considered again.
+        twbs.entries = twbs.entries[i+len(chosen):]
+
+        // did we find enough blocks?
+        if len(chosen) >= twbs.MinInputBlocks {
+            compactBlocks := make([]*backend.BlockMeta, 0)
+            for _, e := range chosen {
+                compactBlocks = append(compactBlocks, e.meta)
+            }
+
+            return compactBlocks, chosen[0].hash
+        }
     }
     return nil, ""
 }

-func totalObjects(blocks []*backend.BlockMeta) int {
+func totalObjects(entries []timeWindowBlockEntry) int {
     totalObjects := 0
-    for _, b := range blocks {
-        totalObjects += b.TotalObjects
+    for _, b := range entries {
+        totalObjects += b.meta.TotalObjects
     }
     return totalObjects
 }
 
+func totalSize(entries []timeWindowBlockEntry) uint64 {
+    sz := uint64(0)
+    for _, b := range entries {
+        sz += b.meta.Size
+    }
+    return sz
+}
+
 func (twbs *timeWindowBlockSelector) windowForBlock(meta *backend.BlockMeta) int64 {
     return twbs.windowForTime(meta.EndTime)
 }
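And a rough sketch of how a compactor might drive the `BlocksToCompact` loop. The stub selector here is a stand-in for `timeWindowBlockSelector` (the real constructor and `backend.BlockMeta` live inside tempodb), so names and the ownership comment are assumptions for illustration only:

```go
package main

import "fmt"

// blockMeta stands in for backend.BlockMeta in this sketch.
type blockMeta struct{ id string }

// compactionBlockSelector mirrors the CompactionBlockSelector contract:
// return the next batch plus an ownership hash, or (nil, "") when done.
type compactionBlockSelector interface {
	BlocksToCompact() ([]*blockMeta, string)
}

type stubSelector struct{ batches [][]*blockMeta }

func (s *stubSelector) BlocksToCompact() ([]*blockMeta, string) {
	if len(s.batches) == 0 {
		return nil, ""
	}
	next := s.batches[0]
	s.batches = s.batches[1:]
	return next, "tenant-0-window-hash"
}

func main() {
	var sel compactionBlockSelector = &stubSelector{
		batches: [][]*blockMeta{{{id: "a"}, {id: "b"}}},
	}
	for {
		toCompact, hash := sel.BlocksToCompact()
		if len(toCompact) == 0 {
			break // selector is exhausted
		}
		// A real compactor would first check hash ownership via the ring,
		// then compact the blocks; here we just print.
		fmt.Printf("compacting %d blocks (hash %q)\n", len(toCompact), hash)
	}
}
```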
