Skip to content

Commit

Permalink
add --next-crawler to forward requestCrawl calls to additional crawler hosts (#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold authored Nov 20, 2024
2 parents 85b52dd + e1f818e commit 6b6afc0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
9 changes: 9 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type BGS struct {

// User cache
userCache *lru.Cache[string, *User]

// nextCrawlers lists hosts that receive forwarded POST /xrpc/com.atproto.sync.requestCrawl requests
nextCrawlers []*url.URL
httpClient http.Client
}

type PDSResync struct {
Expand All @@ -117,6 +121,9 @@ type BGSConfig struct {
ConcurrencyPerPDS int64
MaxQueuePerPDS int64
NumCompactionWorkers int

// NextCrawlers lists hosts that receive forwarded POST /xrpc/com.atproto.sync.requestCrawl requests
NextCrawlers []*url.URL
}

func DefaultBGSConfig() *BGSConfig {
Expand Down Expand Up @@ -185,6 +192,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
compactor.Start(bgs)
bgs.compactor = compactor

bgs.httpClient.Timeout = time.Second * 5

return bgs, nil
}

Expand Down
20 changes: 20 additions & 0 deletions bgs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bgs
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -187,6 +188,25 @@ func (s *BGS) handleComAtprotoSyncRequestCrawl(ctx context.Context, body *comatp
// Maybe we could do something with this response later
_ = desc

if len(s.nextCrawlers) != 0 {
blob, err := json.Marshal(body)
if err != nil {
log.Warnw("could not forward requestCrawl, json err", "err", err)
} else {
go func(bodyBlob []byte) {
for _, rpu := range s.nextCrawlers {
pu := rpu.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
response, err := s.httpClient.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
if err != nil {
log.Warnw("requestCrawl forward failed", "err", err)
} else if response.StatusCode != http.StatusOK {
log.Warnw("requestCrawl forward failed", "status", response.Status)
}
}
}(blob)
}
}

return s.slurper.SubscribeToPds(ctx, host, true, false)
}

Expand Down
18 changes: 18 additions & 0 deletions cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
_ "net/http/pprof"
"net/url"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -205,6 +206,11 @@ func run(args []string) error {
Usage: "specify list of shard directories for carstore storage, overrides default storage within datadir",
EnvVars: []string{"RELAY_CARSTORE_SHARD_DIRS"},
},
&cli.StringSliceFlag{
Name: "next-crawler",
Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list",
EnvVars: []string{"RELAY_NEXT_CRAWLER"},
},
}

app.Action = runBigsky
Expand Down Expand Up @@ -434,6 +440,18 @@ func runBigsky(cctx *cli.Context) error {
bgsConfig.MaxQueuePerPDS = cctx.Int64("max-queue-per-pds")
bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit")
bgsConfig.NumCompactionWorkers = cctx.Int("num-compaction-workers")
nextCrawlers := cctx.StringSlice("next-crawler")
if len(nextCrawlers) != 0 {
nextCrawlerUrls := make([]*url.URL, len(nextCrawlers))
for i, tu := range nextCrawlers {
var err error
nextCrawlerUrls[i], err = url.Parse(tu)
if err != nil {
return fmt.Errorf("failed to parse next-crawler url: %w", err)
}
}
bgsConfig.NextCrawlers = nextCrawlerUrls
}
bgs, err := libbgs.NewBGS(db, ix, repoman, evtman, cachedidr, rf, hr, bgsConfig)
if err != nil {
return err
Expand Down

0 comments on commit 6b6afc0

Please sign in to comment.