Skip to content

Commit

Permalink
get sample by billy
Browse files Browse the repository at this point in the history
  • Loading branch information
syntrust committed Jul 26, 2024
1 parent 0f67978 commit ce0367c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 177 deletions.
11 changes: 4 additions & 7 deletions ethstorage/blobs/blob_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

type BlobCacheReader interface {
GetKeyValueByIndex(index uint64, hash common.Hash) []byte
GetKeyValueByIndexUnchecked(index uint64) []byte
GetSampleData(kvIndex, sampleIndexInKv uint64) []byte
}

// BlobReader provides unified interface for the miner to read blobs and samples
Expand Down Expand Up @@ -53,12 +53,9 @@ func (n *BlobReader) GetBlob(kvIdx uint64, kvHash common.Hash) ([]byte, error) {
func (n *BlobReader) ReadSample(shardIdx, sampleIdx uint64) (common.Hash, error) {
sampleLenBits := n.sm.MaxKvSizeBits() - es.SampleSizeBits
kvIdx := sampleIdx >> sampleLenBits
// get blob without checking commit since kvHash is not available
if blob := n.cr.GetKeyValueByIndexUnchecked(kvIdx); blob != nil {
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)
sampleSize := uint64(1 << es.SampleSizeBits)
sampleIdxByte := sampleIdxInKv << es.SampleSizeBits
sample := blob[sampleIdxByte : sampleIdxByte+sampleSize]
sampleIdxInKv := sampleIdx % (1 << sampleLenBits)

if sample := n.cr.GetSampleData(kvIdx, sampleIdxInKv); sample != nil {
return common.BytesToHash(sample), nil
}

Expand Down
196 changes: 89 additions & 107 deletions ethstorage/downloader/blob_disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ package downloader

import (
"bytes"
"fmt"
"math/big"
"os"
"path/filepath"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethstorage/billy"
"github.com/ethstorage/go-ethstorage/ethstorage"
"github.com/holiman/billy"
)

const (
Expand All @@ -24,11 +23,24 @@ const (
blobCacheDir = "cached_blobs"
)

type blockBlobsCached struct {
timestamp uint64
number uint64
blobs []*blobCached
}

type blobCached struct {
kvIndex *big.Int
kvSize *big.Int
hash common.Hash
dataId uint64
}

type BlobDiskCache struct {
store billy.Database
lookup map[uint64]uint64 // Lookup table mapping block number to blob billy entries id
index map[uint64]uint64 // Lookup table mapping kvIndex to blob billy entries id
mu sync.RWMutex // protects lookup and index maps
lookup map[uint64]*blockBlobsCached // Lookup table mapping block number to blockBlob
index map[uint64]uint64 // Lookup table mapping kvIndex to blob billy entries id
mu sync.RWMutex // protects lookup and index maps
lg log.Logger
}

Expand All @@ -38,7 +50,7 @@ func NewBlobDiskCache(datadir string, lg log.Logger) *BlobDiskCache {
lg.Crit("Failed to create cache directory", "dir", cbdir, "err", err)
}
c := &BlobDiskCache{
lookup: make(map[uint64]uint64),
lookup: make(map[uint64]*blockBlobsCached),
index: make(map[uint64]uint64),
lg: lg,
}
Expand All @@ -54,163 +66,133 @@ func NewBlobDiskCache(datadir string, lg log.Logger) *BlobDiskCache {
}

func (c *BlobDiskCache) SetBlockBlobs(block *blockBlobs) error {
rlpBlock, err := rlp.EncodeToBytes(block)
if err != nil {
c.lg.Error("Failed to encode blockBlobs into RLP", "block", block.number, "err", err)
return err
}
id, err := c.store.Put(rlpBlock)
if err != nil {
c.lg.Error("Failed to write blockBlobs into storage", "block", block.number, "err", err)
return err
}
c.mu.Lock()
c.lookup[block.number] = id
defer c.mu.Unlock()

var blobIds []uint64
var bcs []*blobCached
for _, b := range block.blobs {
id, err := c.store.Put(b.data)
if err != nil {
c.lg.Error("Failed to write blockBlobs into storage", "block", block.number, "err", err)
return err
}
blobIds = append(blobIds, id)
c.index[b.kvIndex.Uint64()] = id
c.lg.Debug("Indexing blob in cache", "kvIdx", b.kvIndex, "hash", b.hash, "id", id)
}
c.mu.Unlock()
c.lg.Info("Set blockBlobs to cache", "block", block.number, "id", id)
bcs = append(bcs, &blobCached{
kvIndex: b.kvIndex,
kvSize: b.kvSize,
hash: b.hash,
dataId: id,
})
}
c.lookup[block.number] = &blockBlobsCached{
timestamp: block.timestamp,
number: block.number,
blobs: bcs,
}
c.lg.Info("Set blockBlobs to cache", "block", block.number)
return nil
}

func (c *BlobDiskCache) Blobs(number uint64) []blob {
c.mu.RLock()
id, ok := c.lookup[number]
bb, ok := c.lookup[number]
c.mu.RUnlock()
if !ok {
return nil
}
block, err := c.getBlockBlobsById(id)
if err != nil || block == nil {
return nil
}
c.lg.Info("Blobs from cache", "block", block.number, "id", id)
c.lg.Info("Blobs from cache", "block", bb.number)
res := []blob{}
for _, blob := range block.blobs {
res = append(res, *blob)
for _, bc := range bb.blobs {
data, err := c.store.Get(bc.dataId)
if err != nil {
c.lg.Error("Failed to get blockBlobs from storage", "block", number, "err", err)
return nil
}
res = append(res, blob{
kvIndex: bc.kvIndex,
kvSize: bc.kvSize,
hash: bc.hash,
data: data,
})
}
return res
}

func (c *BlobDiskCache) GetKeyValueByIndex(idx uint64, hash common.Hash) []byte {
blob := c.getBlobByIndex(idx)
if blob != nil &&
bytes.Equal(blob.hash[0:ethstorage.HashSizeInContract], hash[0:ethstorage.HashSizeInContract]) {
return blob.data
}
return nil
}

// Access without verification through a hash: only for miner sampling
func (c *BlobDiskCache) GetKeyValueByIndexUnchecked(idx uint64) []byte {
blob := c.getBlobByIndex(idx)
if blob != nil {
return blob.data
c.mu.RLock()
defer c.mu.RUnlock()

for _, bb := range c.lookup {
for _, b := range bb.blobs {
if b.kvIndex.Uint64() == idx &&
bytes.Equal(b.hash[0:ethstorage.HashSizeInContract], hash[0:ethstorage.HashSizeInContract]) {
data, err := c.store.Get(b.dataId)
if err != nil {
c.lg.Error("Failed to get kv from downloader cache", "kvIndex", idx, "id", b.dataId, "err", err)
return nil
}
return data
}
}
}
return nil
}

func (c *BlobDiskCache) getBlobByIndex(idx uint64) *blob {
func (c *BlobDiskCache) GetSampleData(idx, sampleIdx uint64) []byte {
c.mu.RLock()
id, ok := c.index[idx]
c.mu.RUnlock()
if !ok {
return nil
}
block, err := c.getBlockBlobsById(id)
if err != nil || block == nil {

off := sampleIdx << ethstorage.SampleSizeBits
size := uint64(1 << ethstorage.SampleSizeBits)
data, err := c.store.GetSample(id, off, size)
if err != nil {
return nil
}
for _, blob := range block.blobs {
if blob != nil && blob.kvIndex.Uint64() == idx {
return blob
}
}
return nil
return data
}

func (c *BlobDiskCache) Cleanup(finalized uint64) {
c.mu.Lock()
defer c.mu.Unlock()

for hash, id := range c.lookup {
block, err := c.getBlockBlobsById(id)
if err != nil {
c.lg.Error("Failed to get block from id", "id", id, "err", err)
continue
}
if block != nil && block.number <= finalized {
if err := c.store.Delete(id); err != nil {
c.lg.Error("Failed to delete block from id", "id", id, "err", err)
}
delete(c.lookup, hash)
for number, block := range c.lookup {
if number <= finalized {
delete(c.lookup, number)
for _, blob := range block.blobs {
if blob != nil && blob.kvIndex != nil {
if blob.kvIndex != nil {
delete(c.index, blob.kvIndex.Uint64())
}
if err := c.store.Delete(blob.dataId); err != nil {
c.lg.Error("Failed to delete block from id", "id", blob.dataId, "err", err)
}
}
c.lg.Info("Cleanup deleted", "finalized", finalized, "block", block.number, "id", id)
c.lg.Info("Cleanup deleted", "finalized", finalized, "block", block.number)
}
}
}

func (c *BlobDiskCache) getBlockBlobsById(id uint64) (*blockBlobs, error) {
data, err := c.store.Get(id)
if err != nil {
c.lg.Error("Failed to get block from id", "id", id, "err", err)
return nil, err
}
if len(data) == 0 {
c.lg.Warn("BlockBlobs not found", "id", id)
return nil, fmt.Errorf("not found: id=%d", id)
}
item := new(blockBlobs)
if err := rlp.DecodeBytes(data, item); err != nil {
c.lg.Error("Failed to decode block", "id", id, "err", err)
return nil, err
}
return item, nil
}

func (c *BlobDiskCache) Close() error {
c.lg.Warn("Closing BlobDiskCache")
c.mu.Lock()
defer c.mu.Unlock()
for _, id := range c.lookup {
for _, id := range c.index {
if err := c.store.Delete(id); err != nil {
c.lg.Warn("Failed to delete block from id", "id", id, "err", err)
c.lg.Warn("Failed to delete blob from id", "id", id, "err", err)
}
}
c.lookup = nil
c.index = nil
return c.store.Close()
}

// newSlotter creates a helper method for the Billy datastore that returns the
// individual shelf sizes used to store blobs in.

// | blobs | shelf size | data size|
// |--|--|--|
// | 1 | 131128 |131125|
// | 2 | 262244 |262242|
// | 3 | 393360 |393358|
// | 4 | 524476 |524472|
// | 5 | 655592 |655585|
// | 6 | 786708 |786701|

func newSlotter() func() (uint32, bool) {
var (
slotsize uint32 = 12
blobCount uint32 = 1
)

return func() (size uint32, done bool) {
slotsize += blobSize + 44
size = slotsize
done = blobCount == maxBlobsPerTransaction
blobCount++
return
return blobSize + 4, true
}
}
Loading

0 comments on commit ce0367c

Please sign in to comment.