Skip to content

Commit

Permalink
Merge branch 'main' into integratiointest
Browse files Browse the repository at this point in the history
  • Loading branch information
ping-ke authored Aug 7, 2024
2 parents 8b58af2 + 264bf03 commit f1ac853
Show file tree
Hide file tree
Showing 18 changed files with 726 additions and 64 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,4 @@ profile.cov

**/yarn-error.log
logs/
*.test
67 changes: 67 additions & 0 deletions ethstorage/blobs/blob_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2022-2023, es.
// For license information, see https://github.com/ethstorage/es-node/blob/main/LICENSE

package blobs

import (
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
es "github.com/ethstorage/go-ethstorage/ethstorage"
)

// BlobCacheReader is the read side of the downloader blob cache consumed by
// the miner via BlobReader.
type BlobCacheReader interface {
	// GetKeyValueByIndex returns the cached (still-encoded) blob stored under
	// the given kv index and versioned hash, or nil if it is not cached.
	GetKeyValueByIndex(index uint64, hash common.Hash) []byte
	// GetSampleData returns the sample at sampleIndexInKv within the blob at
	// kvIndex, or nil if the blob is not cached.
	GetSampleData(kvIndex, sampleIndexInKv uint64) []byte
}

// BlobReader provides unified interface for the miner to read blobs and samples
// from StorageManager and downloader cache.
type BlobReader struct {
	cr BlobCacheReader    // downloader cache, consulted first
	sm *es.StorageManager // persistent storage, used as fallback
	lg log.Logger
}

// NewBlobReader constructs a BlobReader backed by the given cache reader,
// storage manager, and logger.
func NewBlobReader(cr BlobCacheReader, sm *es.StorageManager, lg log.Logger) *BlobReader {
	br := &BlobReader{cr: cr, sm: sm, lg: lg}
	return br
}

// GetBlob returns the decoded blob stored at kvIdx, preferring the downloader
// cache and falling back to the storage manager. It returns an error when the
// kv entry exists in neither.
func (n *BlobReader) GetBlob(kvIdx uint64, kvHash common.Hash) ([]byte, error) {
	// Fast path: the downloader cache holds the blob in encoded form.
	if cached := n.cr.GetKeyValueByIndex(kvIdx, kvHash); cached != nil {
		n.lg.Debug("Loaded blob from downloader cache", "kvIdx", kvIdx)
		return n.sm.DecodeBlob(cached, kvHash, kvIdx, n.sm.MaxKvSize()), nil
	}

	// Slow path: read from persistent storage.
	blob, found, err := n.sm.TryRead(kvIdx, int(n.sm.MaxKvSize()), kvHash)
	switch {
	case err != nil:
		return nil, err
	case !found:
		return nil, fmt.Errorf("kv not found: index=%d", kvIdx)
	}
	n.lg.Debug("Loaded blob from storage manager", "kvIdx", kvIdx)
	return blob, nil
}

// ReadSample returns the sample identified by the shard-global sampleIdx,
// preferring the downloader cache and falling back to the storage manager.
func (n *BlobReader) ReadSample(shardIdx, sampleIdx uint64) (common.Hash, error) {
	// Samples per kv entry is 1<<sampleLenBits; split the global sample index
	// into its kv index and the sample's position inside that kv.
	sampleLenBits := n.sm.MaxKvSizeBits() - es.SampleSizeBits
	kvIdx := sampleIdx >> sampleLenBits
	sampleIdxInKv := sampleIdx & ((1 << sampleLenBits) - 1)

	if cached := n.cr.GetSampleData(kvIdx, sampleIdxInKv); cached != nil {
		return common.BytesToHash(cached), nil
	}

	return n.sm.ReadSampleUnlocked(shardIdx, sampleIdx)
}
12 changes: 7 additions & 5 deletions ethstorage/data_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
VERSION = uint64(1)

HEADER_SIZE = 4096

SampleSizeBits = 5 // 32 bytes
)

// A DataFile represents a local file for a consecutive chunks
Expand Down Expand Up @@ -131,7 +133,7 @@ func (df *DataFile) ContainsKv(kvIdx uint64) bool {
}

// ContainsSample reports whether the chunk holding the given sample belongs
// to this data file.
func (df *DataFile) ContainsSample(sampleIdx uint64) bool {
	// A sample is 1<<SampleSizeBits bytes; `<<` and `/` share precedence in Go,
	// so make the intended grouping explicit.
	chunkIdx := (sampleIdx << SampleSizeBits) / df.chunkSize
	return df.Contains(chunkIdx)
}

func (df *DataFile) ChunkIdxEnd() uint64 {
Expand Down Expand Up @@ -174,13 +176,13 @@ func (df *DataFile) ReadSample(sampleIdx uint64) (common.Hash, error) {
if !df.ContainsSample(sampleIdx) {
return common.Hash{}, fmt.Errorf("sample not found")
}

md := make([]byte, 32)
n, err := df.file.ReadAt(md, HEADER_SIZE+int64(sampleIdx*32)-int64(df.chunkIdxStart*df.chunkSize))
sampleSize := 1 << SampleSizeBits
md := make([]byte, sampleSize)
n, err := df.file.ReadAt(md, HEADER_SIZE+int64(sampleIdx<<SampleSizeBits)-int64(df.chunkIdxStart*df.chunkSize))
if err != nil {
return common.Hash{}, err
}
if n != 32 {
if n != sampleSize {
return common.Hash{}, fmt.Errorf("not full read")
}
return common.BytesToHash(md), nil
Expand Down
271 changes: 271 additions & 0 deletions ethstorage/downloader/blob_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Copyright 2022-2023, EthStorage.
// For license information, see https://github.com/ethstorage/es-node/blob/main/LICENSE

package downloader

import (
"bytes"
"fmt"
"math/big"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/ethstorage/go-ethstorage/ethstorage/log"
"github.com/protolambda/go-kzg/eth"
)

// Package-level fixtures shared across the tests in this file; setup creates
// the cache and teardown resets kvHashes between tests.
var (
	cache    BlobCache     // cache under test, created by setup
	kvHashes []common.Hash // versioned hash of every blob created so far
	datadir  string        // on-disk root directory for the disk cache
	fileName = "test_shard_0.dat"        // backing data file used by TestEncoding
	blobData = "blob data of kvIndex %d" // format string for blob payloads
	sampleLen = blobSize / sampleSize // samples per blob
	minerAddr = common.BigToAddress(common.Big1)
	kvSize uint64 = 1 << 17
	kvEntries uint64 = 16
	shardID = uint64(0)
)

// TestDiskBlobCache exercises SetBlockBlobs, Blobs, and GetKeyValueByIndex,
// and verifies Cleanup does not evict blocks newer than the given number.
func TestDiskBlobCache(t *testing.T) {
	setup(t)
	t.Cleanup(func() {
		teardown(t)
	})

	block, err := newBlockBlobs(10, 4)
	if err != nil {
		t.Fatalf("Failed to create new block blobs: %v", err)
	}

	err = cache.SetBlockBlobs(block)
	if err != nil {
		t.Fatalf("Failed to set block blobs: %v", err)
	}

	blobs := cache.Blobs(block.number)
	if len(blobs) != len(block.blobs) {
		t.Fatalf("Unexpected number of blobs: got %d, want %d", len(blobs), len(block.blobs))
	}

	for i, blob := range block.blobs {
		// Fix: the original local was named blobData, shadowing the
		// package-level blobData format string.
		got := cache.GetKeyValueByIndex(uint64(i), blob.hash)
		if !bytes.Equal(got, blob.data) {
			t.Fatalf("Unexpected blob data at index %d: got %x, want %x", i, got, blob.data)
		}
	}

	// Cleanup(5) must keep block 10 (10 > 5).
	cache.Cleanup(5)
	blobsAfterCleanup := cache.Blobs(block.number)
	if len(blobsAfterCleanup) != len(block.blobs) {
		t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(blobsAfterCleanup), len(block.blobs))
	}

	block, err = newBlockBlobs(20, 6)
	if err != nil {
		t.Fatalf("Failed to create new block blobs: %v", err)
	}

	err = cache.SetBlockBlobs(block)
	if err != nil {
		t.Fatalf("Failed to set block blobs: %v", err)
	}

	// Cleanup(15) must keep block 20 (20 > 15).
	cache.Cleanup(15)
	blobsAfterCleanup = cache.Blobs(block.number)
	if len(blobsAfterCleanup) != len(block.blobs) {
		t.Fatalf("Unexpected number of blobs after cleanup: got %d, want %d", len(blobsAfterCleanup), len(block.blobs))
	}
}

// TestEncoding round-trips blobs through EncodeBlob, the disk cache, and
// DecodeBlob, and checks the decoded payload matches the original content.
func TestEncoding(t *testing.T) {
	setup(t)
	t.Cleanup(func() {
		teardown(t)
	})

	cases := []struct {
		blockNum uint64
		blobLen  uint64
	}{
		{0, 1},
		{1000, 4},
		{1, 5},
		{222, 6},
		{12345, 2},
		{2000000, 3},
	}

	df, err := ethstorage.Create(fileName, shardID, kvEntries, 0, kvSize, ethstorage.ENCODE_BLOB_POSEIDON, minerAddr, kvSize)
	if err != nil {
		t.Fatalf("Create failed %v", err)
	}
	shardMgr := ethstorage.NewShardManager(common.Address{}, kvSize, kvEntries, kvSize)
	shardMgr.AddDataShard(shardID)
	shardMgr.AddDataFile(df)
	sm := ethstorage.NewStorageManager(shardMgr, nil)
	defer func() {
		sm.Close()
		os.Remove(fileName)
	}()

	// Encode each blob as the storage manager would, then park it in the cache.
	for _, c := range cases {
		bb, err := newBlockBlobs(c.blockNum, c.blobLen)
		if err != nil {
			t.Fatalf("failed to create block blobs: %v", err)
		}
		for i := range bb.blobs {
			b := bb.blobs[i]
			bb.blobs[i].data = sm.EncodeBlob(b.data, b.hash, b.kvIndex.Uint64(), kvSize)
		}
		if err := cache.SetBlockBlobs(bb); err != nil {
			t.Fatalf("failed to set block blobs: %v", err)
		}
	}

	// Read every kv back from the cache and verify the decoded bytes.
	for i, kvHash := range kvHashes {
		kvIndex := uint64(i)
		t.Run(fmt.Sprintf("test kv: %d", i), func(t *testing.T) {
			blobEncoded := cache.GetKeyValueByIndex(kvIndex, kvHash)
			blobDecoded := sm.DecodeBlob(blobEncoded, kvHash, kvIndex, kvSize)
			bytesWant := []byte(fmt.Sprintf(blobData, kvIndex))
			if !bytes.Equal(blobDecoded[:len(bytesWant)], bytesWant) {
				t.Errorf("GetKeyValueByIndex and decoded = %s, want %s", blobDecoded[:len(bytesWant)], bytesWant)
			}
		})
	}
}

// TestBlobDiskCache_GetSampleData fills the cache with blocks of patterned
// blobs and verifies GetSampleData returns exactly the bytes written by fill.
func TestBlobDiskCache_GetSampleData(t *testing.T) {
	setup(t)
	t.Cleanup(func() {
		teardown(t)
	})

	const blockStart = 10000000
	// Fix: the original discarded the generator returned by rand.New, so the
	// time-based seed never took effect. Keep the rng and use it below.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	kvIndex2BlockNumber := map[uint64]uint64{}
	kvIndex2BlobIndex := map[uint64]uint64{}

	// newBlockBlobsFilled builds a block whose blobs carry fill()-patterned
	// data, recording each kv's block number and blob index for verification.
	newBlockBlobsFilled := func(blockNumber, blobLen uint64) (*blockBlobs, error) {
		block := &blockBlobs{
			number: blockNumber,
			blobs:  make([]*blob, blobLen),
		}
		for i := uint64(0); i < blobLen; i++ {
			kvIndex := uint64(len(kvHashes))
			blob := &blob{
				kvIndex: new(big.Int).SetUint64(kvIndex),
				data:    fill(blockNumber, i),
			}
			kzgBlob := kzg4844.Blob{}
			copy(kzgBlob[:], blob.data)
			commitment, err := kzg4844.BlobToCommitment(kzgBlob)
			if err != nil {
				return nil, fmt.Errorf(
					"failed to create commitment for blob %d: %w", kvIndex, err)
			}
			blob.hash = common.Hash(eth.KZGToVersionedHash(eth.KZGCommitment(commitment)))
			block.blobs[i] = blob
			kvHashes = append(kvHashes, blob.hash)
			kvIndex2BlockNumber[kvIndex] = blockNumber
			kvIndex2BlobIndex[kvIndex] = i
		}
		t.Log("Block created", "number", block.number, "blobs", blobLen)
		return block, nil
	}
	for i := 0; i < 10; i++ {
		blockn, blobn := blockStart+i, rng.Intn(6)+1
		block, err := newBlockBlobsFilled(uint64(blockn), uint64(blobn))
		if err != nil {
			t.Fatalf("Failed to create new block blobs: %v", err)
		}
		if err := cache.SetBlockBlobs(block); err != nil {
			t.Fatalf("Failed to set block blobs: %v", err)
		}
	}

	// Spot-check one random sample per kv against the pattern fill() wrote.
	for kvi := range kvHashes {
		kvIndex := uint64(kvi)
		sampleIndex := rng.Intn(int(sampleLen))
		sample := cache.GetSampleData(kvIndex, uint64(sampleIndex))
		sampleWant := make([]byte, sampleSize)
		copy(sampleWant, fmt.Sprintf("%d_%d_%d", kvIndex2BlockNumber[kvIndex], kvIndex2BlobIndex[kvIndex], sampleIndex))
		t.Run(fmt.Sprintf("test sample: kvIndex=%d, sampleIndex=%d", kvIndex, sampleIndex), func(t *testing.T) {
			if !bytes.Equal(sample, sampleWant) {
				t.Errorf("GetSampleData got %x, want %x", sample, sampleWant)
			}
		})
	}
}

// fill builds a deterministic blob payload: sampleLen samples, each
// sampleSize bytes, prefixed with "<blockNumber>_<blobIndex>_<sampleIdx>"
// and zero-padded to the sample size.
func fill(blockNumber, blobIndex uint64) []byte {
	var out []byte
	for s := uint64(0); s < sampleLen; s++ {
		chunk := make([]byte, sampleSize)
		copy(chunk, fmt.Sprintf("%d_%d_%d", blockNumber, blobIndex, s))
		out = append(out, chunk...)
	}
	return out
}

// newBlockBlobs builds a blockBlobs fixture with blobLen blobs whose payloads
// come from the package-level blobData format string. KV indexes continue
// from the global kvHashes slice, which is extended with each blob's hash.
func newBlockBlobs(blockNumber, blobLen uint64) (*blockBlobs, error) {
	bb := &blockBlobs{
		number: blockNumber,
		blobs:  make([]*blob, blobLen),
	}
	for i := uint64(0); i < blobLen; i++ {
		idx := len(kvHashes)
		b := &blob{
			kvIndex: big.NewInt(int64(idx)),
			data:    []byte(fmt.Sprintf(blobData, idx)),
		}
		var kzgBlob kzg4844.Blob
		copy(kzgBlob[:], b.data)
		commitment, err := kzg4844.BlobToCommitment(kzgBlob)
		if err != nil {
			return nil, fmt.Errorf(
				"failed to create commitment for blob %d: %w", idx, err)
		}
		b.hash = common.Hash(eth.KZGToVersionedHash(eth.KZGCommitment(commitment)))
		bb.blobs[i] = b
		kvHashes = append(kvHashes, b.hash)
	}
	return bb, nil
}

// setup creates a fresh disk-backed blob cache rooted in a per-test temp dir
// and assigns it to the package-level cache fixture.
func setup(t *testing.T) {
	datadir = filepath.Join(t.TempDir(), "datadir")
	if err := os.MkdirAll(datadir, 0700); err != nil {
		t.Fatalf("Failed to create datadir: %v", err)
	}
	t.Logf("datadir %s", datadir)
	lg := log.NewLogger(log.CLIConfig{
		Level:  "warn",
		Format: "text",
	})
	cache = NewBlobDiskCache(datadir, lg)
}

// teardown closes the cache, removes its data directory, and resets the
// kvHashes fixture so the next test starts from kv index 0.
func teardown(t *testing.T) {
	if err := cache.Close(); err != nil {
		t.Errorf("Failed to close BlobCache: %v", err)
	}
	if err := os.RemoveAll(datadir); err != nil {
		t.Errorf("Failed to remove datadir: %v", err)
	}
	kvHashes = nil
}
Loading

0 comments on commit f1ac853

Please sign in to comment.