From 8b7dba258df12d35a10a0fb8b79851e71120d1ba Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 13 Nov 2024 10:59:55 -0800 Subject: [PATCH] make compaction worker count configurable --- bgs/bgs.go | 26 +++++++++++++++----------- cmd/bigsky/main.go | 6 ++++++ 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index 6ac5be78a..35dfab9d9 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -107,20 +107,22 @@ type SocketConsumer struct { } type BGSConfig struct { - SSL bool - CompactInterval time.Duration - DefaultRepoLimit int64 - ConcurrencyPerPDS int64 - MaxQueuePerPDS int64 + SSL bool + CompactInterval time.Duration + DefaultRepoLimit int64 + ConcurrencyPerPDS int64 + MaxQueuePerPDS int64 + NumCompactionWorkers int } func DefaultBGSConfig() *BGSConfig { return &BGSConfig{ - SSL: true, - CompactInterval: 4 * time.Hour, - DefaultRepoLimit: 100, - ConcurrencyPerPDS: 100, - MaxQueuePerPDS: 1_000, + SSL: true, + CompactInterval: 4 * time.Hour, + DefaultRepoLimit: 100, + ConcurrencyPerPDS: 100, + MaxQueuePerPDS: 1_000, + NumCompactionWorkers: 2, } } @@ -168,7 +170,9 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm return nil, err } - compactor := NewCompactor(nil) + cOpts := DefaultCompactorOptions() + cOpts.NumWorkers = config.NumCompactionWorkers + compactor := NewCompactor(cOpts) compactor.requeueInterval = config.CompactInterval compactor.Start(bgs) bgs.compactor = compactor diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 3009db3db..540796f51 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -195,6 +195,11 @@ func run(args []string) error { EnvVars: []string{"RELAY_EVENT_PLAYBACK_TTL"}, Value: 72 * time.Hour, }, + &cli.IntFlag{ + Name: "num-compaction-workers", + EnvVars: []string{"RELAY_NUM_COMPACTION_WORKERS"}, + Value: 2, + }, } app.Action = runBigsky @@ -413,6 +418,7 @@ func runBigsky(cctx *cli.Context) error { bgsConfig.ConcurrencyPerPDS = cctx.Int64("concurrency-per-pds") bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds") bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") + bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers") bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, bgsConfig) if err != nil { return err