Skip to content

Commit

Permalink
Misc node code fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Mar 19, 2024
1 parent be2f29d commit 62de390
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 30 deletions.
15 changes: 0 additions & 15 deletions node/db.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package node

import (
"encoding/binary"

"github.com/syndtr/goleveldb/leveldb/iterator"
)

Expand All @@ -15,16 +13,3 @@ type DB interface {
WriteBatch(keys, values [][]byte) error
NewIterator(prefix []byte) iterator.Iterator
}

// ToByteArray encodes a uint64 as an 8-byte array in big-endian order.
func ToByteArray(i uint64) []byte {
	arr := make([]byte, 8)
	// i is already a uint64, so no conversion is needed; PutUint64
	// writes exactly 8 bytes, so the full slice is passed directly.
	binary.BigEndian.PutUint64(arr, i)
	return arr
}

// ToUint64 decodes the first 8 bytes of arr as a big-endian uint64.
func ToUint64(arr []byte) uint64 {
	return binary.BigEndian.Uint64(arr)
}
8 changes: 4 additions & 4 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques
var batchHeaderHash [32]byte
copy(batchHeaderHash[:], in.GetBatchHeaderHash())

blobHeader, _, err := s.getBlobHeader(ctx, batchHeaderHash, int(in.BlobIndex), uint8(in.GetQuorumId()))
blobHeader, _, err := s.getBlobHeader(ctx, batchHeaderHash, int(in.GetBlobIndex()))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest)
var batchHeaderHash [32]byte
copy(batchHeaderHash[:], in.GetBatchHeaderHash())

blobHeader, protoBlobHeader, err := s.getBlobHeader(ctx, batchHeaderHash, int(in.BlobIndex), uint8(in.GetQuorumId()))
blobHeader, protoBlobHeader, err := s.getBlobHeader(ctx, batchHeaderHash, int(in.GetBlobIndex()))
if err != nil {
return nil, err
}
Expand All @@ -234,7 +234,7 @@ func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest)
return nil, err
}

tree, err := s.rebuildMerkleTree(batchHeaderHash, uint8(in.GetQuorumId()))
tree, err := s.rebuildMerkleTree(batchHeaderHash)
if err != nil {
return nil, err
}
Expand All @@ -253,7 +253,7 @@ func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest)
}, nil
}

func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumId uint8) (*core.BlobHeader, *pb.BlobHeader, error) {
func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blobIndex int) (*core.BlobHeader, *pb.BlobHeader, error) {

blobHeaderBytes, err := s.node.Store.GetBlobHeader(ctx, batchHeaderHash, blobIndex)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions node/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (

// Constructs a core.BatchHeader from a proto of pb.StoreChunksRequest.
func GetBatchHeader(in *pb.StoreChunksRequest) (*core.BatchHeader, error) {
if in.GetBatchHeader() == nil {
return nil, errors.New("the batch_header field is nil")
}
var batchRoot [32]byte
copy(batchRoot[:], in.GetBatchHeader().GetBatchRoot())
batchHeader := core.BatchHeader{
Expand All @@ -40,15 +43,15 @@ func GetBlobMessages(in *pb.StoreChunksRequest) ([]*core.BlobMessage, error) {
}

bundles := make(map[core.QuorumID]core.Bundle, len(blob.GetBundles()))
for i, chunks := range blob.GetBundles() {
quorumID := blob.GetHeader().GetQuorumHeaders()[i].QuorumId
for j, chunks := range blob.GetBundles() {
quorumID := blob.GetHeader().GetQuorumHeaders()[j].QuorumId
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(chunks.GetChunks()))
for j, data := range chunks.GetChunks() {
for k, data := range chunks.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
if err != nil {
return nil, err
}
bundles[uint8(quorumID)][j] = chunk
bundles[uint8(quorumID)][k] = chunk
}
}

Expand Down Expand Up @@ -108,7 +111,7 @@ func GetBlobHeaderFromProto(h *pb.BlobHeader) (*core.BlobHeader, error) {
}

// rebuildMerkleTree rebuilds the merkle tree from the blob headers and batch header.
func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte, quorumID uint8) (*merkletree.MerkleTree, error) {
func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.MerkleTree, error) {
batchHeaderBytes, err := s.node.Store.GetBatchHeader(context.Background(), batchHeaderHash)
if err != nil {
return nil, errors.New("failed to get the batch header from Store")
Expand Down
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
// revert all the keys for that batch.
result := <-storeChan
if result.keys != nil {
if !n.Store.DeleteKeys(ctx, result.keys) {
log.Error("Failed to delete the invalid batch that should be rolled back", "batchHeaderHash", batchHeaderHash)
if err = n.Store.DeleteKeys(ctx, result.keys); err != nil {
log.Error("Failed to delete the invalid batch that should be rolled back", "batchHeaderHash", batchHeaderHash, "err", err)
}
}
return nil, fmt.Errorf("failed to validate batch: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion node/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl st
defer cancel()

request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 1)

return gc.Churn(ctx, request, opt)
}
Expand Down
6 changes: 3 additions & 3 deletions node/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (s *Store) GetBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blo
}

// GetChunks returns the list of byte arrays stored for given blobKey along with a boolean
// indicating if the read was usuccessful or the chunks were serialized correctly
// indicating if the read was unsuccessful or the chunks were serialized correctly
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, bool) {
log := s.logger

Expand Down Expand Up @@ -332,8 +332,8 @@ func (s *Store) HasKey(ctx context.Context, key []byte) bool {
//
// Note: caller should ensure these keys are exactly all the data items for a single batch
// to maintain the integrity of the store.
func (s *Store) DeleteKeys(ctx context.Context, keys *[][]byte) bool {
return s.db.DeleteBatch(*keys) == nil
func (s *Store) DeleteKeys(ctx context.Context, keys *[][]byte) error {
return s.db.DeleteBatch(*keys)
}

// Flattens an array of byte arrays (chunks) into a single byte array
Expand Down

0 comments on commit 62de390

Please sign in to comment.