Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

fix: break graph api notifiers across a sync map #102

Merged
merged 4 commits into from
Apr 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions lib/graph_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,22 @@ type Notifier interface {
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error
}

// notifiersForRootCid is used for reducing lock contention by only notifying
// exchanges related to the same content root CID
type notifiersForRootCid struct {
lk sync.RWMutex
deleted int8
notifiers []Notifier
}

type GraphGateway struct {
fetcher CarFetcher
blockFetcher exchange.Fetcher
routing routing.ValueStore
namesys namesys.NameSystem
bstore blockstore.Blockstore

lk sync.RWMutex
notifiers map[Notifier]struct{}
notifiers sync.Map // cid -> notifiersForRootCid
metrics *GraphGatewayMetrics
}

Expand Down Expand Up @@ -152,7 +159,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ..
routing: vs,
namesys: ns,
bstore: bs,
notifiers: make(map[Notifier]struct{}),
notifiers: sync.Map{},
metrics: registerGraphGatewayMetrics(),
}, nil
}
Expand Down Expand Up @@ -242,6 +249,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics {
}
}

func (api *GraphGateway) getRootOfPath(path string) string {
pth, err := ipfspath.ParsePath(path)
if err != nil {
return path
}
if pth.IsJustAKey() {
return pth.Segments()[0]
} else {
return pth.Segments()[1]
}
}

/*
Implementation iteration plan:

Expand All @@ -263,9 +282,25 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
metrics: api.metrics,
}

api.lk.Lock()
api.notifiers[exch] = struct{}{}
api.lk.Unlock()
notifierKey := api.getRootOfPath(path)
var notifier *notifiersForRootCid
for {
notifiers, _ := api.notifiers.LoadOrStore(notifierKey, &notifiersForRootCid{notifiers: []Notifier{}})
if n, ok := notifiers.(*notifiersForRootCid); ok {
n.lk.Lock()
// could have been deleted after our load. try again.
if n.deleted != 0 {
n.lk.Unlock()
continue
}
notifier = n
n.notifiers = append(n.notifiers, exch)
n.lk.Unlock()
break
} else {
return nil, nil, errors.New("failed to get notifier")
}
}

go func(metrics *GraphGatewayMetrics) {
defer func() {
Expand Down Expand Up @@ -297,7 +332,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
return err
}
metrics.carBlocksFetchedMetric.Inc()
api.notifyAllOngoingRequests(ctx, blk)
api.notifyOngoingRequests(ctx, notifierKey, blk)
}
})
if err != nil {
Expand All @@ -317,21 +352,37 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
}

return blkgw, func() {
api.lk.Lock()
delete(api.notifiers, exch)
api.lk.Unlock()
notifier.lk.Lock()
for i, e := range notifier.notifiers {
if e == exch {
notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...)
break
}
}
if len(notifier.notifiers) == 0 {
notifier.deleted = 1
api.notifiers.Delete(notifierKey)
}
notifier.lk.Unlock()
}, nil
}

func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...blocks.Block) {
api.lk.RLock()
for n := range api.notifiers {
err := n.NotifyNewBlocks(ctx, blks...)
if err != nil {
graphLog.Errorw("notifyAllOngoingRequests failed", "error", err)
func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) {
if notifiers, ok := api.notifiers.Load(key); ok {
notifier, ok := notifiers.(*notifiersForRootCid)
if !ok {
graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid")
return
}
notifier.lk.RLock()
for _, n := range notifier.notifiers {
err := n.NotifyNewBlocks(ctx, blks...)
if err != nil {
graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err)
}
}
notifier.lk.RUnlock()
}
api.lk.RUnlock()
}

type fileCloseWrapper struct {
Expand Down