From df839de2763f4553e179fa28008819c6c2298559 Mon Sep 17 00:00:00 2001
From: Rowan Seymour
Date: Mon, 29 Jul 2024 17:42:30 -0500
Subject: [PATCH] Remove no longer used storage package

---
 storage/base.go    |  32 --------
 storage/fs.go      |  70 ----------------
 storage/fs_test.go |  83 -------------------
 storage/s3.go      | 197 ---------------------------------------------
 storage/s3_test.go | 148 ----------------------------------
 5 files changed, 530 deletions(-)
 delete mode 100644 storage/base.go
 delete mode 100644 storage/fs.go
 delete mode 100644 storage/fs_test.go
 delete mode 100644 storage/s3.go
 delete mode 100644 storage/s3_test.go

diff --git a/storage/base.go b/storage/base.go
deleted file mode 100644
index ba66202..0000000
--- a/storage/base.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package storage
-
-import "context"
-
-// Storage is an interface that provides storing and retrieval of file like things
-type Storage interface {
-	// Name is the name of the storage implementation
-	Name() string
-
-	// Test verifies this storage is functioning and returns an error if not
-	Test(ctx context.Context) error
-
-	// Get retrieves the file from the given path
-	Get(ctx context.Context, path string) (string, []byte, error)
-
-	// Put stores the given file at the given path
-	Put(ctx context.Context, path string, contentType string, body []byte) (string, error)
-
-	// BatchPut stores the given uploads, returning the URLs of the files after upload
-	BatchPut(ctx context.Context, uploads []*Upload) error
-}
-
-// Upload is our type for a file in a batch upload
-type Upload struct {
-	Path        string
-	ContentType string
-	Body        []byte
-
-	// set by BatchPut
-	URL   string
-	Error error
-}
diff --git a/storage/fs.go b/storage/fs.go
deleted file mode 100644
index f277455..0000000
--- a/storage/fs.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package storage
-
-import (
-	"context"
-	"fmt"
-	"os"
-	"path/filepath"
-
-	"github.com/nyaruka/gocommon/uuids"
-)
-
-type fsStorage struct {
-	directory string
-	perms     os.FileMode
-}
-
-// NewFS creates a new file system storage service suitable for use in tests
-func NewFS(directory string, perms os.FileMode) Storage {
-	return &fsStorage{directory: directory, perms: perms}
-}
-
-func (s *fsStorage) Name() string {
-	return "file system"
-}
-
-func (s *fsStorage) Test(ctx context.Context) error {
-	// write randomly named file
-	path := fmt.Sprintf("%s.txt", uuids.New())
-	fullPath, err := s.Put(ctx, path, "text/plain", []byte(`test`))
-	if err != nil {
-		return err
-	}
-
-	os.Remove(fullPath)
-	return nil
-}
-
-func (s *fsStorage) Get(ctx context.Context, path string) (string, []byte, error) {
-	fullPath := filepath.Join(s.directory, path)
-	body, err := os.ReadFile(fullPath)
-	return "", body, err
-}
-
-func (s *fsStorage) Put(ctx context.Context, path string, contentType string, body []byte) (string, error) {
-	fullPath := filepath.Join(s.directory, path)
-
-	err := os.MkdirAll(filepath.Dir(fullPath), s.perms)
-	if err != nil {
-		return "", err
-	}
-
-	err = os.WriteFile(fullPath, body, s.perms)
-	if err != nil {
-		return "", err
-	}
-
-	return fullPath, nil
-}
-
-func (s *fsStorage) BatchPut(ctx context.Context, us []*Upload) error {
-	for _, upload := range us {
-		url, err := s.Put(ctx, upload.Path, upload.ContentType, upload.Body)
-		if err != nil {
-			upload.Error = err
-			return err
-		}
-		upload.URL = url
-	}
-	return nil
-}
diff --git a/storage/fs_test.go b/storage/fs_test.go
deleted file mode 100644
index a677d8a..0000000
--- a/storage/fs_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package storage_test
-
-import (
-	"context"
-	"os"
-	"testing"
-
-	"github.com/nyaruka/gocommon/storage"
-	"github.com/nyaruka/gocommon/uuids"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-func TestFS(t *testing.T) {
-	ctx := context.Background()
-	uuids.SetGenerator(uuids.NewSeededGenerator(12345))
-	defer uuids.SetGenerator(uuids.DefaultGenerator)
-
-	s := storage.NewFS("_testing", 0766)
-	assert.NoError(t, s.Test(ctx))
-
-	// break our ability to write to that directory
-	require.NoError(t, os.Chmod("_testing", 0555))
-
-	assert.EqualError(t, s.Test(ctx), "open _testing/e7187099-7d38-4f60-955c-325957214c42.txt: permission denied")
-
-	require.NoError(t, os.Chmod("_testing", 0777))
-
-	url, err := s.Put(ctx, "foo/bar.txt", "text/plain", []byte(`hello world`))
-	assert.NoError(t, err)
-	assert.Equal(t, "_testing/foo/bar.txt", url)
-
-	_, data, err := s.Get(ctx, "foo/bar.txt")
-	assert.NoError(t, err)
-	assert.Equal(t, []byte(`hello world`), data)
-
-	require.NoError(t, os.RemoveAll("_testing"))
-	require.NoError(t, os.MkdirAll("_testing", 0777))
-}
-
-func TestFSBatchPut(t *testing.T) {
-	ctx := context.Background()
-	uuids.SetGenerator(uuids.NewSeededGenerator(12345))
-	defer uuids.SetGenerator(uuids.DefaultGenerator)
-
-	s := storage.NewFS("_testing", 0766)
-
-	uploads := []*storage.Upload{
-		{
-			Path:        "https://mybucket.s3.amazonaws.com/foo/thing1",
-			Body:        []byte(`HELLOWORLD`),
-			ContentType: "text/plain",
-		},
-		{
-			Path:        "https://mybucket.s3.amazonaws.com/foo/thing2",
-			Body:        []byte(`HELLOWORLD2`),
-			ContentType: "text/plain",
-		},
-	}
-
-	// no writing to our test dir, will fail
-	require.NoError(t, os.Chmod("_testing", 0555))
-
-	err := s.BatchPut(ctx, uploads)
-	assert.Error(t, err)
-
-	assert.Empty(t, uploads[0].URL)
-	assert.Empty(t, uploads[1].URL)
-	assert.NotEmpty(t, uploads[0].Error)
-
-	// fix dir permissions, try again
-	require.NoError(t, os.Chmod("_testing", 0777))
-
-	err = s.BatchPut(ctx, uploads)
-	assert.NoError(t, err)
-
-	assert.NotEmpty(t, uploads[0].URL)
-	assert.NotEmpty(t, uploads[1].URL)
-
-	require.NoError(t, os.RemoveAll("_testing"))
-	require.NoError(t, os.MkdirAll("_testing", 0777))
-}
diff --git a/storage/s3.go b/storage/s3.go
deleted file mode 100644
index b5a1a04..0000000
--- a/storage/s3.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package storage
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"io"
-	"sync"
-	"time"
-
-	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/credentials"
-	"github.com/aws/aws-sdk-go/aws/request"
-	"github.com/aws/aws-sdk-go/aws/session"
-	"github.com/aws/aws-sdk-go/service/s3"
-)
-
-var s3BucketURL = "https://%s.s3.%s.amazonaws.com/%s"
-
-// S3Client provides a mockable subset of the S3 API
-type S3Client interface {
-	HeadBucketWithContext(ctx context.Context, input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error)
-	GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error)
-	PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
-}
-
-// S3Options are options for an S3 client
-type S3Options struct {
-	AWSAccessKeyID     string
-	AWSSecretAccessKey string
-	Endpoint           string
-	Region             string
-	DisableSSL         bool
-	ForcePathStyle     bool
-	MaxRetries         int
-}
-
-// NewS3Client creates a new S3 client
-func NewS3Client(opts *S3Options) (S3Client, error) {
-	config := &aws.Config{
-		Endpoint:         aws.String(opts.Endpoint),
-		Region:           aws.String(opts.Region),
-		DisableSSL:       aws.Bool(opts.DisableSSL),
-		S3ForcePathStyle: aws.Bool(opts.ForcePathStyle),
-		MaxRetries:       aws.Int(opts.MaxRetries),
-	}
-	if opts.AWSAccessKeyID != "" && opts.AWSSecretAccessKey != "" {
-		config.Credentials = credentials.NewStaticCredentials(opts.AWSAccessKeyID, opts.AWSSecretAccessKey, "")
-	}
-	s3Session, err := session.NewSession(config)
-	if err != nil {
-		return nil, err
-	}
-
-	return s3.New(s3Session), nil
-}
-
-type s3Storage struct {
-	client          S3Client
-	bucket          string
-	region          string
-	acl             string
-	workersPerBatch int
-}
-
-// NewS3 creates a new S3 storage service. Callers can specify how many parallel uploads will take place at
-// once when calling BatchPut with workersPerBatch
-func NewS3(client S3Client, bucket, region, acl string, workersPerBatch int) Storage {
-	return &s3Storage{client: client, bucket: bucket, region: region, acl: acl, workersPerBatch: workersPerBatch}
-}
-
-func (s *s3Storage) Name() string {
-	return "S3"
-}
-
-// Test tests whether our S3 client is properly configured
-func (s *s3Storage) Test(ctx context.Context) error {
-	_, err := s.client.HeadBucketWithContext(ctx, &s3.HeadBucketInput{
-		Bucket: aws.String(s.bucket),
-	})
-	return err
-}
-
-func (s *s3Storage) Get(ctx context.Context, path string) (string, []byte, error) {
-	out, err := s.client.GetObjectWithContext(ctx, &s3.GetObjectInput{
-		Bucket: aws.String(s.bucket),
-		Key:    aws.String(path),
-	})
-	if err != nil {
-		return "", nil, fmt.Errorf("error getting S3 object: %w", err)
-	}
-
-	body, err := io.ReadAll(out.Body)
-	if err != nil {
-		return "", nil, fmt.Errorf("error reading S3 object: %w", err)
-	}
-
-	return aws.StringValue(out.ContentType), body, nil
-}
-
-// Put writes the passed in file to the bucket with the passed in content type
-func (s *s3Storage) Put(ctx context.Context, path string, contentType string, body []byte) (string, error) {
-	_, err := s.client.PutObjectWithContext(ctx, &s3.PutObjectInput{
-		Bucket:      aws.String(s.bucket),
-		Body:        bytes.NewReader(body),
-		Key:         aws.String(path),
-		ContentType: aws.String(contentType),
-		ACL:         aws.String(s.acl),
-	})
-	if err != nil {
-		return "", fmt.Errorf("error putting S3 object: %w", err)
-	}
-
-	return s.url(path), nil
-}
-
-func (s *s3Storage) batchWorker(ctx context.Context, uploads chan *Upload, errors chan error, stop chan bool, wg *sync.WaitGroup) {
-	defer wg.Done()
-
-	for {
-		select {
-		case u := <-uploads:
-			var err error
-			for tries := 0; tries < 3; tries++ {
-				// we use a short timeout per request, better to retry than wait on a stalled connection and waste all our time
-				// TODO: validate choice of 15 seconds against real world performance
-				uctx, cancel := context.WithTimeout(ctx, time.Second*15)
-				defer cancel()
-
-				_, err = s.client.PutObjectWithContext(uctx, &s3.PutObjectInput{
-					Bucket:      aws.String(s.bucket),
-					Body:        bytes.NewReader(u.Body),
-					Key:         aws.String(u.Path),
-					ContentType: aws.String(u.ContentType),
-					ACL:         aws.String(s.acl),
-				})
-
-				if err == nil {
-					break
-				}
-			}
-
-			if err == nil {
-				u.URL = s.url(u.Path)
-			} else {
-				u.Error = err
-			}
-
-			errors <- err
-
-		case <-stop:
-			return
-		}
-	}
-}
-
-// BatchPut writes the entire batch of items to the passed in URLs, returning a map of errors if any.
-// Writes will be retried up to three times automatically.
-func (s *s3Storage) BatchPut(ctx context.Context, us []*Upload) error {
-	uploads := make(chan *Upload, len(us))
-	errors := make(chan error, len(us))
-	stop := make(chan bool)
-	wg := &sync.WaitGroup{}
-
-	// start our workers
-	for w := 0; w < s.workersPerBatch; w++ {
-		wg.Add(1)
-		go s.batchWorker(ctx, uploads, errors, stop, wg)
-	}
-
-	// add all our uploads to our work queue
-	for _, u := range us {
-		uploads <- u
-	}
-
-	// read all our errors out, we'll stop everything if we encounter one
-	var err error
-	for i := 0; i < len(us); i++ {
-		e := <-errors
-		if e != nil {
-			err = e
-			break
-		}
-	}
-
-	// stop everyone
-	close(stop)
-
-	// wait for everything to finish up
-	wg.Wait()
-
-	return err
-}
-
-func (s *s3Storage) url(path string) string {
-	return fmt.Sprintf(s3BucketURL, s.bucket, s.region, path)
-}
diff --git a/storage/s3_test.go b/storage/s3_test.go
deleted file mode 100644
index c4932c5..0000000
--- a/storage/s3_test.go
+++ /dev/null
@@ -1,148 +0,0 @@
-package storage_test
-
-import (
-	"bytes"
-	"context"
-	"errors"
-	"io"
-	"testing"
-
-	"github.com/aws/aws-sdk-go/aws"
-	"github.com/aws/aws-sdk-go/aws/request"
-	"github.com/aws/aws-sdk-go/service/s3"
-	"github.com/nyaruka/gocommon/storage"
-	"github.com/stretchr/testify/assert"
-)
-
-type testS3Client struct {
-	headBucketInputs []*s3.HeadBucketInput
-	getObjectInputs  []*s3.GetObjectInput
-	putObjectInputs  []*s3.PutObjectInput
-
-	returnError           error
-	headBucketReturnValue *s3.HeadBucketOutput
-	getObjectReturnValue  *s3.GetObjectOutput
-	putObjectReturnValue  *s3.PutObjectOutput
-}
-
-func (c *testS3Client) HeadBucketWithContext(ctx context.Context, input *s3.HeadBucketInput, opts ...request.Option) (*s3.HeadBucketOutput, error) {
-	c.headBucketInputs = append(c.headBucketInputs, input)
-
-	if c.returnError != nil {
-		return nil, c.returnError
-	}
-	return c.headBucketReturnValue, nil
-}
-func (c *testS3Client) GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) {
-	c.getObjectInputs = append(c.getObjectInputs, input)
-
-	if c.returnError != nil {
-		return nil, c.returnError
-	}
-	return c.getObjectReturnValue, nil
-}
-func (c *testS3Client) PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error) {
-	c.putObjectInputs = append(c.putObjectInputs, input)
-
-	if c.returnError != nil {
-		return nil, c.returnError
-	}
-	return c.putObjectReturnValue, nil
-}
-
-func TestS3Test(t *testing.T) {
-	client := &testS3Client{}
-	s3 := storage.NewS3(client, "mybucket", "us-east-1", s3.BucketCannedACLPublicRead, 1)
-
-	assert.NoError(t, s3.Test(context.Background()))
-
-	client.returnError = errors.New("boom")
-
-	assert.EqualError(t, s3.Test(context.Background()), "boom")
-}
-
-func TestS3Get(t *testing.T) {
-	ctx := context.Background()
-	client := &testS3Client{}
-	s := storage.NewS3(client, "mybucket", "us-east-1", s3.BucketCannedACLPublicRead, 1)
-
-	client.getObjectReturnValue = &s3.GetObjectOutput{
-		ContentType: aws.String("text/plain"),
-		Body:        io.NopCloser(bytes.NewReader([]byte(`HELLOWORLD`))),
-	}
-
-	contentType, contents, err := s.Get(ctx, "foo/things")
-	assert.NoError(t, err)
-	assert.Equal(t, "text/plain", contentType)
-	assert.Equal(t, []byte(`HELLOWORLD`), contents)
-
-	assert.Len(t, client.getObjectInputs, 1)
-	assert.Equal(t, aws.String("mybucket"), client.getObjectInputs[0].Bucket)
-	assert.Equal(t, aws.String("foo/things"), client.getObjectInputs[0].Key)
-
-	client.returnError = errors.New("boom")
-
-	_, _, err = s.Get(ctx, "foo/things")
-	assert.EqualError(t, err, "error getting S3 object: boom")
-}
-
-func TestS3Put(t *testing.T) {
-	ctx := context.Background()
-	client := &testS3Client{}
-	s := storage.NewS3(client, "mybucket", "us-east-1", s3.BucketCannedACLPublicRead, 1)
-
-	url, err := s.Put(ctx, "foo/things", "text/plain", []byte(`HELLOWORLD`))
-	assert.NoError(t, err)
-	assert.Equal(t, "https://mybucket.s3.us-east-1.amazonaws.com/foo/things", url)
-
-	assert.Len(t, client.putObjectInputs, 1)
-	assert.Equal(t, aws.String("mybucket"), client.putObjectInputs[0].Bucket)
-	assert.Equal(t, aws.String("foo/things"), client.putObjectInputs[0].Key)
-	assert.Equal(t, aws.String(s3.BucketCannedACLPublicRead), client.putObjectInputs[0].ACL)
-
-	client.returnError = errors.New("boom")
-
-	_, err = s.Put(ctx, "foo/things", "text/plain", []byte(`HELLOWORLD`))
-	assert.EqualError(t, err, "error putting S3 object: boom")
-}
-
-func TestS3BatchPut(t *testing.T) {
-
-	ctx := context.Background()
-	client := &testS3Client{}
-	s := storage.NewS3(client, "mybucket", "us-east-1", s3.BucketCannedACLPrivate, 10)
-
-	uploads := []*storage.Upload{
-		{
-			Path:        "https://mybucket.s3.us-east-1.amazonaws.com/foo/thing1",
-			Body:        []byte(`HELLOWORLD`),
-			ContentType: "text/plain",
-		},
-		{
-			Path:        "https://mybucket.s3.us-east-1.amazonaws.com/foo/thing2",
-			Body:        []byte(`HELLOWORLD2`),
-			ContentType: "text/plain",
-		},
-	}
-
-	err := s.BatchPut(ctx, uploads)
-	assert.NoError(t, err)
-
-	assert.NotEmpty(t, uploads[0].URL)
-	assert.NotEmpty(t, uploads[1].URL)
-
-	// try again, with a single thread and throwing an error
-	s = storage.NewS3(client, "mybucket", "us-east-1", s3.BucketCannedACLPrivate, 1)
-	client.returnError = errors.New("boom")
-
-	uploads[0].URL = ""
-	uploads[1].URL = ""
-
-	err = s.BatchPut(ctx, uploads)
-
-	assert.Error(t, err)
-
-	assert.Empty(t, uploads[0].URL)
-	assert.Empty(t, uploads[1].URL)
-	assert.NotEmpty(t, uploads[0].Error)
-}