diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 5aca29f9e22..b3fa5cf9eff 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -720,6 +720,30 @@ paths: default: description: Default response + "/pins/{reference}/repair": + post: + summary: Repair pinned chunks by fetching missing/invalid chunks from the network + tags: + - Pinning + parameters: + - in: path + name: reference + schema: + $ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReference" + required: true + description: Swarm reference of the root hash. + responses: + "200": + description: List of repaired chunks + content: + application/json: + schema: + $ref: "SwarmCommon.yaml#/components/schemas/PinRepairResponse" + "500": + $ref: "SwarmCommon.yaml#/components/responses/500" + default: + description: Default response + "/pins": get: summary: Get the list of pinned root hash references diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index dcd54c3853a..051199e21e6 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -671,6 +671,21 @@ components: invalid: type: integer + PinRepairResponse: + type: object + properties: + reference: + $ref: "#/components/schemas/SwarmOnlyReference" + address: + $ref: "#/components/schemas/SwarmOnlyReference" + issue: + type: string + enum: + - "missing" + - "invalid" + error: + type: string + SwarmOnlyReferencesList: type: object properties: diff --git a/pkg/api/api.go b/pkg/api/api.go index b226a5cff24..932b0d2de99 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -50,10 +50,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20" "github.com/ethersphere/bee/v2/pkg/status" "github.com/ethersphere/bee/v2/pkg/steward" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storageincentives" "github.com/ethersphere/bee/v2/pkg/storageincentives/staking" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/topology" "github.com/ethersphere/bee/v2/pkg/topology/lightnode" @@ -145,7 +145,8 @@ type Storer interface { } type PinIntegrity interface { - Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) + Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk) + Repair(ctx context.Context, logger log.Logger, pin string, store storer.NetStore, res chan storer.RepairPinResult) } type Service struct { diff --git a/pkg/api/pin.go b/pkg/api/pin.go index 6fc115fd1b1..e46fbdbade5 100644 --- a/pkg/api/pin.go +++ b/pkg/api/pin.go @@ -12,9 +12,10 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp" "github.com/ethersphere/bee/v2/pkg/storage" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/bee/v2/pkg/traversal" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" "github.com/gorilla/mux" "golang.org/x/sync/semaphore" ) @@ -209,6 +210,13 @@ type PinIntegrityResponse struct { Invalid int `json:"invalid"` } +type PinRepairResponse struct { + Reference swarm.Address `json:"reference"` + Address swarm.Address `json:"address"` + Issue string `json:"issue"` + Error string `json:"error"` +} + func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("get_pin_integrity").Build() @@ -222,8 +230,9 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { } out := make(chan storer.PinStat) - - go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out) + corrupted := make(chan storer.CorruptedPinChunk) + go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out, corrupted) + go syncutil.Drain(corrupted) flusher, ok := w.(http.Flusher) if !ok { @@ -251,3 +260,46 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { flusher.Flush() } } + +func (s *Service) repairPins(w http.ResponseWriter, r *http.Request) { + logger := s.logger.WithName("post_repair_pins").Build() + paths := struct { + Reference swarm.Address `map:"reference" validate:"required"` + }{} + if response := s.mapStructure(mux.Vars(r), &paths); response != nil { + response("invalid path params", logger, w) + return + } + + res := make(chan storer.RepairPinResult) + go s.pinIntegrity.Repair(r.Context(), logger, paths.Reference.String(), s.storer, res) + + flusher, ok := w.(http.Flusher) + if !ok { + http.NotFound(w, r) + return + } + + w.Header().Set("Transfer-Encoding", "chunked") + w.Header().Set("Content-Type", "application/json; charset=utf-8") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + enc := json.NewEncoder(w) + for v := range res { + issue := "missing" + if v.Chunk.Invalid { + issue = "invalid" + } + resp := PinRepairResponse{ + Reference: v.Chunk.Ref, + Address: v.Chunk.Address, + Issue: issue, + Error: v.Error.Error(), + } + if err := enc.Encode(resp); err != nil { + break + } + flusher.Flush() + } +} diff --git a/pkg/api/pin_test.go b/pkg/api/pin_test.go index 4af6c20dd10..0a1e900b30c 100644 --- a/pkg/api/pin_test.go +++ b/pkg/api/pin_test.go @@ -15,9 +15,9 @@ import ( "github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest" "github.com/ethersphere/bee/v2/pkg/log" mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storage/inmemstore" - storer "github.com/ethersphere/bee/v2/pkg/storer" + "github.com/ethersphere/bee/v2/pkg/storer" mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock" "github.com/ethersphere/bee/v2/pkg/swarm" ) @@ -229,9 +229,17 @@ type mockPinIntegrity struct { Store storage.Store } -func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) { +func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk) { if pin != pinRef { p.tester.Fatal("bad pin", pin) } close(out) + close(corrupted) +} + +func (p *mockPinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore storer.NetStore, res chan storer.RepairPinResult) { + if pin != pinRef { + p.tester.Fatal("bad pin", pin) + } + close(res) } diff --git a/pkg/api/router.go b/pkg/api/router.go index 8cffd8ac32e..3abc1ce30f4 100644 --- a/pkg/api/router.go +++ b/pkg/api/router.go @@ -373,8 +373,13 @@ func (s *Service) mountAPI() { "GET": http.HandlerFunc(s.getPinnedRootHash), "POST": http.HandlerFunc(s.pinRootHash), "DELETE": http.HandlerFunc(s.unpinRootHash), - }, - ) + }) + + handle("/pins/{reference}/repair", web.ChainHandlers( + web.FinalHandler(jsonhttp.MethodHandler{ + "POST": http.HandlerFunc(s.repairPins), + }), + )) handle("/stewardship/{address}", jsonhttp.MethodHandler{ "GET": http.HandlerFunc(s.stewardshipGetHandler), diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index 4af5a2f8b71..0f4f308869e 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -13,7 +13,7 @@ import ( "runtime" "github.com/ethersphere/bee/v2/pkg/encryption" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "golang.org/x/sync/errgroup" diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index f4f21fc59ba..9040b4630ce 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -10,7 +10,7 @@ import ( "fmt" "sort" - storage "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage" "github.com/ethersphere/bee/v2/pkg/storer/internal" pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning" "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go index 0220c4fb899..623b6845aab 100644 --- a/pkg/storer/validate.go +++ b/pkg/storer/validate.go @@ -10,18 +10,20 @@ import ( "os" "path" "sync" - "time" - "sync/atomic" + "time" "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/log" "github.com/ethersphere/bee/v2/pkg/sharky" "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/storage" + "github.com/ethersphere/bee/v2/pkg/storage/leveldbstore" "github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore" pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning" + "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" ) // Validate ensures that all retrievalIndex chunks are correctly stored in sharky. @@ -240,7 +242,9 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st defer f.Close() var ch = make(chan PinStat) - go pv.Check(ctx, logger, pin, ch) + corrupted := make(chan CorruptedPinChunk) + go pv.Check(ctx, logger, pin, ch, corrupted) + go syncutil.Drain(corrupted) for st := range ch { report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref) @@ -264,7 +268,18 @@ type PinStat struct { Total, Missing, Invalid int } -func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat) { +type CorruptedPinChunk struct { + Ref swarm.Address + Address swarm.Address + Missing, Invalid bool +} + +type RepairPinResult struct { + Chunk CorruptedPinChunk + Error error +} + +func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat, corrupted chan CorruptedPinChunk) { var stats struct { total, read, invalid atomic.Int32 } @@ -272,6 +287,7 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, n := time.Now() defer func() { close(out) + close(corrupted) logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load()) }() @@ -342,6 +358,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, } if !validChunk(item, buf[:item.Location.Length]) { invalid.Add(1) + select { + case <-ctx.Done(): + return + case corrupted <- CorruptedPinChunk{Ref: pin, Address: item.Address, Invalid: true}: + } } } }() @@ -362,6 +383,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, rIdx := &chunkstore.RetrievalIndexItem{Address: addr} if err := p.Store.Get(rIdx); err != nil { missing.Add(1) + select { + case <-ctx.Done(): + return true, nil + case corrupted <- CorruptedPinChunk{Ref: pin, Address: addr, Missing: true}: + } } else { select { case <-ctx.Done(): @@ -400,3 +426,68 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, } } } + +func (p *PinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore NetStore, res chan RepairPinResult) { + defer func() { + close(res) + }() + + out := make(chan PinStat) + corrupted := make(chan CorruptedPinChunk) + go p.Check(ctx, logger, pin, out, corrupted) + go syncutil.Drain(out) + + for v := range corrupted { + bStore := p.Store.(*leveldbstore.Store) + s := transaction.NewStorage(p.Sharky, bStore) + + var ch swarm.Chunk + var err error + if v.Missing || v.Invalid { + ch, err = netStore.Download(false).Get(ctx, v.Address) + if err != nil { + r := RepairPinResult{ + Chunk: v, + Error: fmt.Errorf("download failed: %s", err.Error()), + } + select { + case <-ctx.Done(): + return + case res <- r: + } + } + } + + if v.Missing { + logger.Info("repairing missing chunk", "address", v.Address) + err = s.Run(ctx, func(st transaction.Store) error { + err = st.ChunkStore().Put(ctx, ch) + if err != nil { + return fmt.Errorf("put missing chunk: %w", err) + } + return nil + }) + } + + if v.Invalid { + logger.Info("repairing invalid chunk (replace)", "address", v.Address) + err = s.Run(ctx, func(st transaction.Store) error { + err = st.ChunkStore().Replace(ctx, ch, false) + if err != nil { + return fmt.Errorf("replacing invalid chunk: %w", err) + } + return nil + }) + } + + r := RepairPinResult{ + Chunk: v, + Error: err, + } + select { + case <-ctx.Done(): + return + case res <- r: + } + } +} diff --git a/pkg/storer/validate_test.go b/pkg/storer/validate_test.go new file mode 100644 index 00000000000..9aea4aae852 --- /dev/null +++ b/pkg/storer/validate_test.go @@ -0,0 +1,136 @@ +// Copyright 2024 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package storer + +import ( + "context" + "os" + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/postage" + batchstore "github.com/ethersphere/bee/v2/pkg/postage/batchstore/mock" + "github.com/ethersphere/bee/v2/pkg/storage" + chunktesting "github.com/ethersphere/bee/v2/pkg/storage/testing" + "github.com/ethersphere/bee/v2/pkg/storer/internal/transaction" + "github.com/ethersphere/bee/v2/pkg/swarm" + "github.com/ethersphere/bee/v2/pkg/topology" + kademlia "github.com/ethersphere/bee/v2/pkg/topology/mock" + "github.com/ethersphere/bee/v2/pkg/util/syncutil" +) + +func dbTestOps(baseAddr swarm.Address, reserveCapacity int, bs postage.Storer, radiusSetter topology.SetStorageRadiuser, reserveWakeUpTime time.Duration) *Options { + opts := DefaultOptions() + if radiusSetter == nil { + radiusSetter = kademlia.NewTopologyDriver() + } + + if bs == nil { + bs = batchstore.New() + } + + opts.Address = baseAddr + opts.RadiusSetter = radiusSetter + opts.ReserveCapacity = reserveCapacity + opts.Batchstore = bs + opts.ReserveWakeUpDuration = reserveWakeUpTime + opts.Logger = log.Noop + + return opts +} + +type testRetrieval struct { + fn func(swarm.Address) (swarm.Chunk, error) +} + +func (t *testRetrieval) RetrieveChunk(_ context.Context, address swarm.Address, _ swarm.Address) (swarm.Chunk, error) { + return t.fn(address) +} + +func TestPinIntegrity_Repair(t *testing.T) { + + opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) + db, err := New(context.Background(), os.TempDir(), opts) + if err != nil { + t.Fatal(err) + } + + chunks := chunktesting.GenerateTestRandomChunks(10) + session, err := db.NewCollection(context.TODO()) + if err != nil { + t.Fatal(err) + } + + for _, ch := range chunks { + err := session.Put(context.TODO(), ch) + if err != nil { + t.Fatal(err) + } + } + + pin := chunks[0].Address() + err = session.Done(pin) + if err != nil { + t.Fatal(err) + } + + // no repair needed + count := 0 + res := make(chan RepairPinResult) + db.pinIntegrity.Repair(context.Background(), log.Noop, pin.String(), db, res) + for range res { + count++ + } + if count != 0 { + t.Fatalf("expected 0 repairs, got %d", count) + } + + // repair needed + count = 0 + err = db.storage.Run(context.Background(), func(s transaction.Store) error { + return s.ChunkStore().Delete(context.Background(), chunks[1].Address()) + }) + if err != nil { + t.Fatal(err) + } + + res = make(chan RepairPinResult) + db.SetRetrievalService(&testRetrieval{fn: func(address swarm.Address) (swarm.Chunk, error) { + for _, ch := range chunks { + if ch.Address().Equal(address) { + return ch, nil + } + } + return nil, storage.ErrNotFound + }}) + go db.pinIntegrity.Repair(context.Background(), log.Noop, pin.String(), db, res) + for range res { + count++ + } + + if count != 1 { + t.Fatalf("expected 1 repair, got %d", count) + } + + out := make(chan PinStat) + corrupted := make(chan CorruptedPinChunk) + go db.pinIntegrity.Check(context.Background(), log.Noop, pin.String(), out, corrupted) + go syncutil.Drain(corrupted) + + v := <-out + if !v.Ref.Equal(pin) { + t.Fatalf("expected pin %s, got %s", pin, v.Ref) + } + if v.Total != 10 { + t.Fatalf("expected total 10, got %d", v.Total) + } + if v.Missing != 0 { + t.Fatalf("expected missing 0, got %d", v.Missing) + } + if v.Invalid != 0 { + t.Fatalf("expected invalid 0, got %d", v.Invalid) + } +} diff --git a/pkg/util/syncutil/syncutil.go b/pkg/util/syncutil/syncutil.go index c06e9ebdeba..b54e0beae3f 100644 --- a/pkg/util/syncutil/syncutil.go +++ b/pkg/util/syncutil/syncutil.go @@ -24,3 +24,9 @@ func WaitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { return false } } + +// Drain drains the channel until it's closed. +func Drain[T any](c <-chan T) { + for range c { + } +}