Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Storing Compressed Blobs #15

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/flags/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type S3Config struct {
S3CredentialType S3CredentialType
AccessKey string
SecretAccessKey string
Compress bool
}

func (c S3Config) check() error {
Expand Down Expand Up @@ -104,6 +105,7 @@ func readS3Config(ctx *cli.Context) S3Config {
UseHttps: ctx.Bool(S3EndpointHttpsFlagName),
Bucket: ctx.String(S3BucketFlagName),
S3CredentialType: toS3CredentialType(ctx.String(S3CredentialTypeFlagName)),
Compress: ctx.Bool(S3CompressFlagName),
}
}

Expand Down
7 changes: 7 additions & 0 deletions common/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
S3CredentialTypeFlagName = "s3-credential-type"
S3EndpointFlagName = "s3-endpoint"
S3EndpointHttpsFlagName = "s3-endpoint-https"
S3CompressFlagName = "s3-compress"
S3AccessKeyFlagName = "s3-access-key"
S3SecretAccessKeyFlagName = "s3-secret-access-key"
S3BucketFlagName = "s3-bucket"
Expand Down Expand Up @@ -51,6 +52,12 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: true,
EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_ENDPOINT_HTTPS"),
},
&cli.BoolFlag{
Name: S3CompressFlagName,
Usage: "Whether to compress data before storing in S3",
Value: false,
EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_COMPRESS"),
},
&cli.StringFlag{
Name: S3AccessKeyFlagName,
Usage: "The S3 access key for the bucket",
Expand Down
64 changes: 52 additions & 12 deletions common/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ package storage

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"

"github.com/base-org/blob-archiver/common/flags"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"io"
)

// S3Storage persists blob data in an S3-compatible bucket using the minio client.
// The diff view had left both the pre- and post-change field lists in place
// (every field duplicated); this is the resolved post-change struct.
type S3Storage struct {
	s3       *minio.Client // S3-compatible client used for all bucket operations
	bucket   string        // name of the target bucket
	log      log.Logger    // structured logger for read/write diagnostics
	compress bool          // when true, blobs are gzipped before upload (Content-Encoding: gzip)
}

func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
Expand All @@ -36,9 +38,10 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
}

return &S3Storage{
s3: client,
bucket: cfg.Bucket,
log: l,
s3: client,
bucket: cfg.Bucket,
log: l,
compress: cfg.Compress,
}, nil
}

Expand All @@ -63,7 +66,7 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
return BlobData{}, ErrStorage
}
defer res.Close()
_, err = res.Stat()
stat, err := res.Stat()
if err != nil {
errResponse := minio.ToErrorResponse(err)
if errResponse.Code == "NoSuchKey" {
Expand All @@ -75,8 +78,19 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
}
}

var reader io.ReadCloser = res
defer reader.Close()

if stat.Metadata.Get("Content-Encoding") == "gzip" {
reader, err = gzip.NewReader(reader)
if err != nil {
s.log.Warn("error creating gzip reader", "hash", hash.String(), "err", err)
return BlobData{}, ErrMarshaling
}
}

var data BlobData
err = json.NewDecoder(res).Decode(&data)
err = json.NewDecoder(reader).Decode(&data)
if err != nil {
s.log.Warn("error decoding blob", "hash", hash.String(), "err", err)
return BlobData{}, ErrMarshaling
Expand All @@ -92,10 +106,22 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
return ErrMarshaling
}

reader := bytes.NewReader(b)
_, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), minio.PutObjectOptions{
options := minio.PutObjectOptions{
ContentType: "application/json",
})
}

if s.compress {
b, err = compress(b)
if err != nil {
s.log.Warn("error compressing blob", "err", err)
return ErrCompress
}
options.ContentEncoding = "gzip"
}

reader := bytes.NewReader(b)

_, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), options)

if err != nil {
s.log.Warn("error writing blob", "err", err)
Expand All @@ -105,3 +131,17 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
s.log.Info("wrote blob", "hash", data.Header.BeaconBlockHash.String())
return nil
}

func compress(in []byte) ([]byte, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
_, err := gz.Write(in)
if err != nil {
return nil, err
}
err = gz.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
2 changes: 2 additions & 0 deletions common/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
ErrStorage = errors.New("error accessing storage")
// ErrMarshaling is returned when there is an error in (un)marshaling the blob
ErrMarshaling = errors.New("error encoding/decoding blob")
// ErrCompress is returned when there is an error gzipping the data
ErrCompress = errors.New("error compressing blob")
)

type Header struct {
Expand Down