Merge pull request #15 from base-org/gzip-s3
Allow Storing Compressed Blobs
danyalprout authored Mar 15, 2024
2 parents 428bb4c + 9c9974d commit 7129579
Showing 5 changed files with 64 additions and 12 deletions.
2 changes: 2 additions & 0 deletions common/flags/config.go
@@ -28,6 +28,7 @@ type S3Config struct {
S3CredentialType S3CredentialType
AccessKey string
SecretAccessKey string
+ Compress bool
}

func (c S3Config) check() error {
@@ -104,6 +105,7 @@ func readS3Config(ctx *cli.Context) S3Config {
UseHttps: ctx.Bool(S3EndpointHttpsFlagName),
Bucket: ctx.String(S3BucketFlagName),
S3CredentialType: toS3CredentialType(ctx.String(S3CredentialTypeFlagName)),
+ Compress: ctx.Bool(S3CompressFlagName),
}
}

7 changes: 7 additions & 0 deletions common/flags/flags.go
@@ -12,6 +12,7 @@ const (
S3CredentialTypeFlagName = "s3-credential-type"
S3EndpointFlagName = "s3-endpoint"
S3EndpointHttpsFlagName = "s3-endpoint-https"
+ S3CompressFlagName = "s3-compress"
S3AccessKeyFlagName = "s3-access-key"
S3SecretAccessKeyFlagName = "s3-secret-access-key"
S3BucketFlagName = "s3-bucket"
@@ -51,6 +52,12 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: true,
EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_ENDPOINT_HTTPS"),
},
+ &cli.BoolFlag{
+ Name: S3CompressFlagName,
+ Usage: "Whether to compress data before storing in S3",
+ Value: false,
+ EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_COMPRESS"),
+ },
&cli.StringFlag{
Name: S3AccessKeyFlagName,
Usage: "The S3 access key for the bucket",
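The new flag defaults to false and is enabled with --s3-compress or the environment variable formed by prefixing S3_COMPRESS with the service's env prefix. For context, a minimal sketch (not part of this commit) of exercising the option programmatically; the bucket name, omitted credentials, and logger setup are illustrative assumptions, and only the S3Config fields and NewS3Storage signature shown in this diff are taken as given.

package main

import (
    "github.com/base-org/blob-archiver/common/flags"
    "github.com/base-org/blob-archiver/common/storage"
    "github.com/ethereum/go-ethereum/log"
)

func main() {
    // Hypothetical configuration; endpoint and credential fields are omitted
    // for brevity and would be required in a real deployment.
    cfg := flags.S3Config{
        Bucket:   "blob-archive", // placeholder bucket name
        Compress: true,           // gzip blobs before uploading (new in this PR)
    }

    l := log.Root()
    store, err := storage.NewS3Storage(cfg, l)
    if err != nil {
        l.Crit("failed to create S3 storage", "err", err)
    }
    _ = store
}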
63 changes: 52 additions & 11 deletions common/storage/s3.go
@@ -2,8 +2,10 @@ package storage

import (
"bytes"
+ "compress/gzip"
"context"
"encoding/json"
+ "io"

"github.com/base-org/blob-archiver/common/flags"
"github.com/ethereum/go-ethereum/common"
@@ -13,9 +15,10 @@
)

type S3Storage struct {
- s3     *minio.Client
- bucket string
- log    log.Logger
+ s3       *minio.Client
+ bucket   string
+ log      log.Logger
+ compress bool
}

func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
@@ -36,9 +39,10 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
}

return &S3Storage{
- s3:     client,
- bucket: cfg.Bucket,
- log:    l,
+ s3:       client,
+ bucket:   cfg.Bucket,
+ log:      l,
+ compress: cfg.Compress,
}, nil
}

@@ -63,7 +67,7 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
return BlobData{}, ErrStorage
}
defer res.Close()
- _, err = res.Stat()
+ stat, err := res.Stat()
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "NoSuchKey" {
@@ -75,8 +79,19 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
}
}

+ var reader io.ReadCloser = res
+ defer reader.Close()
+
+ if stat.Metadata.Get("Content-Encoding") == "gzip" {
+ reader, err = gzip.NewReader(reader)
+ if err != nil {
+ s.log.Warn("error creating gzip reader", "hash", hash.String(), "err", err)
+ return BlobData{}, ErrMarshaling
+ }
+ }
+
var data BlobData
- err = json.NewDecoder(res).Decode(&data)
+ err = json.NewDecoder(reader).Decode(&data)
if err != nil {
s.log.Warn("error decoding blob", "hash", hash.String(), "err", err)
return BlobData{}, ErrMarshaling
@@ -92,10 +107,22 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
return ErrMarshaling
}

- reader := bytes.NewReader(b)
- _, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), minio.PutObjectOptions{
+ options := minio.PutObjectOptions{
ContentType: "application/json",
- })
+ }
+
+ if s.compress {
+ b, err = compress(b)
+ if err != nil {
+ s.log.Warn("error compressing blob", "err", err)
+ return ErrCompress
+ }
+ options.ContentEncoding = "gzip"
+ }
+
+ reader := bytes.NewReader(b)
+
+ _, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), options)

if err != nil {
s.log.Warn("error writing blob", "err", err)
@@ -105,3 +132,17 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
s.log.Info("wrote blob", "hash", data.Header.BeaconBlockHash.String())
return nil
}
+
+ func compress(in []byte) ([]byte, error) {
+ var buf bytes.Buffer
+ gz := gzip.NewWriter(&buf)
+ _, err := gz.Write(in)
+ if err != nil {
+ return nil, err
+ }
+ err = gz.Close()
+ if err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+ }
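As a sanity check on the new storage format, here is a small, self-contained sketch (not from this commit) of the round trip the change relies on: JSON-encode and gzip a value the way Write does when compression is enabled, then decode it back through a gzip reader the way Read does when the stored object reports Content-Encoding: gzip. The payload type is a toy stand-in, not the real BlobData.

package main

import (
    "bytes"
    "compress/gzip"
    "encoding/json"
    "fmt"
)

// payload is a toy stand-in for storage.BlobData.
type payload struct {
    Hash string `json:"hash"`
}

func main() {
    in := payload{Hash: "0xabc"}

    // Write path: marshal to JSON, then gzip (mirrors the compress helper above).
    raw, err := json.Marshal(in)
    if err != nil {
        panic(err)
    }
    var buf bytes.Buffer
    gz := gzip.NewWriter(&buf)
    if _, err := gz.Write(raw); err != nil {
        panic(err)
    }
    if err := gz.Close(); err != nil {
        panic(err)
    }

    // Read path: wrap the stored bytes in a gzip reader before JSON-decoding,
    // as Read does when the object's metadata reports Content-Encoding: gzip.
    zr, err := gzip.NewReader(bytes.NewReader(buf.Bytes()))
    if err != nil {
        panic(err)
    }
    var out payload
    if err := json.NewDecoder(zr).Decode(&out); err != nil {
        panic(err)
    }
    fmt.Println(out.Hash) // prints 0xabc
}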
2 changes: 2 additions & 0 deletions common/storage/storage.go
@@ -21,6 +21,8 @@ var (
ErrStorage = errors.New("error accessing storage")
// ErrMarshaling is returned when there is an error in (un)marshaling the blob
ErrMarshaling = errors.New("error encoding/decoding blob")
+ // ErrCompress is returned when there is an error gzipping the data
+ ErrCompress = errors.New("error compressing blob")
)

type Header struct {
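For completeness, a hedged sketch (not from this commit) of how a caller might tell the new sentinel apart from the existing ones when Write fails; the writeBlob wrapper, its package, and its arguments are hypothetical.

package storageexample

import (
    "context"
    "errors"

    "github.com/base-org/blob-archiver/common/storage"
    "github.com/ethereum/go-ethereum/log"
)

// writeBlob is a hypothetical helper; store and data are supplied by the caller.
func writeBlob(ctx context.Context, store *storage.S3Storage, data storage.BlobData) error {
    err := store.Write(ctx, data)
    switch {
    case err == nil:
        return nil
    case errors.Is(err, storage.ErrCompress):
        log.Warn("gzip compression failed", "err", err)
    case errors.Is(err, storage.ErrMarshaling):
        log.Warn("could not encode blob", "err", err)
    default:
        log.Warn("blob write failed", "err", err)
    }
    return err
}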
2 changes: 1 addition & 1 deletion validator/cmd/main.go
@@ -57,7 +57,7 @@ func Main() cliapp.LifecycleAction {
}

beaconClient := service.NewBlobSidecarClient(cfg.BeaconConfig.BeaconURL)
- blobClient := service.NewBlobSidecarClient(cfg.BeaconConfig.BeaconURL)
+ blobClient := service.NewBlobSidecarClient(cfg.BlobConfig.BeaconURL)

return service.NewValidator(l, headerClient, beaconClient, blobClient, closeApp), nil
}
