Skip to content

Commit

Permalink
simplify CrawlDispatcher.mainLoop()
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 21, 2024
1 parent d619a37 commit 24beea2
Showing 1 changed file with 10 additions and 42 deletions.
52 changes: 10 additions & 42 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package indexer
import (
"context"
"fmt"
"sync"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
"sync"

"go.opentelemetry.io/otel"
)
Expand Down Expand Up @@ -74,46 +73,15 @@ type crawlWork struct {
}

func (c *CrawlDispatcher) mainLoop() {
var nextDispatchedJob *crawlWork
var jobsAwaitingDispatch []*crawlWork

// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
var dispatchQueue chan *crawlWork

for {
var crawlJob *crawlWork = nil
select {
case actorToCrawl := <-c.ingest:
// TODO: max buffer size
crawlJob := c.enqueueJobForActor(actorToCrawl)
if crawlJob == nil {
break
}

if nextDispatchedJob == nil {
nextDispatchedJob = crawlJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
}
case dispatchQueue <- nextDispatchedJob:
c.dequeueJob(nextDispatchedJob)

if len(jobsAwaitingDispatch) > 0 {
nextDispatchedJob = jobsAwaitingDispatch[0]
jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
} else {
nextDispatchedJob = nil
dispatchQueue = nil
}
case catchupJob := <-c.catchup:
crawlJob = c.enqueueJobForActor(actorToCrawl)
case crawlJob = <-c.catchup:
// Catchup jobs are for processing events that come in while a crawl is in progress.
// They are lower priority than new crawls, so we only add them to the queue if there isn't already a job in progress.
if nextDispatchedJob == nil {
nextDispatchedJob = catchupJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
}
case uid := <-c.complete:
c.maplk.Lock()

Expand All @@ -130,15 +98,15 @@ func (c *CrawlDispatcher) mainLoop() {
job.initScrape = false
job.catchup = job.next
job.next = nil
if nextDispatchedJob == nil {
nextDispatchedJob = job
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
}
crawlJob = job
}
c.maplk.Unlock()
}
if crawlJob != nil {
c.repoSync <- crawlJob
c.dequeueJob(crawlJob)
crawlJob = nil
}
}
}

Expand Down

0 comments on commit 24beea2

Please sign in to comment.