Skip to content

Commit

Permalink
Merge pull request #89 from Scalingo/feature/storage
Browse files Browse the repository at this point in the history
Add simple object storage package for s3 and swift
  • Loading branch information
john-scalingo authored Aug 16, 2019
2 parents f09d2d5 + e16d54f commit 08bc672
Show file tree
Hide file tree
Showing 10 changed files with 678 additions and 4 deletions.
54 changes: 51 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions mocks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"base_package": "github.com/Scalingo/go-utils",
"mocks": [
{
"interface": "Backend",
"src_package": "storage"
}, {
"interface": "Producer",
"src_package": "nsqproducer"
}
]
}
1 change: 1 addition & 0 deletions mocks_sig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"github.com/Scalingo/go-utils/nsqproducer.Producer":"33 c7 9b 68 1a 5f 11 fc 4f cb 66 83 92 27 61 b9 c4 ec 87 7b","github.com/Scalingo/go-utils/storage.Backend":"NOFILE"}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions storage/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package storage

import (
"context"
"io"
)

// BackendMethod represents the name of a Method included in the Backend interface
type BackendMethod string

const (
GetMethod BackendMethod = "Get"
UploadMethod BackendMethod = "Upload"
SizeMethod BackendMethod = "Size"
DeleteMethod BackendMethod = "Delete"
)

// Backend represents something which is able to store files on an object
// storage service
type Backend interface {
Get(ctx context.Context, path string) (io.ReadCloser, error)
Upload(ctx context.Context, file io.Reader, path string) error
Size(ctx context.Context, path string) (int64, error)
Delete(ctx context.Context, path string) error
}

var _ Backend = &S3{}
var _ Backend = &Swift{}
184 changes: 184 additions & 0 deletions storage/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package storage

import (
"context"
"io"
"time"

"github.com/Scalingo/go-utils/logger"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/awserr"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/aws/endpoints"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
"github.com/pkg/errors"
)

const (
NotFoundErrCode = "NotFound"
)

type S3Client interface {
GetObjectRequest(input *s3.GetObjectInput) s3.GetObjectRequest
HeadObjectRequest(input *s3.HeadObjectInput) s3.HeadObjectRequest
DeleteObjectRequest(input *s3.DeleteObjectInput) s3.DeleteObjectRequest
}

type S3Config struct {
AK string
SK string
Region string
Endpoint string
Bucket string
}

type RetryPolicy struct {
WaitDuration time.Duration
Attempts int
MethodHandlers map[BackendMethod][]string
}

type S3 struct {
cfg S3Config
s3client S3Client
s3uploader *s3manager.Uploader
retryPolicy RetryPolicy
}

type s3Opt func(s3 *S3)

// WithRetryPolicy is an option to constructor NewS3 to add a Retry Policy
// impacting GET operations
func WithRetryPolicy(policy RetryPolicy) s3Opt {
return s3Opt(func(s3 *S3) {
s3.retryPolicy = policy
})
}

func NewS3(cfg S3Config, opts ...s3Opt) *S3 {
s3config := s3Config(cfg)
s3 := &S3{
cfg: cfg, s3client: s3.New(s3config), s3uploader: s3manager.NewUploader(s3config),
retryPolicy: RetryPolicy{
WaitDuration: time.Second,
Attempts: 3,
MethodHandlers: map[BackendMethod][]string{
SizeMethod: []string{NotFoundErrCode},
},
},
}
for _, opt := range opts {
opt(s3)
}
return s3
}

func (s *S3) Get(ctx context.Context, path string) (io.ReadCloser, error) {
log := logger.Get(ctx)
log.WithField("path", path).Info("Get object")

input := &s3.GetObjectInput{
Bucket: &s.cfg.Bucket,
Key: &path,
}
out, err := s.s3client.GetObjectRequest(input).Send(ctx)
if err != nil {
return nil, errors.Wrapf(err, "fail to get object %v", path)
}
return out.Body, nil
}

func (s *S3) Upload(ctx context.Context, file io.Reader, path string) error {
input := &s3manager.UploadInput{
Body: file,
Bucket: &s.cfg.Bucket,
Key: &path,
}
_, err := s.s3uploader.UploadWithContext(ctx, input)
if err != nil {
return errors.Wrapf(err, "fail to save file to %v", path)
}

return nil
}

// Size returns the size of the content of the object. A retry mecanism is
// implemented because of the eventual consistency of S3 backends NotFound
// error are sometimes returned when the object was just uploaded.
func (s *S3) Size(ctx context.Context, path string) (int64, error) {
var res int64
err := s.retryWrapper(ctx, SizeMethod, func(ctx context.Context) error {
log := logger.Get(ctx).WithField("key", path)
log.Infof("[s3] Size()")

input := &s3.HeadObjectInput{Bucket: &s.cfg.Bucket, Key: &path}
stat, err := s.s3client.HeadObjectRequest(input).Send(ctx)
if err != nil {
return err
}
res = *stat.ContentLength
return nil
})

if err != nil {
return -1, errors.Wrapf(err, "fail to HEAD object '%v'", path)
}
return res, nil
}

func (s *S3) Delete(ctx context.Context, path string) error {
input := &s3.DeleteObjectInput{Bucket: &s.cfg.Bucket, Key: &path}
req := s.s3client.DeleteObjectRequest(input)
_, err := req.Send(ctx)
if err != nil {
return errors.Wrapf(err, "fail to delete object %v", path)
}

return nil
}

func (s *S3) retryWrapper(ctx context.Context, method BackendMethod, fun func(ctx context.Context) error) error {
var err error

errorCodes := s.retryPolicy.MethodHandlers[method]
// no-op is no retry policy on the method
if errorCodes == nil {
return fun(ctx)
}
for i := 0; i < s.retryPolicy.Attempts; i++ {
log := logger.Get(ctx).WithField("attempt", i+1)
ctx := logger.ToCtx(ctx, log)
err = fun(ctx)
if err == nil {
return nil
}
if aerr, ok := err.(awserr.Error); ok {
for _, code := range errorCodes {
if aerr.Code() == code {
time.Sleep(s.retryPolicy.WaitDuration)
}
}
}
}
return err
}

func s3Config(cfg S3Config) aws.Config {
credentials := aws.NewStaticCredentialsProvider(cfg.AK, cfg.SK, "")
config := aws.Config{
Region: cfg.Region,
Handlers: defaults.Handlers(),
HTTPClient: defaults.HTTPClient(),
Credentials: credentials,
EndpointResolver: aws.ResolveWithEndpoint(aws.Endpoint{
URL: "https://" + cfg.Endpoint,
SigningRegion: cfg.Endpoint,
}),
}
if cfg.Endpoint == "" {
config.EndpointResolver = endpoints.NewDefaultResolver()
}

return config
}
Loading

0 comments on commit 08bc672

Please sign in to comment.