Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay metrics #828

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
21 changes: 20 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
userLookupDuration.Observe(time.Since(s).Seconds())
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou").Inc()
return fmt.Errorf("looking up event user: %w", err)
}

Expand All @@ -900,6 +901,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
subj, err := bgs.createExternalUser(ctx, evt.Repo)
newUserDiscoveryDuration.Observe(time.Since(start).Seconds())
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "uerr").Inc()
return fmt.Errorf("fed event create external user: %w", err)
}

Expand All @@ -914,20 +916,24 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
return nil
}

if ustatus == events.AccountStatusSuspended {
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
return nil
}

if ustatus == events.AccountStatusDeactivated {
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
return nil
}

if evt.Rebase {
repoCommitsResultCounter.WithLabelValues(host.Host, "rebase").Inc()
return fmt.Errorf("rebase was true in event seq:%d,host:%s", evt.Seq, host.Host)
}

Expand All @@ -938,10 +944,12 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event

subj, err := bgs.createExternalUser(ctx, evt.Repo)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "uerr2").Inc()
return err
}

if subj.PDS != host.ID {
repoCommitsResultCounter.WithLabelValues(host.Host, "noauth").Inc()
return fmt.Errorf("event from non-authoritative pds")
}
}
Expand All @@ -950,16 +958,19 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
span.SetAttributes(attribute.Bool("tombstoned", true))
// we've checked the authority of the users PDS, so reinstate the account
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumn("tombstoned", false).Error; err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "tomb").Inc()
return fmt.Errorf("failed to un-tombstone a user: %w", err)
}
u.SetTombstoned(false)

ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou2").Inc()
return fmt.Errorf("failed to look up user (tombstone recover): %w", err)
}

// Now a simple re-crawl should suffice to bring the user back online
repoCommitsResultCounter.WithLabelValues(host.Host, "catchupt").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

Expand All @@ -968,6 +979,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
rebasesCounter.WithLabelValues(host.Host).Add(1)
ai, err := bgs.Index.LookupUser(ctx, u.ID)
if err != nil {
repoCommitsResultCounter.WithLabelValues(host.Host, "nou3").Inc()
return fmt.Errorf("failed to look up user (slow path): %w", err)
}

Expand All @@ -979,26 +991,33 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
// processor coming off of the pds stream, we should investigate
// whether or not we even need this 'slow path' logic, as it makes
// accounting for which events have been processed much harder
repoCommitsResultCounter.WithLabelValues(host.Host, "catchup").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
log.Warnw("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())

if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
if lerr != nil {
log.Warnw("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
}

span.SetAttributes(attribute.Bool("catchup_queue", true))

log.Infow("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
}

log.Warnw("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
return fmt.Errorf("handle user event failed: %w", err)
}

repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
return nil
case env.RepoHandle != nil:
log.Infow("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
Expand Down
5 changes: 5 additions & 0 deletions bgs/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ var repoCommitsReceivedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of events received",
}, []string{"pds"})

// repoCommitsResultCounter tracks the outcome of each repo commit event
// handled in handleFedEvent, labeled by the originating PDS host and a short
// status code (e.g. "ok", "err", "catchup", "nou" for a failed user lookup).
var repoCommitsResultCounter = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "repo_commits_result_counter",
	Help: "The results of commit events received",
}, []string{"pds", "status"})

var rebasesCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "event_rebases",
Help: "The total number of rebase events received",
Expand Down
5 changes: 4 additions & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,10 @@ func runBigsky(cctx *cli.Context) error {

rf := indexer.NewRepoFetcher(db, repoman, cctx.Int("max-fetch-concurrency"))

ix, err := indexer.NewIndexer(db, notifman, evtman, cachedidr, rf, true, cctx.Bool("spidering"), false)
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()

ix, err := indexer.NewIndexer(ctx, db, notifman, evtman, cachedidr, rf, true, false, cctx.Bool("spidering"))
if err != nil {
return err
}
Expand Down
33 changes: 29 additions & 4 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
Expand All @@ -29,12 +30,12 @@ type CrawlDispatcher struct {
concurrency int
}

func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
func NewCrawlDispatcher(ctx context.Context, repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
if concurrency < 1 {
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
}

return &CrawlDispatcher{
out := &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan models.Uid),
Expand All @@ -43,7 +44,10 @@ func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurre
concurrency: concurrency,
todo: make(map[models.Uid]*crawlWork),
inProgress: make(map[models.Uid]*crawlWork),
}, nil
}
go out.CatchupRepoGaugePoller(ctx)

return out, nil
}

func (c *CrawlDispatcher) Run() {
Expand Down Expand Up @@ -173,24 +177,26 @@ func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
}

func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
catchupEventsEnqueued.Inc()
c.maplk.Lock()
defer c.maplk.Unlock()

// If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
job, ok := c.todo[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("todo").Inc()
job.catchup = append(job.catchup, catchup)
return nil
}

// If the actor crawl is in progress, we can append to the nextr queue which gets emptied after the crawl
job, ok = c.inProgress[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("prog").Inc()
job.next = append(job.next, catchup)
return nil
}

catchupEventsEnqueued.WithLabelValues("new").Inc()
// Otherwise, we need to create a new crawl job for this actor and enqueue it
cw := &crawlWork{
act: catchup.user,
Expand Down Expand Up @@ -269,3 +275,22 @@ func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bo

return false
}

// countReposInSlowPath reports how many repos are currently in the catchup
// ("slow") path: those queued for a crawl plus those being crawled right now.
func (c *CrawlDispatcher) countReposInSlowPath() int {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	total := len(c.todo) + len(c.inProgress)
	return total
}

// CatchupRepoGaugePoller periodically publishes the number of repos waiting
// in the catchup (slow) path to catchupReposGauge. It is meant to run in its
// own goroutine and exits when ctx is cancelled.
func (c *CrawlDispatcher) CatchupRepoGaugePoller(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// Without this return, a cancelled context's Done channel is
			// permanently ready and the loop would spin at 100% CPU.
			return
		case <-ticker.C:
			catchupReposGauge.Set(float64(c.countReposInSlowPath()))
		}
	}
}
4 changes: 2 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Indexer struct {
ApplyPDSClientSettings func(*xrpc.Client)
}

func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
func NewIndexer(ctx context.Context, db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
db.AutoMigrate(&models.FeedPost{})
db.AutoMigrate(&models.ActorInfo{})
db.AutoMigrate(&models.FollowRecord{})
Expand All @@ -68,7 +68,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
}

if crawl {
c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
c, err := NewCrawlDispatcher(ctx, fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
if err != nil {
return nil, err
}
Expand Down
14 changes: 12 additions & 2 deletions indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@ var reposFetched = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of repos fetched",
}, []string{"status"})

var catchupEventsEnqueued = promauto.NewCounter(prometheus.CounterOpts{
var catchupEventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_enqueued",
Help: "Number of catchup events enqueued",
})
}, []string{"how"})

// catchupEventsProcessed counts catchup (slow-path) events that have been
// processed by the indexer.
var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
	Name: "indexer_catchup_events_processed",
	Help: "Number of catchup events processed",
})

// catchupEventsFailed counts catchup (slow-path) repo fetches that failed,
// labeled by a short failure-reason code (e.g. "nopds", "noroot").
var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "indexer_catchup_events_failed",
	// Fixed copy-paste: this counter tracks failures, not processed events.
	Help: "Number of catchup events failed",
}, []string{"err"})

// catchupReposGauge reports the number of repos currently waiting in the
// catchup (slow) path; refreshed every 30s by CatchupRepoGaugePoller.
var catchupReposGauge = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "indexer_catchup_repos",
	Help: "Number of repos waiting on catchup",
})
2 changes: 1 addition & 1 deletion indexer/posts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testIndexer(t *testing.T) *testIx {

rf := NewRepoFetcher(maindb, repoman, 10)

ix, err := NewIndexer(maindb, notifman, evtman, didr, rf, false, true, true)
ix, err := NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, false, true, true)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions indexer/repofetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) er

var pds models.PDS
if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
catchupEventsFailed.WithLabelValues("nopds").Inc()
return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
}

rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid)
if err != nil && !isNotFound(err) {
catchupEventsFailed.WithLabelValues("noroot").Inc()
return fmt.Errorf("failed to get repo root: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewServer(db *gorm.DB, cs carstore.CarStore, serkey *did.PrivKey, handleSuf

rf := indexer.NewRepoFetcher(db, repoman, 10)

ix, err := indexer.NewIndexer(db, notifman, evtman, didr, rf, false, true, true)
ix, err := indexer.NewIndexer(context.Background(), db, notifman, evtman, didr, rf, false, true, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func SetupRelay(ctx context.Context, didr plc.PLCClient) (*TestRelay, error) {
evtman := events.NewEventManager(diskpersist)
rf := indexer.NewRepoFetcher(maindb, repoman, 10)

ix, err := indexer.NewIndexer(maindb, notifman, evtman, didr, rf, true, true, true)
ix, err := indexer.NewIndexer(context.Background(), maindb, notifman, evtman, didr, rf, true, true, true)
if err != nil {
return nil, err
}
Expand Down
Loading