Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: repair pin #4836

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,30 @@ paths:
default:
description: Default response

"/pins/{reference}/repair":
post:
summary: Repair pinned chunks by fetching missing/invalid chunks from the network
tags:
- Pinning
parameters:
- in: path
name: reference
schema:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmOnlyReference"
required: true
description: Swarm reference of the root hash.
responses:
"200":
description: List of repaired chunks
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PinRepairResponse"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
description: Default response

"/pins":
get:
summary: Get the list of pinned root hash references
Expand Down
15 changes: 15 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,21 @@ components:
invalid:
type: integer

PinRepairResponse:
type: object
properties:
reference:
$ref: "#/components/schemas/SwarmOnlyReference"
address:
$ref: "#/components/schemas/SwarmOnlyReference"
issue:
type: string
enum:
- "missing"
- "invalid"
error:
type: string

SwarmOnlyReferencesList:
type: object
properties:
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ import (
"github.com/ethersphere/bee/v2/pkg/settlement/swap/erc20"
"github.com/ethersphere/bee/v2/pkg/status"
"github.com/ethersphere/bee/v2/pkg/steward"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storageincentives"
"github.com/ethersphere/bee/v2/pkg/storageincentives/staking"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/topology"
"github.com/ethersphere/bee/v2/pkg/topology/lightnode"
Expand Down Expand Up @@ -145,7 +145,8 @@ type Storer interface {
}

type PinIntegrity interface {
Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat)
Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk)
Repair(ctx context.Context, logger log.Logger, pin string, store storer.NetStore, res chan storer.RepairPinResult)
}

type Service struct {
Expand Down
58 changes: 55 additions & 3 deletions pkg/api/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/traversal"
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
"github.com/gorilla/mux"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -209,6 +210,13 @@ type PinIntegrityResponse struct {
Invalid int `json:"invalid"`
}

type PinRepairResponse struct {
Reference swarm.Address `json:"reference"`
Address swarm.Address `json:"address"`
Issue string `json:"issue"`
Error string `json:"error"`
}

func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("get_pin_integrity").Build()

Expand All @@ -222,8 +230,9 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
}

out := make(chan storer.PinStat)

go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out)
corrupted := make(chan storer.CorruptedPinChunk)
go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out, corrupted)
go syncutil.Drain(corrupted)

flusher, ok := w.(http.Flusher)
if !ok {
Expand Down Expand Up @@ -251,3 +260,46 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
flusher.Flush()
}
}

func (s *Service) repairPins(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("post_repair_pins").Build()
paths := struct {
Reference swarm.Address `map:"reference" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

res := make(chan storer.RepairPinResult)
go s.pinIntegrity.Repair(r.Context(), logger, paths.Reference.String(), s.storer, res)

flusher, ok := w.(http.Flusher)
if !ok {
http.NotFound(w, r)
return
}

w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)
for v := range res {
issue := "missing"
if v.Chunk.Invalid {
issue = "invalid"
}
resp := PinRepairResponse{
Reference: v.Chunk.Ref,
Address: v.Chunk.Address,
Issue: issue,
Error: v.Error.Error(),
}
if err := enc.Encode(resp); err != nil {
break
}
flusher.Flush()
}
}
14 changes: 11 additions & 3 deletions pkg/api/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemstore"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
)
Expand Down Expand Up @@ -229,9 +229,17 @@ type mockPinIntegrity struct {
Store storage.Store
}

func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat) {
func (p *mockPinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan storer.PinStat, corrupted chan storer.CorruptedPinChunk) {
if pin != pinRef {
p.tester.Fatal("bad pin", pin)
}
close(out)
close(corrupted)
}

func (p *mockPinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore storer.NetStore, res chan storer.RepairPinResult) {
if pin != pinRef {
p.tester.Fatal("bad pin", pin)
}
close(res)
}
9 changes: 7 additions & 2 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,13 @@ func (s *Service) mountAPI() {
"GET": http.HandlerFunc(s.getPinnedRootHash),
"POST": http.HandlerFunc(s.pinRootHash),
"DELETE": http.HandlerFunc(s.unpinRootHash),
},
)
})

handle("/pins/{reference}/repair", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"POST": http.HandlerFunc(s.repairPins),
}),
))

handle("/stewardship/{address}", jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.stewardshipGetHandler),
Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/internal/pinning/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"runtime"

"github.com/ethersphere/bee/v2/pkg/encryption"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
"golang.org/x/sync/errgroup"

Expand Down
2 changes: 1 addition & 1 deletion pkg/storer/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"fmt"
"sort"

storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storer/internal"
pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
Expand Down
99 changes: 95 additions & 4 deletions pkg/storer/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
"os"
"path"
"sync"
"time"

"sync/atomic"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/sharky"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/leveldbstore"
"github.com/ethersphere/bee/v2/pkg/storer/internal/chunkstore"
pinstore "github.com/ethersphere/bee/v2/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
)

// Validate ensures that all retrievalIndex chunks are correctly stored in sharky.
Expand Down Expand Up @@ -240,7 +242,9 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st
defer f.Close()

var ch = make(chan PinStat)
go pv.Check(ctx, logger, pin, ch)
corrupted := make(chan CorruptedPinChunk)
go pv.Check(ctx, logger, pin, ch, corrupted)
go syncutil.Drain(corrupted)

for st := range ch {
report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref)
Expand All @@ -264,14 +268,26 @@ type PinStat struct {
Total, Missing, Invalid int
}

func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat) {
type CorruptedPinChunk struct {
Ref swarm.Address
Address swarm.Address
Missing, Invalid bool
}

type RepairPinResult struct {
Chunk CorruptedPinChunk
Error error
}

func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat, corrupted chan CorruptedPinChunk) {
var stats struct {
total, read, invalid atomic.Int32
}

n := time.Now()
defer func() {
close(out)
close(corrupted)
logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load())
}()

Expand Down Expand Up @@ -342,6 +358,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string,
}
if !validChunk(item, buf[:item.Location.Length]) {
invalid.Add(1)
select {
case <-ctx.Done():
return
case corrupted <- CorruptedPinChunk{Ref: pin, Address: item.Address, Invalid: true}:
}
}
}
}()
Expand All @@ -362,6 +383,11 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string,
rIdx := &chunkstore.RetrievalIndexItem{Address: addr}
if err := p.Store.Get(rIdx); err != nil {
missing.Add(1)
select {
case <-ctx.Done():
return true, nil
case corrupted <- CorruptedPinChunk{Ref: pin, Address: addr, Missing: true}:
}
} else {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -400,3 +426,68 @@ func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string,
}
}
}

func (p *PinIntegrity) Repair(ctx context.Context, logger log.Logger, pin string, netStore NetStore, res chan RepairPinResult) {
defer func() {
close(res)
}()

out := make(chan PinStat)
corrupted := make(chan CorruptedPinChunk)
go p.Check(ctx, logger, pin, out, corrupted)
go syncutil.Drain(out)

for v := range corrupted {
bStore := p.Store.(*leveldbstore.Store)
s := transaction.NewStorage(p.Sharky, bStore)

var ch swarm.Chunk
var err error
if v.Missing || v.Invalid {
ch, err = netStore.Download(false).Get(ctx, v.Address)
if err != nil {
r := RepairPinResult{
Chunk: v,
Error: fmt.Errorf("download failed: %s", err.Error()),
}
select {
case <-ctx.Done():
return
case res <- r:
}
}
}

if v.Missing {
logger.Info("repairing missing chunk", "address", v.Address)
err = s.Run(ctx, func(st transaction.Store) error {
err = st.ChunkStore().Put(ctx, ch)
if err != nil {
return fmt.Errorf("put missing chunk: %w", err)
}
return nil
})
}

if v.Invalid {
logger.Info("repairing invalid chunk (replace)", "address", v.Address)
err = s.Run(ctx, func(st transaction.Store) error {
err = st.ChunkStore().Replace(ctx, ch, false)
if err != nil {
return fmt.Errorf("replacing invalid chunk: %w", err)
}
return nil
})
}

r := RepairPinResult{
Chunk: v,
Error: err,
}
select {
case <-ctx.Done():
return
case res <- r:
}
}
}
Loading
Loading