From e56df8ac044784cdd18e0a6a1971d15bfc5123dc Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 6 Jul 2023 14:21:22 -0400 Subject: [PATCH] chore: remove unused code --- lib/block_fetchers.go | 194 ------------------------------------------ lib/graph_gateway.go | 63 -------------- 2 files changed, 257 deletions(-) delete mode 100644 lib/block_fetchers.go diff --git a/lib/block_fetchers.go b/lib/block_fetchers.go deleted file mode 100644 index 72ec622..0000000 --- a/lib/block_fetchers.go +++ /dev/null @@ -1,194 +0,0 @@ -package lib - -import ( - "context" - "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/exchange" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" - "github.com/multiformats/go-multihash" - "go.uber.org/multierr" -) - -type inboundBlockExchange struct { - ps BlockPubSub -} - -func newInboundBlockExchange() *inboundBlockExchange { - return &inboundBlockExchange{ - ps: NewBlockPubSub(), - } -} - -func (i *inboundBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blk, more := <-i.ps.Subscribe(ctx, c.Hash()) - if err := ctx.Err(); err != nil { - return nil, err - } - if !more { - return nil, format.ErrNotFound{Cid: c} - } - return blk, nil -} - -func (i *inboundBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - mhMap := make(map[string]struct{}) - for _, c := range cids { - mhMap[string(c.Hash())] = struct{}{} - } - mhs := make([]multihash.Multihash, 0, len(mhMap)) - for k := range mhMap { - mhs = append(mhs, multihash.Multihash(k)) - } - return i.ps.Subscribe(ctx, mhs...), nil -} - -func (i *inboundBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - // TODO: handle context cancellation and/or blockage here - i.ps.Publish(blocks...) - return nil -} - -func (i *inboundBlockExchange) Close() error { - i.ps.Shutdown() - return nil -} - -var _ exchange.Interface = (*inboundBlockExchange)(nil) - -type handoffExchange struct { - startingExchange, followupExchange exchange.Interface - bstore blockstore.Blockstore - handoffCh <-chan struct{} - metrics *GraphGatewayMetrics -} - -func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - blkCh, err := f.startingExchange.GetBlocks(ctx, []cid.Cid{c}) - if err != nil { - return nil, err - } - blk, ok := <-blkCh - if ok { - return blk, nil - } - - select { - case <-f.handoffCh: - graphLog.Debugw("switching to backup block fetcher", "cid", c) - f.metrics.blockRecoveryAttemptMetric.Inc() - return f.followupExchange.GetBlock(ctx, c) - case <-ctx.Done(): - return nil, ctx.Err() - } -} - -func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - blkCh, err := f.startingExchange.GetBlocks(ctx, cids) - if err != nil { - return nil, err - } - - retCh := make(chan blocks.Block) - - go func() { - cs := cid.NewSet() - for cs.Len() < len(cids) { - blk, ok := <-blkCh - if !ok { - break - } - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - } - } - - for cs.Len() < len(cids) { - select { - case <-ctx.Done(): - return - case <-f.handoffCh: - var newCidArr []cid.Cid - for _, c := range cids { - if !cs.Has(c) { - blk, _ := f.bstore.Get(ctx, c) - if blk != nil { - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - return - } - } else { - newCidArr = append(newCidArr, c) - } - } - } - - if len(newCidArr) == 0 { - return - } - - graphLog.Debugw("needed to use use a backup fetcher for cids", "cids", newCidArr) - f.metrics.blockRecoveryAttemptMetric.Add(float64(len(newCidArr))) - fch, err := f.followupExchange.GetBlocks(ctx, newCidArr) - if err != nil { - graphLog.Errorw("error getting blocks from followupExchange", "error", err) - return - } - for cs.Len() < len(cids) { - blk, ok := <-fch - if !ok { - return - } - select { - case retCh <- blk: - cs.Add(blk.Cid()) - case <-ctx.Done(): - return - } - } - } - } - }() - return retCh, nil -} - -func (f *handoffExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - err1 := f.startingExchange.NotifyNewBlocks(ctx, blocks...) - err2 := f.followupExchange.NotifyNewBlocks(ctx, blocks...) - return multierr.Combine(err1, err2) -} - -func (f *handoffExchange) Close() error { - err1 := f.startingExchange.Close() - err2 := f.followupExchange.Close() - return multierr.Combine(err1, err2) -} - -var _ exchange.Interface = (*handoffExchange)(nil) - -type blockFetcherExchWrapper struct { - f exchange.Fetcher -} - -func (b *blockFetcherExchWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - return b.f.GetBlock(ctx, c) -} - -func (b *blockFetcherExchWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { - return b.f.GetBlocks(ctx, cids) -} - -func (b *blockFetcherExchWrapper) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { - return nil -} - -func (b *blockFetcherExchWrapper) Close() error { - return nil -} - -var _ exchange.Interface = (*blockFetcherExchWrapper)(nil) diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index dde27fe..8ba2d71 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -94,18 +94,6 @@ func WithBlockstore(bs blockstore.Blockstore) GraphGatewayOption { type GraphGatewayOption func(gwOptions *gwOptions) error -type Notifier interface { - NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error -} - -// notifiersForRootCid is used for reducing lock contention by only notifying -// exchanges related to the same content root CID -type notifiersForRootCid struct { - lk sync.RWMutex - deleted int8 - notifiers []Notifier -} - type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -272,57 +260,6 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } -func (api *GraphGateway) getRootOfPath(path string) string { - pth, err := ipfspath.ParsePath(path) - if err != nil { - return path - } - if pth.IsJustAKey() { - return pth.Segments()[0] - } else { - return pth.Segments()[1] - } -} - -type fileCloseWrapper struct { - files.File - closeFn func() -} - -func (w *fileCloseWrapper) Close() error { - w.closeFn() - return w.File.Close() -} - -type dirCloseWrapper struct { - files.Directory - closeFn func() -} - -func (w *dirCloseWrapper) Close() error { - w.closeFn() - return w.Directory.Close() -} - -func wrapNodeWithClose[T files.Node](node T, closeFn func()) (T, error) { - var genericNode files.Node = node - switch n := genericNode.(type) { - case *files.Symlink: - closeFn() - return node, nil - case files.File: - var f files.File = &fileCloseWrapper{n, closeFn} - return f.(T), nil - case files.Directory: - var d files.Directory = &dirCloseWrapper{n, closeFn} - return d.(T), nil - default: - closeFn() - var zeroType T - return zeroType, fmt.Errorf("unsupported node type") - } -} - func (api *GraphGateway) fetchCAR(ctx context.Context, path gateway.ImmutablePath, params gateway.CarParams, cb DataCallback) error { escapedPath := url.PathEscape(path.String()[1:]) paramsBuilder := strings.Builder{}