Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: soc replica #4802

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (

"github.com/ethersphere/bee/v2/pkg/accesscontrol"
"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
Expand Down Expand Up @@ -45,11 +47,12 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
}

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
RLevel redundancy.Level `map:"Swarm-Redundancy-Level"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
Expand Down Expand Up @@ -208,7 +211,8 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
}
}

err = putter.Put(r.Context(), sch)
replicaPutter := replicas.NewSocPutter(putter)
err = replicaPutter.Put(r.Context(), sch)
if err != nil {
logger.Debug("write chunk failed", "chunk_address", sch.Address(), "error", err)
logger.Error(nil, "write chunk failed")
Expand Down
6 changes: 5 additions & 1 deletion pkg/feeds/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"fmt"
"time"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -46,7 +48,9 @@ func (f *Getter) Get(ctx context.Context, i Index) (swarm.Chunk, error) {
if err != nil {
return nil, err
}
return f.getter.Get(ctx, addr)
rLevel := redundancy.GetLevelFromContext(ctx)
getter := replicas.NewSocGetter(f.getter, rLevel)
return getter.Get(ctx, addr)
}

// FromChunk parses out the timestamp and the payload
Expand Down
4 changes: 3 additions & 1 deletion pkg/feeds/putter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/replicas"
"github.com/ethersphere/bee/v2/pkg/soc"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -53,7 +54,8 @@ func (u *Putter) Put(ctx context.Context, i Index, at int64, payload []byte) err
if err != nil {
return err
}
return u.putter.Put(ctx, ch)
putter := replicas.NewSocPutter(u.putter)
return putter.Put(ctx, ch)
}

func toChunk(at uint64, payload []byte) (swarm.Chunk, error) {
Expand Down
123 changes: 123 additions & 0 deletions pkg/replicas/soc/soc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package soc implements a scheme to replicate single-owner chunks (SOCs)
// in such a way that
// - the replicas are optimally dispersed to aid cross-neighbourhood redundancy
// - the replica addresses can be deduced by retrievers knowing only the address
// of the original single-owner chunk
// - no new chunk validation rules are introduced
package soc

import (
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// socReplicator runs the search for replica addresses of a SOC chunk
// and streams them on C in an order that keeps addresses maximally dispersed.
type socReplicator struct {
	addr   []byte           // byte slice of the original SOC chunk address
	queue  [16]*socReplica  // replicas ordered so that successive address prefixes stay balanced
	exist  [30]bool         // marks neighbourhoods already covered across all depths (2+4+8+16 slots)
	sizes  [5]int           // per-depth write cursor into queue; seeded from redundancy.GetReplicaCounts()
	C      chan *socReplica // delivery channel for mined replicas
	rLevel redundancy.Level // redundancy level determining how many replicas to mine
}

// NewSocReplicator constructs a replicator for the given SOC address and
// redundancy level and starts mining replicas in a background goroutine;
// results are delivered on the returned replicator's C channel, which is
// closed when mining completes.
func NewSocReplicator(addr swarm.Address, rLevel redundancy.Level) *socReplicator {
	rep := &socReplicator{
		C:      make(chan *socReplica, 16),
		addr:   addr.Bytes(),
		rLevel: rLevel,
		sizes:  redundancy.GetReplicaCounts(),
	}
	go rep.replicas()
	return rep
}

// AllAddresses returns every replica address derivable from addr at the
// maximum (PARANOID) redundancy level, in mining order.
func AllAddresses(addr swarm.Address) []swarm.Address {
	level := redundancy.PARANOID
	out := make([]swarm.Address, level.GetReplicaCount())
	i := 0
	for replica := range NewSocReplicator(addr, level).C {
		out[i] = swarm.NewAddress(replica.Addr)
		i++
	}
	return out
}

// socReplica holds a mined replica address of the original SOC chunk
// together with the nonce byte that produced it.
type socReplica struct {
	Addr  []byte // byte slice of the generated SOC address
	nonce uint8  // entropy byte hashed with the original address to derive Addr
}

// replicate derives the candidate replica whose address is the hash of the
// original SOC address concatenated with the entropy byte i.
func (rr *socReplicator) replicate(i uint8) (sp *socReplica) {
	hasher := swarm.NewHasher()
	_, _ = hasher.Write(rr.addr)
	_, _ = hasher.Write([]byte{i})
	return &socReplica{Addr: hasher.Sum(nil), nonce: i}
}

// replicas enumerates replica parameters (SOC addresses) and pushes them
// into the replicator's channel C, closing it when done.
// The order of replicas is such that addresses are always maximally dispersed
// in successive sets of addresses, i.e. the binary tree representing the
// address prefix bits up to each depth stays balanced.
func (rr *socReplicator) replicas() {
	defer close(rr.C)
	n := 0 // number of queue slots already flushed to rr.C
	for i := uint8(0); n < rr.rLevel.GetReplicaCount() && i < 255; i++ {
		// mine a candidate replica from nonce i
		r := rr.replicate(i)
		d, m := rr.add(r, rr.rLevel)
		if d == 0 {
			// candidate fell into an already-covered neighbourhood; try next nonce
			continue
		}
		// flush the contiguous run of queued replicas starting at n.
		// NOTE(review): m is deliberately reused as the range index here —
		// after the loop it is the offset of the first nil slot (or the last
		// index scanned if no nil was hit), which advances n past the
		// replicas just sent.
		for m, r = range rr.queue[n:] {
			if r == nil {
				break
			}
			rr.C <- r
		}
		n += m
	}
}

// add inserts the soc replica into the replicator so that addresses stay
// balanced. It recursively records r's neighbourhood at every level below
// rLevel; the shallowest previously-uncovered level claims r and assigns its
// queue slot. Returns (0, 0) when r's neighbourhood at rLevel is already
// covered; otherwise depth is the replica count of the claiming level and
// rank is r's slot in the queue.
func (rr *socReplicator) add(r *socReplica, rLevel redundancy.Level) (depth int, rank int) {
	if rLevel == redundancy.NONE {
		return 0, 0
	}
	// lookup key for r's neighbourhood at this level
	nh := nh(rLevel, r.Addr)
	if rr.exist[nh] {
		// neighbourhood already represented at this depth
		return 0, 0
	}
	rr.exist[nh] = true
	// recurse first so a shallower uncovered level gets priority to claim r
	l, o := rr.add(r, rLevel.Decrement())
	d := uint8(rLevel) - 1
	if l == 0 {
		// no shallower level claimed r: queue it at this depth's cursor
		o = rr.sizes[d]
		rr.sizes[d]++
		rr.queue[o] = r
		l = rLevel.GetReplicaCount()
	}
	return l, o
}

// UTILS

// replicaIndexBases are the offsets into the exist array at which each
// depth's neighbourhood slots begin: depth d (1..4) occupies 2^d slots
// starting at replicaIndexBases[d-1] (0, 2, 6, 14 — 30 slots in total,
// matching exist's length). The fifth element is unused padding.
var replicaIndexBases = [5]int{0, 2, 6, 14}

// nh returns the lookup key for the given address at the given redundancy
// level, used as an index into the replicator's exist array.
func nh(rLevel redundancy.Level, addr []byte) int {
	depth := uint8(rLevel)
	base := replicaIndexBases[depth-1]
	bucket := int(addr[0] >> (8 - depth))
	return base + bucket
}
149 changes: 149 additions & 0 deletions pkg/replicas/soc_getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The code below integrates dispersed SOC replicas into chunk fetching
// via the storage.Getter interface.
package replicas

import (
"context"
"errors"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/file/redundancy"
replicas_soc "github.com/ethersphere/bee/v2/pkg/replicas/soc"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
)

// socGetter is the private implementation of storage.Getter, an interface for
// retrieving chunks. It embeds the original simple chunk getter and extends it
// to a multiplexed variant that also fetches dispersed replicas.
//
// The retrieval strategy is governed by:
// - RetryInterval: the delay before a new batch of replicas is fetched
// - level: the redundancy level the replicas were uploaded with, which
// determines the total replica count (up to 16)
// - (not implemented) pivot: replicas with addresses in the proximity of
// pivot would be tried first
type socGetter struct {
	wg sync.WaitGroup // tracks in-flight fetch goroutines; NOTE(review): never waited on in Get — confirm intent
	storage.Getter
	level redundancy.Level
}

// NewSocGetter is the socGetter constructor: it wraps g so that Get races
// the original SOC address against its dispersed replicas at the given
// redundancy level.
func NewSocGetter(g storage.Getter, level redundancy.Level) storage.Getter {
	return &socGetter{Getter: g, level: level}
}

// Get makes the getter satisfy the storage.Getter interface.
//
// It races a fetch of the original SOC chunk against fetches of its dispersed
// replicas. Replicas are dispatched in batches that double in size at every
// RetryInterval tick; the first chunk retrieved wins and cancels the rest.
// If every attempt fails, or the retry schedule is exhausted without a
// success, the accumulated errors are returned.
func (g *socGetter) Get(ctx context.Context, addr swarm.Address) (ch swarm.Chunk, err error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// channel that the results (retrieved chunks) are gathered to from
	// concurrent workers each fetching a replica
	resultC := make(chan swarm.Chunk)
	// errc collects the errors; buffered for the original attempt plus the
	// maximum of 16 replicas so workers never block on send
	errc := make(chan error, 17)
	var errs error
	errcnt := 0

	// concurrently retrieve the chunk using the original SOC address
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		ch, err := g.Getter.Get(ctx, addr)
		if err != nil {
			errc <- err
			return
		}

		select {
		case resultC <- ch:
		case <-ctx.Done():
		}
	}()
	// counters
	n := 0      // replica addresses dispatched in the current batch
	target := 2 // number of replicas attempted in the current batch
	total := g.level.GetReplicaCount()

	// the replicator mines replica addresses in dispersion order
	rr := replicas_soc.NewSocReplicator(addr, g.level)
	next := rr.C
	var wait <-chan time.Time // nil channel disables the wait case
	// the number of addresses tried doubles each period of search expansion
	// (at intervals of RetryInterval)
	ticker := time.NewTicker(RetryInterval)
	defer ticker.Stop()
	for level := uint8(0); level <= uint8(g.level); {
		select {
		// at least one chunk is retrieved: cancel the rest and return early
		case chunk := <-resultC:
			cancel()
			return chunk, nil

		case err = <-errc:
			errs = errors.Join(errs, err)
			errcnt++
			if errcnt > total {
				// every attempt (original + all replicas) has failed
				return nil, errors.Join(ErrSwarmageddon, errs)
			}

		// the ticker switches the address channel back on for the next batch
		case <-wait:
			wait = nil
			next = rr.C
			level++
			target = 1 << level
			n = 0
			continue

		// getting the addresses in order
		case so := <-next:
			if so == nil {
				// replicator channel closed: no more addresses to try
				next = nil
				continue
			}

			g.wg.Add(1)
			go func() {
				defer g.wg.Done()
				ch, err := g.Getter.Get(ctx, swarm.NewAddress(so.Addr))
				if err != nil {
					errc <- err
					return
				}

				// unwrap the replica back to the originally wrapped chunk;
				// sch renamed from "soc" to avoid shadowing the soc package
				sch, err := soc.FromChunk(ch)
				if err != nil {
					errc <- err
					return
				}

				returnCh, err := sch.Chunk()
				if err != nil {
					errc <- err
					return
				}

				select {
				case resultC <- returnCh:
				case <-ctx.Done():
				}
			}()
			n++
			if n < target {
				continue
			}
			// batch fully dispatched: pause until the next tick
			next = nil
			wait = ticker.C
		}
	}

	// BUGFIX: the retry schedule is exhausted without a successful fetch.
	// Previously this returned (nil, nil), handing callers a nil chunk with a
	// nil error; report the accumulated errors instead so the (T, error)
	// contract is honoured.
	return nil, errors.Join(ErrSwarmageddon, errs)
}
Loading
Loading