forward requestCrawl to multiple nexts
brianolson committed Nov 20, 2024
1 parent 8635797 commit 7cf0290
Showing 3 changed files with 28 additions and 22 deletions.
10 changes: 5 additions & 5 deletions bgs/bgs.go
@@ -92,9 +92,9 @@ type BGS struct {
     // User cache
     userCache *lru.Cache[string, *User]
 
-    // nextCrawler gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
-    nextCrawler *url.URL
-    httpClient  http.Client
+    // nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
+    nextCrawlers []*url.URL
+    httpClient   http.Client
 }

 type PDSResync struct {

@@ -122,8 +122,8 @@ type BGSConfig struct {
     MaxQueuePerPDS       int64
     NumCompactionWorkers int
 
-    // NextCrawler gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
-    NextCrawler *url.URL
+    // NextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
+    NextCrawlers []*url.URL
 }
 
 func DefaultBGSConfig() *BGSConfig {
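For context, the slice-typed NextCrawlers field can also be filled programmatically by library consumers. A minimal sketch, assuming the indigo repository's bgs import path and hypothetical relay hostnames; only DefaultBGSConfig, NextCrawlers, and NewBGS come from the diffs on this page:

// nextcrawlers_example.go: illustrative only; not part of this commit.
package main

import (
    "log"
    "net/url"

    libbgs "github.com/bluesky-social/indigo/bgs" // assumed import path
)

func main() {
    cfg := libbgs.DefaultBGSConfig()
    // Hypothetical downstream relays to notify on requestCrawl.
    for _, raw := range []string{"https://relay-a.example.com", "https://relay-b.example.com"} {
        u, err := url.Parse(raw)
        if err != nil {
            log.Fatalf("bad next-crawler url %q: %v", raw, err)
        }
        cfg.NextCrawlers = append(cfg.NextCrawlers, u)
    }
    // cfg would then be passed to NewBGS, as cmd/bigsky/main.go does below.
}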
20 changes: 11 additions & 9 deletions bgs/handlers.go
@@ -188,20 +188,22 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
     // Maybe we could do something with this response later
     _ = desc
 
-    if s.nextCrawler != nil {
-        pu := s.nextCrawler.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
+    if len(s.nextCrawlers) != 0 {
         blob, err := json.Marshal(body)
         if err != nil {
             log.Warnw("could not forward requestCrawl, json err", "err", err)
         } else {
-            go func() {
-                response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(blob))
-                if err != nil {
-                    log.Warnw("requestCrawl forward failed", "err", err)
-                } else if response.StatusCode != http.StatusOK {
-                    log.Warnw("requestCrawl forward failed", "status", response.Status)
+            go func(bodyBlob []byte) {
+                for _, rpu := range s.nextCrawlers {
+                    pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
+                    response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
+                    if err != nil {
+                        log.Warnw("requestCrawl forward failed", "err", err)
+                    } else if response.StatusCode != http.StatusOK {
+                        log.Warnw("requestCrawl forward failed", "status", response.Status)
+                    }
                 }
-            }()
+            }(blob)
         }
     }

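The new loop fans one requestCrawl body out to every configured downstream relay from a single background goroutine, so a slow or failing forward never blocks the handler, and one bad downstream does not stop the others. Below is a minimal standalone sketch of that pattern; forwardRequestCrawl, downstream, and the plain log calls are stand-ins, and the resp.Body.Close()/continue housekeeping is added here, not taken from this commit:

// crawlforward_sketch.go: illustrative fan-out of a JSON POST, not repo code.
package crawlforward

import (
    "bytes"
    "encoding/json"
    "log"
    "net/http"
    "net/url"
)

// forwardRequestCrawl marshals the body once, then re-posts it to the
// requestCrawl endpoint of each downstream relay root URL in the background.
func forwardRequestCrawl(client *http.Client, downstream []*url.URL, body any) {
    blob, err := json.Marshal(body)
    if err != nil {
        log.Printf("could not forward requestCrawl, json err: %v", err)
        return
    }
    go func(bodyBlob []byte) {
        for _, rpu := range downstream {
            pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
            resp, err := client.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
            if err != nil {
                log.Printf("requestCrawl forward to %s failed: %v", pu, err)
                continue
            }
            resp.Body.Close()
            if resp.StatusCode != http.StatusOK {
                log.Printf("requestCrawl forward to %s failed: %s", pu, resp.Status)
            }
        }
    }(blob)
}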
20 changes: 12 additions & 8 deletions cmd/bigsky/main.go
@@ -206,9 +206,9 @@ func run(args []string) error {
Usage: "specify list of shard directories for carstore storage, overrides default storage within datadir",
EnvVars: []string{"RELAY_CARSTORE_SHARD_DIRS"},
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "next-crawler",
Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl",
Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list",
EnvVars: []string{"RELAY_NEXT_CRAWLER"},
},
}
@@ -440,13 +440,17 @@ func runBigsky(cctx *cli.Context) error {
     bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds")
     bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit")
     bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers")
-    nextCrawler := cctx.String("next-crawler")
-    if nextCrawler != "" {
-        xu, err := url.Parse(nextCrawler)
-        if err != nil {
-            return fmt.Errorf("failed to parse next-crawler url: %w", err)
+    nextCrawlers := cctx.StringSlice("next-crawler")
+    if len(nextCrawlers) != 0 {
+        nextCrawlerUrls := make([]*url.URL, len(nextCrawlers))
+        for i, tu := range nextCrawlers {
+            var err error
+            nextCrawlerUrls[i], err = url.Parse(tu)
+            if err != nil {
+                return fmt.Errorf("failed to parse next-crawler url: %w", err)
+            }
         }
-        bgsConfig.NextCrawler = xu
+        bgsConfig.NextCrawlers = nextCrawlerUrls
     }
     bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, bgsConfig)
     if err != nil {
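With cli.StringSliceFlag, the --next-crawler flag (or the RELAY_NEXT_CRAWLER env var) now carries a comma separated list of relay root URLs, which runBigsky parses into []*url.URL above. A minimal sketch of equivalent comma separated parsing, with a hypothetical helper name and hostnames; in bigsky itself the cli library performs the splitting before the code above runs:

// parsecrawlers_sketch.go: illustrative parsing, not repo code.
package crawlconfig

import (
    "fmt"
    "net/url"
    "strings"
)

// parseNextCrawlers splits a comma separated list such as
// "https://relay-a.example.com,https://relay-b.example.com" (hypothetical
// hosts) and parses each entry, mirroring the loop added in runBigsky.
func parseNextCrawlers(raw string) ([]*url.URL, error) {
    if strings.TrimSpace(raw) == "" {
        return nil, nil
    }
    parts := strings.Split(raw, ",")
    out := make([]*url.URL, 0, len(parts))
    for _, p := range parts {
        u, err := url.Parse(strings.TrimSpace(p))
        if err != nil {
            return nil, fmt.Errorf("failed to parse next-crawler url %q: %w", p, err)
        }
        out = append(out, u)
    }
    return out, nil
}

Note that the values are machine root URLs; the /xrpc/com.atproto.sync.requestCrawl path is appended at forward time in bgs/handlers.go.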
