Skip to content

Commit

Permalink
gzip files stored in s3
Browse files Browse the repository at this point in the history
  • Loading branch information
danyalprout committed Mar 14, 2024
1 parent 90de93d commit f13a195
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
2 changes: 2 additions & 0 deletions common/flags/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type S3Config struct {
S3CredentialType S3CredentialType
AccessKey string
SecretAccessKey string
Compress bool
}

func (c S3Config) check() error {
Expand Down Expand Up @@ -104,6 +105,7 @@ func readS3Config(ctx *cli.Context) S3Config {
UseHttps: ctx.Bool(S3EndpointHttpsFlagName),
Bucket: ctx.String(S3BucketFlagName),
S3CredentialType: toS3CredentialType(ctx.String(S3CredentialTypeFlagName)),
Compress: ctx.Bool(S3CompressFlagName),
}
}

Expand Down
7 changes: 7 additions & 0 deletions common/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
S3CredentialTypeFlagName = "s3-credential-type"
S3EndpointFlagName = "s3-endpoint"
S3EndpointHttpsFlagName = "s3-endpoint-https"
S3CompressFlagName = "s3-compress"
S3AccessKeyFlagName = "s3-access-key"
S3SecretAccessKeyFlagName = "s3-secret-access-key"
S3BucketFlagName = "s3-bucket"
Expand Down Expand Up @@ -51,6 +52,12 @@ func CLIFlags(envPrefix string) []cli.Flag {
Value: true,
EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_ENDPOINT_HTTPS"),
},
&cli.BoolFlag{
Name: S3CompressFlagName,
Usage: "Whether to compress data before storing in S3",
Value: false,
EnvVars: opservice.PrefixEnvVar(envPrefix, "S3_COMPRESS"),
},
&cli.StringFlag{
Name: S3AccessKeyFlagName,
Usage: "The S3 access key for the bucket",
Expand Down
50 changes: 40 additions & 10 deletions common/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package storage

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"

"github.com/base-org/blob-archiver/common/flags"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -13,9 +13,10 @@ import (
)

type S3Storage struct {
s3 *minio.Client
bucket string
log log.Logger
s3 *minio.Client
bucket string
log log.Logger
compress bool
}

func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
Expand All @@ -36,9 +37,10 @@ func NewS3Storage(cfg flags.S3Config, l log.Logger) (*S3Storage, error) {
}

return &S3Storage{
s3: client,
bucket: cfg.Bucket,
log: l,
s3: client,
bucket: cfg.Bucket,
log: l,
compress: cfg.Compress,
}, nil
}

Expand Down Expand Up @@ -75,6 +77,8 @@ func (s *S3Storage) Read(ctx context.Context, hash common.Hash) (BlobData, error
}
}

// TODO: We may need to decode if it's gzipped

var data BlobData
err = json.NewDecoder(res).Decode(&data)
if err != nil {
Expand All @@ -92,10 +96,22 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
return ErrMarshaling
}

reader := bytes.NewReader(b)
_, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), minio.PutObjectOptions{
options := minio.PutObjectOptions{
ContentType: "application/json",
})
}

if s.compress {
b, err = compress(b)
if err != nil {
s.log.Warn("error compressing blob", "err", err)
return ErrCompress
}
options.ContentEncoding = "gzip"
}

reader := bytes.NewReader(b)

_, err = s.s3.PutObject(ctx, s.bucket, data.Header.BeaconBlockHash.String(), reader, int64(len(b)), options)

if err != nil {
s.log.Warn("error writing blob", "err", err)
Expand All @@ -105,3 +121,17 @@ func (s *S3Storage) Write(ctx context.Context, data BlobData) error {
s.log.Info("wrote blob", "hash", data.Header.BeaconBlockHash.String())
return nil
}

func compress(in []byte) ([]byte, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
_, err := gz.Write(in)
if err != nil {
return nil, err
}
err = gz.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
2 changes: 2 additions & 0 deletions common/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var (
ErrStorage = errors.New("error accessing storage")
// ErrMarshaling is returned when there is an error in (un)marshaling the blob
ErrMarshaling = errors.New("error encoding/decoding blob")
// ErrCompress is returned when there is an error gzipping the data
ErrCompress = errors.New("error compressing blob")
)

type Header struct {
Expand Down

0 comments on commit f13a195

Please sign in to comment.