Skip to content

Commit

Permalink
fix(s3): Move the service construction to s3 service
Browse files Browse the repository at this point in the history
Ref: 5840

Signed-off-by: James Lu <[email protected]>
  • Loading branch information
mantissahz committed Jun 3, 2024
1 parent 3bce6e6 commit 0ee398c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 31 deletions.
24 changes: 6 additions & 18 deletions s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/sirupsen/logrus"

"github.com/longhorn/backupstore"
"github.com/longhorn/backupstore/http"
)

var (
Expand All @@ -23,7 +22,7 @@ var (
type BackupStoreDriver struct {
destURL string
path string
service Service
service *service
}

const (
Expand All @@ -37,8 +36,6 @@ func init() {
}

func initFunc(destURL string) (backupstore.BackupStoreDriver, error) {
b := &BackupStoreDriver{}

u, err := url.Parse(destURL)
if err != nil {
return nil, err
Expand All @@ -48,26 +45,17 @@ func initFunc(destURL string) (backupstore.BackupStoreDriver, error) {
return nil, fmt.Errorf("BUG: Why dispatch %v to %v?", u.Scheme, KIND)
}

if u.User != nil {
b.service.Region = u.Host
b.service.Bucket = u.User.Username()
} else {
//We would depends on AWS_REGION environment variable
b.service.Bucket = u.Host
b := &BackupStoreDriver{}
b.service, err = newService(u)
if err != nil {
return nil, err
}

b.path = u.Path
if b.service.Bucket == "" || b.path == "" {
return nil, fmt.Errorf("invalid URL. Must be either s3://bucket@region/path/, or s3://bucket/path")
}

// add custom ca to http client that is used by s3 service
customCerts := getCustomCerts()
client, err := http.GetClientWithCustomCerts(customCerts)
if err != nil {
return nil, err
}
b.service.Client = client

//Leading '/' can cause mystery problems for s3
b.path = strings.TrimLeft(b.path, "/")

Expand Down
50 changes: 37 additions & 13 deletions s3/s3_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"os"
"time"

Expand All @@ -13,9 +14,11 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/pkg/errors"

bhttp "github.com/longhorn/backupstore/http"
)

type Service struct {
type service struct {
Region string
Bucket string
Client *http.Client
Expand All @@ -25,7 +28,28 @@ const (
VirtualHostedStyle = "VIRTUAL_HOSTED_STYLE"
)

func (s *Service) New() (*s3.S3, error) {
func newService(u *url.URL) (*service, error) {
s := service{}
if u.User != nil {
s.Region = u.Host
s.Bucket = u.User.Username()
} else {
//We would depends on AWS_REGION environment variable
s.Bucket = u.Host
}

// add custom ca to http client that is used by s3 service
customCerts := getCustomCerts()
client, err := bhttp.GetClientWithCustomCerts(customCerts)
if err != nil {
return nil, err
}
s.Client = client

return &s, nil
}

func (s *service) newInstance() (*s3.S3, error) {
// get custom endpoint
endpoints := os.Getenv("AWS_ENDPOINTS")
config := &aws.Config{Region: &s.Region, MaxRetries: aws.Int(3)}
Expand Down Expand Up @@ -58,7 +82,7 @@ func (s *Service) New() (*s3.S3, error) {
return s3.New(ses), nil
}

func (s *Service) Close() {
func (s *service) Close() {
}

func parseAwsError(err error) error {
Expand All @@ -72,8 +96,8 @@ func parseAwsError(err error) error {
return err
}

func (s *Service) ListObjects(key, delimiter string) ([]*s3.Object, []*s3.CommonPrefix, error) {
svc, err := s.New()
func (s *service) ListObjects(key, delimiter string) ([]*s3.Object, []*s3.CommonPrefix, error) {
svc, err := s.newInstance()
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -102,8 +126,8 @@ func (s *Service) ListObjects(key, delimiter string) ([]*s3.Object, []*s3.Common
return objects, commonPrefixs, nil
}

func (s *Service) HeadObject(key string) (*s3.HeadObjectOutput, error) {
svc, err := s.New()
func (s *service) HeadObject(key string) (*s3.HeadObjectOutput, error) {
svc, err := s.newInstance()
if err != nil {
return nil, err
}
Expand All @@ -120,8 +144,8 @@ func (s *Service) HeadObject(key string) (*s3.HeadObjectOutput, error) {
return resp, nil
}

func (s *Service) PutObject(key string, reader io.ReadSeeker) error {
svc, err := s.New()
func (s *service) PutObject(key string, reader io.ReadSeeker) error {
svc, err := s.newInstance()
if err != nil {
return err
}
Expand Down Expand Up @@ -149,8 +173,8 @@ func (s *Service) PutObject(key string, reader io.ReadSeeker) error {
return nil
}

func (s *Service) GetObject(key string) (io.ReadCloser, error) {
svc, err := s.New()
func (s *service) GetObject(key string) (io.ReadCloser, error) {
svc, err := s.newInstance()
if err != nil {
return nil, err
}
Expand All @@ -170,14 +194,14 @@ func (s *Service) GetObject(key string) (io.ReadCloser, error) {
return resp.Body, nil
}

func (s *Service) DeleteObjects(key string) error {
func (s *service) DeleteObjects(key string) error {

objects, _, err := s.ListObjects(key, "")
if err != nil {
return errors.Wrapf(err, "failed to list objects with prefix %v before removing them", key)
}

svc, err := s.New()
svc, err := s.newInstance()
if err != nil {
return errors.Wrap(err, "failed to get a new s3 client instance before removing objects")
}
Expand Down

0 comments on commit 0ee398c

Please sign in to comment.