diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index 833fbbc..5fa642d 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -83,6 +83,14 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } +// notifiersForRootCid is used for reducing lock contention by only notifying +// exchanges related to the same content root CID +type notifiersForRootCid struct { + lk sync.RWMutex + deleted int8 + notifiers []Notifier +} + type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -90,8 +98,7 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - lk sync.RWMutex - notifiers map[Notifier]struct{} + notifiers sync.Map // cid -> notifiersForRootCid metrics *GraphGatewayMetrics } @@ -152,7 +159,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, - notifiers: make(map[Notifier]struct{}), + notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -242,6 +249,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } +func (api *GraphGateway) getRootOfPath(path string) string { + pth, err := ipfspath.ParsePath(path) + if err != nil { + return path + } + if pth.IsJustAKey() { + return pth.Segments()[0] + } else { + return pth.Segments()[1] + } +} + /* Implementation iteration plan: @@ -263,9 +282,25 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con metrics: api.metrics, } - api.lk.Lock() - api.notifiers[exch] = struct{}{} - api.lk.Unlock() + notifierKey := api.getRootOfPath(path) + var notifier *notifiersForRootCid + for { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForRootCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForRootCid); ok { + n.lk.Lock() + // could have been deleted after our load. try again. + if n.deleted != 0 { + n.lk.Unlock() + continue + } + notifier = n + n.notifiers = append(n.notifiers, exch) + n.lk.Unlock() + break + } else { + return nil, nil, errors.New("failed to get notifier") + } + } go func(metrics *GraphGatewayMetrics) { defer func() { @@ -297,7 +332,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return err } metrics.carBlocksFetchedMetric.Inc() - api.notifyAllOngoingRequests(ctx, blk) + api.notifyOngoingRequests(ctx, notifierKey, blk) } }) if err != nil { @@ -317,21 +352,37 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con } return blkgw, func() { - api.lk.Lock() - delete(api.notifiers, exch) - api.lk.Unlock() + notifier.lk.Lock() + for i, e := range notifier.notifiers { + if e == exch { + notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) + break + } + } + if len(notifier.notifiers) == 0 { + notifier.deleted = 1 + api.notifiers.Delete(notifierKey) + } + notifier.lk.Unlock() }, nil } -func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...blocks.Block) { - api.lk.RLock() - for n := range api.notifiers { - err := n.NotifyNewBlocks(ctx, blks...) - if err != nil { - graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) +func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { + if notifiers, ok := api.notifiers.Load(key); ok { + notifier, ok := notifiers.(*notifiersForRootCid) + if !ok { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", "could not get notifiersForRootCid") + return + } + notifier.lk.RLock() + for _, n := range notifier.notifiers { + err := n.NotifyNewBlocks(ctx, blks...) + if err != nil { + graphLog.Errorw("notifyOngoingRequests failed", "key", key, "error", err) + } } + notifier.lk.RUnlock() } - api.lk.RUnlock() } type fileCloseWrapper struct {