Skip to content

Commit

Permalink
shift S3 code to s3.go file
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Sep 14, 2023
1 parent 514c72e commit 5504e28
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 80 deletions.
81 changes: 1 addition & 80 deletions io/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,12 @@
package io

import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"os"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/wolfeidau/s3iofs"
)

const (
S3Region = "s3.region"
S3SessionToken = "s3.session-token"
S3SecretAccessKey = "s3.secret-access-key"
S3AccessKeyID = "s3.access-key-id"
S3EndpointURL = "s3.endpoint"
S3ProxyURI = "s3.proxy-uri"
)

// IO is an interface to a hierarchical file system.
Expand Down Expand Up @@ -233,67 +214,7 @@ func inferFileIOFromSchema(path string, props map[string]string) (IO, error) {

switch parsed.Scheme {
case "s3", "s3a", "s3n":
opts := []func(*config.LoadOptions) error{}
endpoint, ok := props[S3EndpointURL]
if !ok {
endpoint = os.Getenv("AWS_S3_ENDPOINT")
}

if endpoint != "" {
opts = append(opts, config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if service != s3.ServiceID {
// fallback to default resolution for the service
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}

return aws.Endpoint{
URL: endpoint,
SigningRegion: region,
HostnameImmutable: true,
}, nil
})))
}

if defaultRegion, ok := props[S3Region]; ok {
opts = append(opts, config.WithDefaultRegion(defaultRegion))
}

accessKey, secretKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
}

awscfg, err := config.LoadDefaultConfig(context.Background(), opts...)
if err != nil {
return nil, err
}

preprocess := func(n string) string {
_, after, found := strings.Cut(n, "://")
if found {
n = after
}

return strings.TrimPrefix(n, parsed.Host)
}

s3fs := s3iofs.New(parsed.Host, awscfg)
return FSPreProcName(s3fs, preprocess), nil
return createS3FileIO(parsed, props)
case "file", "":
return LocalFS{}, nil
default:
Expand Down
108 changes: 108 additions & 0 deletions io/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package io

import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/wolfeidau/s3iofs"
)

// Constants for S3 configuration options
const (
S3Region = "s3.region"
S3SessionToken = "s3.session-token"
S3SecretAccessKey = "s3.secret-access-key"
S3AccessKeyID = "s3.access-key-id"
S3EndpointURL = "s3.endpoint"
S3ProxyURI = "s3.proxy-uri"
)

func createS3FileIO(parsed *url.URL, props map[string]string) (IO, error) {
opts := []func(*config.LoadOptions) error{}
endpoint, ok := props[S3EndpointURL]
if !ok {
endpoint = os.Getenv("AWS_S3_ENDPOINT")
}

if endpoint != "" {
opts = append(opts, config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
if service != s3.ServiceID {
// fallback to default resolution for the service
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
}

return aws.Endpoint{
URL: endpoint,
SigningRegion: region,
HostnameImmutable: true,
}, nil
})))
}

if defaultRegion, ok := props[S3Region]; ok {
opts = append(opts, config.WithDefaultRegion(defaultRegion))
}

accessKey, secretKey := props[S3AccessKeyID], props[S3SecretAccessKey]
token := props[S3SessionToken]
if accessKey != "" || secretKey != "" || token != "" {
opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
}

if proxy, ok := props[S3ProxyURI]; ok {
proxyURL, err := url.Parse(proxy)
if err != nil {
return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
}

opts = append(opts, config.WithHTTPClient(awshttp.NewBuildableClient().WithTransportOptions(
func(t *http.Transport) {
t.Proxy = http.ProxyURL(proxyURL)
},
)))
}

awscfg, err := config.LoadDefaultConfig(context.Background(), opts...)
if err != nil {
return nil, err
}

preprocess := func(n string) string {
_, after, found := strings.Cut(n, "://")
if found {
n = after
}

return strings.TrimPrefix(n, parsed.Host)
}

s3fs := s3iofs.New(parsed.Host, awscfg)
return FSPreProcName(s3fs, preprocess), nil
}

0 comments on commit 5504e28

Please sign in to comment.