Skip to content

Commit

Permalink
Added blob table data structure. (Layr-Labs#677)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Aug 6, 2024
1 parent 7f466d6 commit 7020083
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 0 deletions.
44 changes: 44 additions & 0 deletions tools/traffic/table/blob_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package table

import "errors"

// BlobMetadata encapsulates various information about a blob written by the traffic generator.
type BlobMetadata struct {
// Key of the blob, set when the blob is initially uploaded.
Key []byte

// BlobIndex of the blob.
BlobIndex uint

// Checksum of the blob.
Checksum [16]byte

// Size of the blob, in bytes.
Size uint

// RemainingReadPermits describes the maximum number of remaining reads permitted against this blob.
// If -1 then an unlimited number of reads are permitted.
RemainingReadPermits int
}

// NewBlobMetadata creates a new BlobMetadata instance. The readPermits parameter describes the maximum number of
// remaining reads permitted against this blob. If -1 then an unlimited number of reads are permitted.
func NewBlobMetadata(
key []byte,
checksum [16]byte,
size uint,
blobIndex uint,
readPermits int) (*BlobMetadata, error) {

if readPermits == 0 {
return nil, errors.New("read permits must not be zero")
}

return &BlobMetadata{
Key: key,
Checksum: checksum,
Size: size,
BlobIndex: blobIndex,
RemainingReadPermits: readPermits,
}, nil
}
62 changes: 62 additions & 0 deletions tools/traffic/table/blob_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package table

import "sync"

// BlobStore is a thread safe data structure that tracks blobs written by the traffic generator.
type BlobStore struct {

// blobs contains all blobs currently tracked by the store.
blobs map[uint64]*BlobMetadata

// nextKey describes the next key to used for the blobs map.
nextKey uint64

lock sync.Mutex
}

// NewBlobStore creates a new BlobStore instance.
func NewBlobStore() *BlobStore {
return &BlobStore{
blobs: make(map[uint64]*BlobMetadata),
nextKey: 0,
}
}

// Add a blob to the store.
func (store *BlobStore) Add(blob *BlobMetadata) {
store.lock.Lock()
defer store.lock.Unlock()

store.blobs[store.nextKey] = blob
store.nextKey++
}

// GetNext returns the next blob in the store. Decrements the blob's read permits, removing it
// from the store if the permits are exhausted. This method makes no guarantees about the order
// in which blobs are returned. Returns nil if no blobs are available.
func (store *BlobStore) GetNext() *BlobMetadata {
store.lock.Lock()
defer store.lock.Unlock()

for key, blob := range store.blobs {
// Always return the first blob found.

if blob.RemainingReadPermits > 0 {
blob.RemainingReadPermits--
if blob.RemainingReadPermits == 0 {
delete(store.blobs, key)
}
}

return blob
}
return nil
}

// Size returns the number of blobs currently stored.
func (store *BlobStore) Size() uint {
store.lock.Lock()
defer store.lock.Unlock()

return uint(len(store.blobs))
}
114 changes: 114 additions & 0 deletions tools/traffic/table/blob_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package table

import (
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/rand"
"testing"
)

// randomMetadata generates a random BlobMetadata instance.
func randomMetadata(t *testing.T, permits int) *BlobMetadata {
key := make([]byte, 32)
checksum := [16]byte{}
_, _ = rand.Read(key)
_, _ = rand.Read(checksum[:])
metadata, err := NewBlobMetadata(key, checksum, 1024, 0, permits)
assert.Nil(t, err)

return metadata
}

// TestBasicOperation tests basic operations of the BlobTable. Adds blobs and iterates over them.
func TestBasicOperation(t *testing.T) {
tu.InitializeRandom()

store := NewBlobStore()
assert.Equal(t, uint(0), store.Size())

size := 1024
expectedMetadata := make([]*BlobMetadata, 0)
for i := 0; i < size; i++ {
metadata := randomMetadata(t, 1)
store.Add(metadata)
expectedMetadata = append(expectedMetadata, metadata)
assert.Equal(t, uint(i+1), store.Size())
}

for i := 0; i < size; i++ {
assert.Equal(t, expectedMetadata[i], store.blobs[uint64(i)])
}
}

// TestGetRandomWithRemoval tests getting a random blob data, but where the number of permits per blob is unlimited.
func TestGetRandomNoRemoval(t *testing.T) {
tu.InitializeRandom()

table := NewBlobStore()
assert.Equal(t, uint(0), table.Size())

// Requesting a random element from an empty table should return nil.
element := table.GetNext()
assert.Nil(t, element)

expectedMetadata := make(map[string]*BlobMetadata)
size := 128
for i := 0; i < size; i++ {
metadata := randomMetadata(t, -1) // -1 == unlimited permits
table.Add(metadata)
expectedMetadata[string(metadata.Key)] = metadata
assert.Equal(t, uint(i+1), table.Size())
}

randomIndices := make(map[string]bool)

// Query more times than the number of blobs to ensure that blobs are not removed.
for i := 0; i < size*8; i++ {
metadata := table.GetNext()
assert.NotNil(t, metadata)
assert.Equal(t, expectedMetadata[string(metadata.Key)], metadata)
randomIndices[string(metadata.Key)] = true
}

// Sanity check: ensure that at least 10 different blobs were returned. This check is attempting to verify
// that we are actually getting random blobs. The probability of this check failing is extremely low if
// the random number generator is working correctly.
assert.GreaterOrEqual(t, len(randomIndices), 10)
}

// TestGetRandomWithRemoval tests getting a random blob data, where the number of permits per blob is limited.
func TestGetRandomWithRemoval(t *testing.T) {
tu.InitializeRandom()

table := NewBlobStore()
assert.Equal(t, uint(0), table.Size())

// Requesting a random element from an empty table should return nil.
element := table.GetNext()
assert.Nil(t, element)

permitCount := 2

size := 1024
expectedMetadata := make(map[string]uint)
for i := 0; i < size; i++ {
metadata := randomMetadata(t, permitCount)
table.Add(metadata)
expectedMetadata[string(metadata.Key)] = 0
assert.Equal(t, uint(i+1), table.Size())
}

// Requesting elements a number of times equal to the size times the number of permits should completely
// drain the table and return all elements a number of times equal to the number of permits.
for i := 0; i < size*permitCount; i++ {
metadata := table.GetNext()
assert.NotNil(t, metadata)

k := string(metadata.Key)
permitsUsed := expectedMetadata[k] + 1
expectedMetadata[k] = permitsUsed
assert.LessOrEqual(t, permitsUsed, uint(permitCount))
}

assert.Equal(t, uint(0), table.Size())
}

0 comments on commit 7020083

Please sign in to comment.