Skip to content

Commit

Permalink
parallelize length proof verification
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Dec 12, 2023
1 parent 6f84c3e commit 9119b43
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 14 deletions.
5 changes: 3 additions & 2 deletions core/mock/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mock
import (
"errors"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/stretchr/testify/mock"
)
Expand All @@ -23,8 +24,8 @@ func NewMockChunkValidator() *MockChunkValidator {
return &MockChunkValidator{}
}

func (v *MockChunkValidator) ValidateBatch(blobs []*core.BlobMessage, operatorState *core.OperatorState) error {
args := v.Called(blobs, operatorState)
func (v *MockChunkValidator) ValidateBatch(blobs []*core.BlobMessage, operatorState *core.OperatorState, pool common.WorkerPool) error {
args := v.Called(blobs, operatorState, pool)
return args.Error(0)
}

Expand Down
11 changes: 8 additions & 3 deletions core/test/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"runtime"
"testing"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/encoding"
"github.com/Layr-Labs/eigenda/core/mock"
"github.com/gammazero/workerpool"
"github.com/stretchr/testify/assert"

"github.com/Layr-Labs/eigenda/pkg/encoding/kzgEncoder"
Expand Down Expand Up @@ -171,7 +173,7 @@ func checkBatch(t *testing.T, cst core.IndexedChainState, encodedBlob core.Encod

// checkBatchByUniversalVerifier runs the verification logic for each DA node in the current OperatorState, and returns an error if any of
// the DA nodes' validation checks fails
func checkBatchByUniversalVerifier(t *testing.T, cst core.IndexedChainState, encodedBlobs []core.EncodedBlob, header core.BatchHeader) {
func checkBatchByUniversalVerifier(t *testing.T, cst core.IndexedChainState, encodedBlobs []core.EncodedBlob, header core.BatchHeader, pool common.WorkerPool) {
val := core.NewChunkValidator(enc, asn, cst, [32]byte{})

quorums := []core.QuorumID{0}
Expand All @@ -184,7 +186,8 @@ func checkBatchByUniversalVerifier(t *testing.T, cst core.IndexedChainState, enc
for z, encodedBlob := range encodedBlobs {
blobMessages[z] = encodedBlob[id]
}
err := val.ValidateBatch(blobMessages, state.OperatorState)

err := val.ValidateBatch(blobMessages, state.OperatorState, pool)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -213,6 +216,8 @@ func TestCoreLibrary(t *testing.T) {
quorumIndex := uint(0)
bn := uint(0)

pool := workerpool.New(1)

for _, operatorCount := range operatorCounts {
cst, err := mock.NewChainDataMock(core.OperatorIndex(operatorCount))
assert.NoError(t, err)
Expand Down Expand Up @@ -245,7 +250,7 @@ func TestCoreLibrary(t *testing.T) {

}
t.Run(fmt.Sprintf("universal verifier operatorCount=%v over %v blobs", operatorCount, len(batches)), func(t *testing.T) {
checkBatchByUniversalVerifier(t, cst, batches, batchHeader)
checkBatchByUniversalVerifier(t, cst, batches, batchHeader, pool)
})

}
Expand Down
40 changes: 32 additions & 8 deletions core/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package core

import (
"errors"

"github.com/Layr-Labs/eigenda/common"
)

var (
Expand All @@ -11,7 +13,7 @@ var (
)

type ChunkValidator interface {
ValidateBatch([]*BlobMessage, *OperatorState) error
ValidateBatch([]*BlobMessage, *OperatorState, common.WorkerPool) error
ValidateBlob(*BlobMessage, *OperatorState) error
UpdateOperatorID(OperatorID)
}
Expand Down Expand Up @@ -128,9 +130,9 @@ func (v *chunkValidator) UpdateOperatorID(operatorID OperatorID) {
v.operatorID = operatorID
}

func (v *chunkValidator) ValidateBatch(blobs []*BlobMessage, operatorState *OperatorState) error {
func (v *chunkValidator) ValidateBatch(blobs []*BlobMessage, operatorState *OperatorState, pool common.WorkerPool) error {
subBatchMap := make(map[EncodingParams]*SubBatch)
blobCommitments := make([]BlobCommitments, len(blobs))
blobCommitmentList := make([]BlobCommitments, len(blobs))

for k, blob := range blobs {
if len(blob.Bundles) != len(blob.BlobHeader.QuorumInfos) {
Expand All @@ -143,7 +145,7 @@ func (v *chunkValidator) ValidateBatch(blobs []*BlobMessage, operatorState *Oper
// return err
//}

blobCommitments[k] = blob.BlobHeader.BlobCommitments
blobCommitmentList[k] = blob.BlobHeader.BlobCommitments

// for each quorum
for _, quorumHeader := range blob.BlobHeader.QuorumInfos {
Expand Down Expand Up @@ -188,16 +190,28 @@ func (v *chunkValidator) ValidateBatch(blobs []*BlobMessage, operatorState *Oper

// Parallelize the universal verification for each subBatch
numSubBatch := len(subBatchMap)
out := make(chan error, numSubBatch)
numResult := numSubBatch + len(blobCommitmentList)
// create a channel to accept results, we don't use stop
out := make(chan error, numResult)

// parallelize subBatch verification
for params, subBatch := range subBatchMap {
params := params
subBatch := subBatch
go v.universalVerifyWorker(params, subBatch, out)
pool.Submit(func() {
v.universalVerifyWorker(params, subBatch, out)
})
}


// parallelize length proof verification
for _, blobCommitments := range blobCommitmentList {
blobCommitments := blobCommitments
pool.Submit(func() {
v.VerifyBlobLengthWorker(blobCommitments, out)
})
}

for i := 0; i < numSubBatch; i++ {
for i := 0; i < numResult; i++ {
err := <-out
if err != nil {
return err
Expand All @@ -217,3 +231,13 @@ func (v *chunkValidator) universalVerifyWorker(params EncodingParams, subBatch *

out <- nil
}

func (v *chunkValidator) VerifyBlobLengthWorker(blobCommitments BlobCommitments, out chan error) {
err := v.encoder.VerifyBlobLength(blobCommitments)
if err != nil {
out <- err
return
}

out <- nil
}
7 changes: 6 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/Layr-Labs/eigensdk-go/nodeapi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gammazero/workerpool"
)

const (
Expand All @@ -44,6 +45,7 @@ type Node struct {
Transactor core.Transactor
PubIPProvider pubip.Provider
OperatorSocketsFilterer indexer.OperatorSocketsFilterer
Pool common.WorkerPool

mu sync.Mutex
CurrentSocket string
Expand All @@ -67,6 +69,8 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger common.Logger)

config.ID = keyPair.GetPubKeyG1().GetOperatorID()

pool := workerpool.New(config.NumBatchValidators)

// Make sure config folder exists.
err = os.MkdirAll(config.DbPath, os.ModePerm)
if err != nil {
Expand Down Expand Up @@ -143,6 +147,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger common.Logger)
Validator: validator,
PubIPProvider: pubIPProvider,
OperatorSocketsFilterer: socketsFilterer,
Pool: pool,
}, nil
}

Expand Down Expand Up @@ -321,7 +326,7 @@ func (n *Node) ValidateBatch(ctx context.Context, header *core.BatchHeader, blob
return err
}

return n.Validator.ValidateBatch(blobs, operatorState)
return n.Validator.ValidateBatch(blobs, operatorState, n.Pool)
}

func (n *Node) updateSocketAddress(ctx context.Context, newSocketAddr string) {
Expand Down

0 comments on commit 9119b43

Please sign in to comment.