Skip to content

Commit

Permalink
perf: sampler optimization (#4882)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 31, 2024
1 parent 78bea5c commit 336ff77
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 68 deletions.
16 changes: 16 additions & 0 deletions pkg/soc/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ func (s *SOC) Sign(signer crypto.Signer) (swarm.Chunk, error) {
return s.Chunk()
}

// UnwrapCAC returns the content addressed chunk carried inside the given
// single owner chunk.
//
// The chunk data must be at least swarm.SocMinChunkSize bytes long, otherwise
// errWrongChunkSize is returned. Everything after the leading
// swarm.HashSize + swarm.SocSignatureSize bytes is handed to
// cac.NewWithDataSpan, and any error it reports is returned unchanged.
func UnwrapCAC(sch swarm.Chunk) (swarm.Chunk, error) {
	data := sch.Data()
	if len(data) < swarm.SocMinChunkSize {
		return nil, errWrongChunkSize
	}

	// Offset of the wrapped chunk's bytes within the SOC data.
	offset := swarm.HashSize + swarm.SocSignatureSize

	wrapped, err := cac.NewWithDataSpan(data[offset:])
	if err != nil {
		return nil, err
	}
	return wrapped, nil
}

// FromChunk recreates a SOC representation from swarm.Chunk data.
func FromChunk(sch swarm.Chunk) (*SOC, error) {
chunkData := sch.Data()
Expand Down
9 changes: 9 additions & 0 deletions pkg/soc/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ func TestFromChunk(t *testing.T) {
if !ch.Equal(recoveredSOC.WrappedChunk()) {
t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address())
}

unwrapped, err := soc.UnwrapCAC(sch)
if err != nil {
t.Fatal(err)
}

if !ch.Equal(unwrapped) {
t.Fatalf("wrapped chunk mismatch. got %s want %s", recoveredSOC.WrappedChunk().Address(), ch.Address())
}
}

func TestCreateAddress(t *testing.T) {
Expand Down
142 changes: 74 additions & 68 deletions pkg/storer/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"hash"
"math/big"
"runtime"
"sort"
"sync"
"testing"
Expand Down Expand Up @@ -41,67 +42,6 @@ type Sample struct {
Items []SampleItem
}

// RandSample builds a Sample for use in tests from SampleSize randomly
// generated chunks. Every third chunk (index 0, 3, 6, ...) is additionally
// passed through chunk.GenerateTestRandomSoChunk. The sample is assembled by
// MakeSampleUsingChunks with the supplied anchor; if that fails the test is
// aborted via t.Fatal.
func RandSample(t *testing.T, anchor []byte) Sample {
	t.Helper()

	chunks := make([]swarm.Chunk, 0, SampleSize)
	for n := 0; n < SampleSize; n++ {
		generated := chunk.GenerateTestRandomChunk()
		if n%3 == 0 {
			generated = chunk.GenerateTestRandomSoChunk(t, generated)
		}
		chunks = append(chunks, generated)
	}

	result, err := MakeSampleUsingChunks(chunks, anchor)
	if err != nil {
		t.Fatal(err)
	}

	return result
}

// MakeSampleUsingChunks builds a Sample holding one SampleItem per supplied
// chunk. Each item's transformed address is computed by transformedAddress
// using a fresh BMT hasher whose base hasher is prefixed with anchor. The
// resulting items are ordered ascending by transformed address.
//
// The first error returned by transformedAddress aborts the construction and
// is returned together with an empty Sample.
func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) {
	newPrefixHasher := func() hash.Hash {
		return swarm.NewPrefixHasher(anchor)
	}

	items := make([]SampleItem, 0, len(chunks))
	for _, c := range chunks {
		hasher := bmt.NewHasher(newPrefixHasher)
		taddr, err := transformedAddress(hasher, c, getChunkType(c))
		if err != nil {
			return Sample{}, err
		}

		items = append(items, SampleItem{
			TransformedAddress: taddr,
			ChunkAddress:       c.Address(),
			ChunkData:          c.Data(),
			Stamp:              newStamp(c.Stamp()),
		})
	}

	// Order the items by their transformed address.
	byTransformedAddress := func(a, b int) bool {
		return items[a].TransformedAddress.Compare(items[b].TransformedAddress) == -1
	}
	sort.Slice(items, byTransformedAddress)

	return Sample{Items: items}, nil
}

// newStamp builds a *postage.Stamp from the batch ID, index, timestamp and
// signature reported by the given swarm.Stamp.
func newStamp(s swarm.Stamp) *postage.Stamp {
	batchID, index := s.BatchID(), s.Index()
	timestamp, sig := s.Timestamp(), s.Sig()
	return postage.NewStamp(batchID, index, timestamp, sig)
}

// getChunkType classifies the chunk by validation: a chunk accepted by
// cac.Valid is reported as content addressed, otherwise one accepted by
// soc.Valid is reported as single owner, and anything else is unspecified.
// The CAC check takes precedence over the SOC check.
func getChunkType(chunk swarm.Chunk) swarm.ChunkType {
	switch {
	case cac.Valid(chunk):
		return swarm.ChunkTypeContentAddressed
	case soc.Valid(chunk):
		return swarm.ChunkTypeSingleOwner
	default:
		return swarm.ChunkTypeUnspecified
	}
}

// ReserveSample generates the sample of reserve storage of a node required for the
// storage incentives agent to participate in the lottery round. In order to generate
// this sample we need to iterate through all the chunks in the node's reserve and
Expand All @@ -125,8 +65,9 @@ func (db *DB) ReserveSample(
consensusTime uint64,
minBatchBalance *big.Int,
) (Sample, error) {

g, ctx := errgroup.WithContext(ctx)
chunkC := make(chan *reserve.ChunkBinItem, 64)

allStats := &SampleStats{}
statsLock := sync.Mutex{}
addStats := func(stats SampleStats) {
Expand All @@ -144,6 +85,8 @@ func (db *DB) ReserveSample(

allStats.BatchesBelowValueDuration = time.Since(t)

chunkC := make(chan *reserve.ChunkBinItem)

// Phase 1: Iterate chunk addresses
g.Go(func() error {
start := time.Now()
Expand All @@ -170,13 +113,14 @@ func (db *DB) ReserveSample(
})

// Phase 2: Get the chunk data and calculate transformed hash
sampleItemChan := make(chan SampleItem, 64)
sampleItemChan := make(chan SampleItem)

prefixHasherFactory := func() hash.Hash {
return swarm.NewPrefixHasher(anchor)
}

const workers = 6
workers := max(4, runtime.NumCPU())
db.logger.Debug("reserve sampler workers", "count", workers)

for i := 0; i < workers; i++ {
g.Go(func() error {
Expand Down Expand Up @@ -241,6 +185,7 @@ func (db *DB) ReserveSample(
}()

sampleItems := make([]SampleItem, 0, SampleSize)

// insert function will insert the new item in its correct place. If the sample
// size goes beyond what we need we omit the last item.
insert := func(item SampleItem) {
Expand Down Expand Up @@ -376,20 +321,20 @@ func transformedAddressCAC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address
return swarm.NewAddress(taddr), nil
}

func transformedAddressSOC(hasher *bmt.Hasher, chunk swarm.Chunk) (swarm.Address, error) {
func transformedAddressSOC(hasher *bmt.Hasher, socChunk swarm.Chunk) (swarm.Address, error) {
// Calculate transformed address from wrapped chunk
sChunk, err := soc.FromChunk(chunk)
cacChunk, err := soc.UnwrapCAC(socChunk)
if err != nil {
return swarm.ZeroAddress, err
}
taddrCac, err := transformedAddressCAC(hasher, sChunk.WrappedChunk())
taddrCac, err := transformedAddressCAC(hasher, cacChunk)
if err != nil {
return swarm.ZeroAddress, err
}

// Hash address and transformed address to make transformed address for this SOC
sHasher := swarm.NewHasher()
if _, err := sHasher.Write(chunk.Address().Bytes()); err != nil {
if _, err := sHasher.Write(socChunk.Address().Bytes()); err != nil {
return swarm.ZeroAddress, err
}
if _, err := sHasher.Write(taddrCac.Bytes()); err != nil {
Expand Down Expand Up @@ -432,3 +377,64 @@ func (s *SampleStats) add(other SampleStats) {
s.ChunkLoadFailed += other.ChunkLoadFailed
s.StampLoadFailed += other.StampLoadFailed
}

// RandSample builds a Sample for use in tests from SampleSize randomly
// generated chunks. Every third chunk (index 0, 3, 6, ...) is additionally
// passed through chunk.GenerateTestRandomSoChunk. The sample is assembled by
// MakeSampleUsingChunks with the supplied anchor; if that fails the test is
// aborted via t.Fatal.
func RandSample(t *testing.T, anchor []byte) Sample {
	t.Helper()

	chunks := make([]swarm.Chunk, 0, SampleSize)
	for n := 0; n < SampleSize; n++ {
		generated := chunk.GenerateTestRandomChunk()
		if n%3 == 0 {
			generated = chunk.GenerateTestRandomSoChunk(t, generated)
		}
		chunks = append(chunks, generated)
	}

	result, err := MakeSampleUsingChunks(chunks, anchor)
	if err != nil {
		t.Fatal(err)
	}

	return result
}

// MakeSampleUsingChunks builds a Sample holding one SampleItem per supplied
// chunk. Each item's transformed address is computed by transformedAddress
// using a fresh BMT hasher whose base hasher is prefixed with anchor. The
// resulting items are ordered ascending by transformed address.
//
// The first error returned by transformedAddress aborts the construction and
// is returned together with an empty Sample.
func MakeSampleUsingChunks(chunks []swarm.Chunk, anchor []byte) (Sample, error) {
	newPrefixHasher := func() hash.Hash {
		return swarm.NewPrefixHasher(anchor)
	}

	items := make([]SampleItem, 0, len(chunks))
	for _, c := range chunks {
		hasher := bmt.NewHasher(newPrefixHasher)
		taddr, err := transformedAddress(hasher, c, getChunkType(c))
		if err != nil {
			return Sample{}, err
		}

		items = append(items, SampleItem{
			TransformedAddress: taddr,
			ChunkAddress:       c.Address(),
			ChunkData:          c.Data(),
			Stamp:              newStamp(c.Stamp()),
		})
	}

	// Order the items by their transformed address.
	byTransformedAddress := func(a, b int) bool {
		return items[a].TransformedAddress.Compare(items[b].TransformedAddress) == -1
	}
	sort.Slice(items, byTransformedAddress)

	return Sample{Items: items}, nil
}

// newStamp builds a *postage.Stamp from the batch ID, index, timestamp and
// signature reported by the given swarm.Stamp.
func newStamp(s swarm.Stamp) *postage.Stamp {
	batchID, index := s.BatchID(), s.Index()
	timestamp, sig := s.Timestamp(), s.Sig()
	return postage.NewStamp(batchID, index, timestamp, sig)
}

// getChunkType classifies the chunk by validation: a chunk accepted by
// cac.Valid is reported as content addressed, otherwise one accepted by
// soc.Valid is reported as single owner, and anything else is unspecified.
// The CAC check takes precedence over the SOC check.
func getChunkType(chunk swarm.Chunk) swarm.ChunkType {
	switch {
	case cac.Valid(chunk):
		return swarm.ChunkTypeContentAddressed
	case soc.Valid(chunk):
		return swarm.ChunkTypeSingleOwner
	default:
		return swarm.ChunkTypeUnspecified
	}
}

0 comments on commit 336ff77

Please sign in to comment.