From 60cf0dd24f5ec85c3576984e53220c8a3a29a1fc Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 22 Nov 2024 13:08:02 -0500 Subject: [PATCH] pr feedback: .Shutdown() instead of Context --- cmd/bigsky/main.go | 6 ++---- indexer/crawler.go | 16 +++++++++++----- indexer/indexer.go | 8 ++++++-- indexer/posts_test.go | 3 ++- pds/server.go | 2 +- testing/utils.go | 2 +- 6 files changed, 23 insertions(+), 14 deletions(-) diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 5bbff85e1..f82ee2bd0 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -382,13 +382,11 @@ func runBigsky(cctx *cli.Context) error { rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency")) - ctx, ctxCancel := context.WithCancel(context.Background()) - defer ctxCancel() - - ix, err := indexer.NewIndexer(ctx, db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) + ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering")) if err != nil { return err } + defer ix.Shutdown() rlskip := cctx.String("bsky-social-rate-limit-skip") ix.ApplyPDSClientSettings = func(c *xrpc.Client) { diff --git a/indexer/crawler.go b/indexer/crawler.go index 87944fbc4..e5e41b9bb 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -28,9 +28,11 @@ type CrawlDispatcher struct { doRepoCrawl func(context.Context, *crawlWork) error concurrency int + + done chan struct{} } -func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { +func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) { if concurrency < 1 { return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency") } @@ -44,8 +46,9 @@ func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawl concurrency: concurrency, todo: make(map[models.Uid]*crawlWork), inProgress: 
make(map[models.Uid]*crawlWork), + done: make(chan struct{}), } - go out.CatchupRepoGaugePoller(ctx) + go out.CatchupRepoGaugePoller() return out, nil } @@ -58,6 +61,10 @@ func (c *CrawlDispatcher) Run() { } } +func (c *CrawlDispatcher) Shutdown() { + close(c.done) +} + type catchupJob struct { evt *comatproto.SyncSubscribeRepos_Commit host *models.PDS @@ -282,13 +289,12 @@ func (c *CrawlDispatcher) countReposInSlowPath() int { return len(c.inProgress) + len(c.todo) } -func (c *CrawlDispatcher) CatchupRepoGaugePoller(ctx context.Context) { - done := ctx.Done() +func (c *CrawlDispatcher) CatchupRepoGaugePoller() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { - case <-done: + case <-c.done: case <-ticker.C: catchupReposGauge.Set(float64(c.countReposInSlowPath())) } diff --git a/indexer/indexer.go b/indexer/indexer.go index b2ddb72f3..7442f9f47 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -47,7 +47,7 @@ type Indexer struct { ApplyPDSClientSettings func(*xrpc.Client) } -func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { +func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) { db.AutoMigrate(&models.FeedPost{}) db.AutoMigrate(&models.ActorInfo{}) db.AutoMigrate(&models.FollowRecord{}) @@ -68,7 +68,7 @@ func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationMa } if crawl { - c, err := NewCrawlDispatcher(ctx, fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) + c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency) if err != nil { return nil, err } @@ -80,6 +80,10 @@ func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationMa return ix, nil } +func (ix *Indexer) Shutdown() { 
+ ix.Crawler.Shutdown() +} + func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error { ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent") defer span.End() diff --git a/indexer/posts_test.go b/indexer/posts_test.go index c6b9d72a7..9b9fddb4a 100644 --- a/indexer/posts_test.go +++ b/indexer/posts_test.go @@ -63,7 +63,7 @@ func testIndexer(t *testing.T) *testIx { rf := NewRepoFetcher(maindb, repoman, 10) - ix, err := NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, false, true, true) + ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true) if err != nil { t.Fatal(err) } @@ -81,6 +81,7 @@ func (ix *testIx) Cleanup() { if ix.dir != "" { _ = os.RemoveAll(ix.dir) } + ix.ix.Shutdown() } // TODO: dedupe this out into some testing utility package diff --git a/pds/server.go b/pds/server.go index 82e42c606..54f1dfed1 100644 --- a/pds/server.go +++ b/pds/server.go @@ -79,7 +79,7 @@ func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuf rf := indexer.NewRepoFetcher(db, repoman, 10) - ix, err := indexer.NewIndexer(context.Background(), db, notifman, evtman, didr, rf, false, true, true) + ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true) if err != nil { return nil, err } diff --git a/testing/utils.go b/testing/utils.go index 54b97e019..7af6e1adc 100644 --- a/testing/utils.go +++ b/testing/utils.go @@ -569,7 +569,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) { evtman := events.NewEventManager(diskpersist) rf := indexer.NewRepoFetcher(maindb, repoman, 10) - ix, err := indexer.NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, true, true, true) + ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true) if err != nil { return nil, err }