Skip to content

Commit

Permalink
[7/N] Chunk encoding optimization: Node support for mixed encoding (L…
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Aug 5, 2024
1 parent 2566c21 commit 7f466d6
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 41 deletions.
5 changes: 3 additions & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ func (c client) GetChunks(
case node.ChunkEncoding_GOB:
chunk, err = new(encoding.Frame).Deserialize(data)
case node.ChunkEncoding_UNKNOWN:
// For backward compatibility, we fallback the UNKNOWN to GNARK
chunk, err = new(encoding.Frame).DeserializeGnark(data)
// For backward compatibility, we fallback the UNKNOWN to GOB
chunk, err = new(encoding.Frame).Deserialize(data)
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: errors.New("UNKNOWN chunk encoding format"),
Chunks: nil,
}
return
}
}
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ const (

// The list of supported encoding formats for bundle.
// Values must be in range [0, 255].
// Note that the IDs here may not be the same as the ChunkEncodingFormat enum in
// the node.proto file. For example, GobBundleEncodingFormat is 0 here, but in
// ChunkEncodingFormat the GOB is 2 (and UNKNOWN is 0). The reason is because
// we need to set GobBundleEncodingFormat to 0 for backward compatibility (and
// in protobuf, UNKNOWN as 0 is a convention).
GobBundleEncodingFormat BundleEncodingFormat = 0
GnarkBundleEncodingFormat BundleEncodingFormat = 1
)
Expand Down
2 changes: 2 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Config struct {
DataApiUrl string
NumBatchValidators int
NumBatchDeserializationWorkers int
EnableGnarkBundleEncoding bool
ClientIPHeader string
UseSecureGrpc bool
ReachabilityPollIntervalSec uint64
Expand Down Expand Up @@ -200,6 +201,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
DataApiUrl: ctx.GlobalString(flags.DataApiUrlFlag.Name),
NumBatchValidators: ctx.GlobalInt(flags.NumBatchValidatorsFlag.Name),
NumBatchDeserializationWorkers: ctx.GlobalInt(flags.NumBatchDeserializationWorkersFlag.Name),
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
ClientIPHeader: ctx.GlobalString(flags.ClientIPHeaderFlag.Name),
UseSecureGrpc: ctx.GlobalBoolT(flags.ChurnerUseSecureGRPC.Name),
DisableNodeInfoResources: ctx.GlobalBool(flags.DisableNodeInfoResourcesFlag.Name),
Expand Down
7 changes: 7 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ var (
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "NUM_BATCH_DESERIALIZATION_WORKERS"),
Value: 128,
}
EnableGnarkBundleEncodingFlag = cli.BoolFlag{
Name: "enable-gnark-bundle-encoding",
Usage: "Enable Gnark bundle encoding for chunks",
Required: false,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"),
}

// Test only, DO NOT USE the following flags in production

Expand Down Expand Up @@ -307,6 +313,7 @@ var optionalFlags = []cli.Flag{
EcdsaKeyPasswordFlag,
DataApiUrlFlag,
DisableNodeInfoResourcesFlag,
EnableGnarkBundleEncodingFlag,
}

func init() {
Expand Down
20 changes: 20 additions & 0 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,26 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start))
return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", hex.EncodeToString(batchHeaderHash[:]), in.GetBlobIndex(), in.GetQuorumId())
}
if !s.config.EnableGnarkBundleEncoding && format == pb.ChunkEncoding_GNARK {
format = pb.ChunkEncoding_GOB
gobChunks := make([][]byte, 0, len(chunks))
for _, c := range chunks {
if len(c) == 0 {
gobChunks = append(gobChunks, c)
continue
}
decoded, err := new(encoding.Frame).DeserializeGnark(c)
if err != nil {
return nil, fmt.Errorf("the chunks are in Gnark but cannot be decoded: %v", err)
}
encoded, err := decoded.Serialize()
if err != nil {
return nil, err
}
gobChunks = append(gobChunks, encoded)
}
chunks = gobChunks
}
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success", time.Since(start))
return &pb.RetrieveChunksReply{Chunks: chunks, Encoding: format}, nil
}
Expand Down
180 changes: 146 additions & 34 deletions node/grpc/server_test.go

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions node/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ func GetBlobMessages(in *pb.StoreChunksRequest, numWorkers int) ([]*core.BlobMes
for j, bundle := range blob.GetBundles() {
quorumID := blob.GetHeader().GetQuorumHeaders()[j].GetQuorumId()
if format == core.GnarkBundleEncodingFormat {
bundleMsg, err := new(core.Bundle).Deserialize(bundle.GetBundle())
if err != nil {
resultChan <- err
return
if len(bundle.GetBundle()) > 0 {
bundleMsg, err := new(core.Bundle).Deserialize(bundle.GetBundle())
if err != nil {
resultChan <- err
return
}
bundles[uint8(quorumID)] = bundleMsg
} else {
bundles[uint8(quorumID)] = make([]*encoding.Frame, 0)
}
bundles[uint8(quorumID)] = bundleMsg
} else if format == core.GobBundleEncodingFormat {
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(bundle.GetChunks()))
for k, data := range bundle.GetChunks() {
Expand Down

0 comments on commit 7f466d6

Please sign in to comment.