Skip to content

Commit

Permalink
Add per-tenant compaction window (grafana#3129)
Browse files Browse the repository at this point in the history
* Add support for per-tenant compaction window

* Update legacy round trip

* Metric the override config value

* Drop comment

* Use default config window in case no override for tenant

* Add docs for compaction_window

* Add test for round-trip

* Drop metric for default overrides compaction window

* Update changelog
  • Loading branch information
zalegrala authored Nov 14, 2023
1 parent c075e7d commit 5e70235
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
* [CHANGE] Make vParquet3 the default block encoding [#2526](https://github.com/grafana/tempo/pull/3134) (@stoewer)
* [FEATURE] Introduce list_blocks_concurrency on GCS and S3 backends to control backend load and performance. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [FEATURE] Add per-tenant compaction window [#3129](https://github.com/grafana/tempo/pull/3129) (@zalegrala)
* [BUGFIX] Include statusMessage intrinsic attribute in tag search. [#3084](https://github.com/grafana/tempo/pull/3084) (@rcrowe)
* [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)

Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,9 @@ overrides:
# Per-user block retention. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[block_retention: <duration> | default = 0s]
# Per-user compaction window. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[compaction_window: <duration> | default = 0s]

# Metrics-generator related overrides
metrics_generator:
Expand Down
4 changes: 4 additions & 0 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int {
return c.overrides.MaxBytesPerTrace(tenantID)
}

func (c *Compactor) MaxCompactionRangeForTenant(tenantID string) time.Duration {
return c.overrides.MaxCompactionRange(tenantID)
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}
Expand Down
7 changes: 5 additions & 2 deletions modules/overrides/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (

"github.com/grafana/tempo/tempodb/backend"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/pkg/sharedconfig"
filterconfig "github.com/grafana/tempo/pkg/spanfilter/config"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/common/model"
)
Expand Down Expand Up @@ -44,6 +45,7 @@ const (
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
MetricCompactionWindow = "compaction_window"
MetricMetricsGeneratorMaxActiveSeries = "metrics_generator_max_active_series"
MetricsGeneratorDryRunEnabled = "metrics_generator_dry_run_enabled"
)
Expand Down Expand Up @@ -129,7 +131,8 @@ type ReadOverrides struct {

type CompactionOverrides struct {
// Compactor enforced overrides.
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
CompactionWindow model.Duration `yaml:"compaction_window,omitempty" json:"compaction_window,omitempty"`
}

type GlobalOverrides struct {
Expand Down
9 changes: 6 additions & 3 deletions modules/overrides/config_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (c *Overrides) toLegacy() LegacyOverrides {
MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout: c.MetricsGenerator.Processor.LocalBlocks.CompleteBlockTimeout,
MetricsGeneratorIngestionSlack: c.MetricsGenerator.IngestionSlack,

BlockRetention: c.Compaction.BlockRetention,
BlockRetention: c.Compaction.BlockRetention,
CompactionWindow: c.Compaction.CompactionWindow,

MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery,
Expand Down Expand Up @@ -103,7 +104,8 @@ type LegacyOverrides struct {
MetricsGeneratorIngestionSlack time.Duration `yaml:"metrics_generator_ingestion_time_range_slack" json:"metrics_generator_ingestion_time_range_slack"`

// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
CompactionWindow model.Duration `yaml:"compaction_window" json:"compaction_window"`

// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
Expand Down Expand Up @@ -135,7 +137,8 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
MaxSearchDuration: l.MaxSearchDuration,
},
Compaction: CompactionOverrides{
BlockRetention: l.BlockRetention,
BlockRetention: l.BlockRetention,
CompactionWindow: l.CompactionWindow,
},
MetricsGenerator: MetricsGeneratorOverrides{
RingSize: l.MetricsGeneratorRingSize,
Expand Down
2 changes: 2 additions & 0 deletions modules/overrides/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ max_global_traces_per_user: 1000
max_bytes_per_trace: 100_000
block_retention: 24h
compaction_window: 4h
per_tenant_override_config: /etc/Overrides.yaml
per_tenant_override_period: 1m
Expand All @@ -65,6 +66,7 @@ max_search_duration: 5m
"max_bytes_per_trace": 100000,
"block_retention": "24h",
"compaction_window": "4h",
"per_tenant_override_config": "/etc/Overrides.yaml",
"per_tenant_override_period": "1m",
Expand Down
1 change: 1 addition & 0 deletions modules/overrides/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Interface interface {
MaxLocalTracesPerUser(userID string) int
MaxGlobalTracesPerUser(userID string) int
MaxBytesPerTrace(userID string) int
MaxCompactionRange(userID string) time.Duration
Forwarders(userID string) []string
MaxBytesPerTagValuesQuery(userID string) int
MaxBlocksPerTagValuesQuery(userID string) int
Expand Down
5 changes: 5 additions & 0 deletions modules/overrides/runtime_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (o *runtimeConfigOverridesManager) MaxGlobalTracesPerUser(userID string) in
return o.getOverridesForUser(userID).Ingestion.MaxGlobalTracesPerUser
}

// MaxCompactionRange returns the maximum compaction window for this tenant.
func (o *runtimeConfigOverridesManager) MaxCompactionRange(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).Compaction.CompactionWindow)
}

// IngestionRateLimitBytes is the number of spans per second allowed for this tenant.
func (o *runtimeConfigOverridesManager) IngestionRateLimitBytes(userID string) float64 {
return float64(o.getOverridesForUser(userID).Ingestion.RateLimitBytes)
Expand Down
7 changes: 6 additions & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// Get the meta file of all non-compacted blocks for the given tenant
blocklist := rw.blocklist.Metas(tenantID)

window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID)
if window == 0 {
window = rw.compactorCfg.MaxCompactionRange
}

// Select which blocks to compact.
//
// Blocks are firstly divided by the active compaction window (default: most recent 24h)
Expand All @@ -115,7 +120,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level.
// It picks more recent windows first, and compacting blocks only from the same tenant.
blockSelector := newTimeWindowBlockSelector(blocklist,
rw.compactorCfg.MaxCompactionRange,
window,
rw.compactorCfg.MaxCompactionObjects,
rw.compactorCfg.MaxBlockBytes,
defaultMinInputBlocks,
Expand Down
9 changes: 7 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type mockJobSharder struct{}
func (m *mockJobSharder) Owns(string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
maxBytesPerTrace int
blockRetention time.Duration
maxBytesPerTrace int
maxCompactionWindow time.Duration
}

func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration {
Expand All @@ -58,6 +59,10 @@ func (m *mockOverrides) MaxBytesPerTraceForTenant(_ string) int {
return m.maxBytesPerTrace
}

func (m *mockOverrides) MaxCompactionRangeForTenant(_ string) time.Duration {
return m.maxCompactionWindow
}

func TestCompactionRoundtrip(t *testing.T) {
for _, enc := range encoding.AllEncodings() {
version := enc.Version()
Expand Down
1 change: 1 addition & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type CompactorSharder interface {
type CompactorOverrides interface {
BlockRetentionForTenant(tenantID string) time.Duration
MaxBytesPerTraceForTenant(tenantID string) int
MaxCompactionRangeForTenant(tenantID string) time.Duration
}

type WriteableBlock interface {
Expand Down

0 comments on commit 5e70235

Please sign in to comment.