Skip to content

Commit

Permalink
pr feedback: .Shudown() instead of Context
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 22, 2024
1 parent e879da6 commit 60cf0dd
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 14 deletions.
6 changes: 2 additions & 4 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,11 @@ func runBigsky(cctx *cli.Context) error {

rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency"))

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

ix, err := indexer.NewIndexer(ctx, db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering"))
ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering"))
if err != nil {
return err
}
defer ix.Shutdown()

rlskip := cctx.String("bsky-social-rate-limit-skip")
ix.ApplyPDSClientSettings = func(c *xrpc.Client) {
Expand Down
16 changes: 11 additions & 5 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ type CrawlDispatcher struct {
doRepoCrawl func(context.Context, *crawlWork) error

concurrency int

done chan struct{}
}

func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
if concurrency < 1 {
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
}
Expand All @@ -44,8 +46,9 @@ func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawl
concurrency: concurrency,
todo: make(map[models.Uid]*crawlWork),
inProgress: make(map[models.Uid]*crawlWork),
done: make(chan struct{}),
}
go out.CatchupRepoGaugePoller(ctx)
go out.CatchupRepoGaugePoller()

return out, nil
}
Expand All @@ -58,6 +61,10 @@ func (c *CrawlDispatcher) Run() {
}
}

func (c *CrawlDispatcher) Shutdown() {
close(c.done)
}

type catchupJob struct {
evt *comatproto.SyncSubscribeRepos_Commit
host *models.PDS
Expand Down Expand Up @@ -282,13 +289,12 @@ func (c *CrawlDispatcher) countReposInSlowPath() int {
return len(c.inProgress) + len(c.todo)
}

func (c *CrawlDispatcher) CatchupRepoGaugePoller(ctx context.Context) {
done := ctx.Done()
func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-done:
case <-c.done:
case <-ticker.C:
catchupReposGauge.Set(float64(c.countReposInSlowPath()))
}
Expand Down
8 changes: 6 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Indexer struct {
ApplyPDSClientSettings func(*xrpc.Client)
}

func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
db.AutoMigrate(&models.FeedPost{})
db.AutoMigrate(&models.ActorInfo{})
db.AutoMigrate(&models.FollowRecord{})
Expand All @@ -68,7 +68,7 @@ func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationMa
}

if crawl {
c, err := NewCrawlDispatcher(ctx, fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
if err != nil {
return nil, err
}
Expand All @@ -80,6 +80,10 @@ func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationMa
return ix, nil
}

func (ix *Indexer) Shutdown() {
ix.Crawler.Shutdown()
}

func (ix *Indexer) HandleRepoEvent(ctx context.Context, evt *repomgr.RepoEvent) error {
ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
defer span.End()
Expand Down
3 changes: 2 additions & 1 deletion indexer/posts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testIndexer(t *testing.T) *testIx {

rf := NewRepoFetcher(maindb, repoman, 10)

ix, err := NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, false, true, true)
ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -81,6 +81,7 @@ func (ix *testIx) Cleanup() {
if ix.dir != "" {
_ = os.RemoveAll(ix.dir)
}
ix.ix.Shutdown()
}

// TODO: dedupe this out into some testing utility package
Expand Down
2 changes: 1 addition & 1 deletion pds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuf

rf := indexer.NewRepoFetcher(db, repoman, 10)

ix, err := indexer.NewIndexer(context.Background(), db, notifman, evtman, didr, rf, false, true, true)
ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
evtman := events.NewEventManager(diskpersist)
rf := indexer.NewRepoFetcher(maindb, repoman, 10)

ix, err := indexer.NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, true, true, true)
ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 60cf0dd

Please sign in to comment.