Skip to content

Commit

Permalink
enhance: Support aliyun as oss source (#301)
Browse files Browse the repository at this point in the history
Aliyun OSS was banned for minio adaptation issue. This PR add it back
after verification.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 22, 2024
1 parent ed97cba commit 6692403
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/milvus-io/birdwatcher
go 1.18

require (
github.com/aliyun/credentials-go v1.3.6
github.com/aliyun/credentials-go v1.3.7
github.com/apache/arrow/go/v8 v8.0.0
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
github.com/blang/semver/v4 v4.0.0
Expand All @@ -25,7 +25,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.8.3
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982
go.etcd.io/etcd/api/v3 v3.5.4
go.etcd.io/etcd/client/v3 v3.5.4
go.etcd.io/etcd/server/v3 v3.5.4
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ github.com/alibabacloud-go/debug v1.0.0 h1:3eIEQWfay1fB24PQIEzXAswlVJtdQok8f3EVN
github.com/alibabacloud-go/debug v1.0.0/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc=
github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU=
github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk=
github.com/aliyun/credentials-go v1.3.6 h1:K5STbhaWjoj5Ht0juOj9mWE2lGelShHLzu5QR3cQ5X8=
github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/aliyun/credentials-go v1.3.7 h1:f1XaxzMlyxvcRtHBWF6W3bWHWa2q26xNDjSnujXWgfM=
github.com/aliyun/credentials-go v1.3.7/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM=
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down Expand Up @@ -802,8 +802,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 h1:LcUqBlKC4j15LhT303yQDX/XxyHG4haEQqbHgZZA4SY=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982 h1:gxat/4F9zSOQRT2Kr9XvoakNyeWWXoLDPpdQruWfA2I=
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0=
github.com/testcontainers/testcontainers-go v0.0.10/go.mod h1:2kePcwMHd3ix/BU3cTDuhvggUgMBAit+qcWwadeMXok=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
85 changes: 85 additions & 0 deletions oss/aliyun.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package oss

import (
"github.com/aliyun/credentials-go/credentials" // >= v1.2.6
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
minioCred "github.com/minio/minio-go/v7/pkg/credentials"
)

type Credential interface {
credentials.Credential
}

func processMinioAliyunOptions(p MinioClientParam, opts *minio.Options) error {
if p.UseIAM {
credProvider, err := NewAliyunCredentialProvider()
if err != nil {
return err
}
opts.Creds = minioCred.New(credProvider)
} else {
opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "")
}
opts.BucketLookup = minio.BucketLookupDNS
return nil
}

// CredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider
// also implements transport
type CredentialProvider struct {
// aliyunCreds doesn't provide a way to get the expire time, so we use the cache to check if it's expired
// when aliyunCreds.GetAccessKeyId is different from the cache, we know it's expired
akCache string
aliyunCreds Credential
}

func NewAliyunCredentialProvider() (minioCred.Provider, error) {
aliyunCreds, err := credentials.NewCredential(nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create aliyun credential")
}
// backend, err := minio.DefaultTransport(true)
// if err != nil {
// return nil, errors.Wrap(err, "failed to create default transport")
// }
// credentials.GetCredential()
return &CredentialProvider{aliyunCreds: aliyunCreds}, nil
}

// Retrieve returns nil if it successfully retrieved the value.
// Error is returned if the value were not obtainable, or empty.
// according to the caller minioCred.Credentials.Get(),
// it already has a lock, so we don't need to worry about concurrency
func (c *CredentialProvider) Retrieve() (minioCred.Value, error) {
ret := minioCred.Value{}
ak, err := c.aliyunCreds.GetAccessKeyId()
if err != nil {
return ret, errors.Wrap(err, "failed to get access key id from aliyun credential")
}
ret.AccessKeyID = *ak
sk, err := c.aliyunCreds.GetAccessKeySecret()
if err != nil {
return minioCred.Value{}, errors.Wrap(err, "failed to get access key secret from aliyun credential")
}
securityToken, err := c.aliyunCreds.GetSecurityToken()
if err != nil {
return minioCred.Value{}, errors.Wrap(err, "failed to get security token from aliyun credential")
}
ret.SecretAccessKey = *sk
c.akCache = *ak
ret.SessionToken = *securityToken
return ret, nil
}

// IsExpired returns if the credentials are no longer valid, and need
// to be retrieved.
// according to the caller minioCred.Credentials.IsExpired(),
// it already has a lock, so we don't need to worry about concurrency
func (c CredentialProvider) IsExpired() bool {
ak, err := c.aliyunCreds.GetAccessKeyId()
if err != nil {
return true
}
return *ak != c.akCache
}
26 changes: 24 additions & 2 deletions oss/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
"github.com/minio/minio-go/v7/pkg/credentials"
)

const (
CloudProviderGCP = "gcp"
CloudProviderAWS = "aws"
CloudProviderAliyun = "aliyun"
CloudProviderAzure = "azure"
CloudProviderTencent = "tencent"
)

type MinioClientParam struct {
Addr string
Port string
Expand Down Expand Up @@ -40,24 +48,38 @@ func NewMinioClient(ctx context.Context, p MinioClientParam) (*MinioClient, erro
endpoint := fmt.Sprintf("%s:%s", p.Addr, p.Port)

switch p.CloudProvider {
case "aws":
case CloudProviderAWS:
processMinioAwsOptions(p, opts)
case "gcp":
case CloudProviderGCP:
// adhoc to remove port of gcs address to let minio-go know it's gcs
if strings.Contains(endpoint, GcsDefaultAddress) {
endpoint = GcsDefaultAddress
}
processMinioGcpOptions(p, opts)
case CloudProviderAliyun:
processMinioAliyunOptions(p, opts)
case CloudProviderTencent:
// processMinioTencentOptions(p, opts)
// cos address issue WIP
fallthrough
case CloudProviderAzure:
// TODO support azure
fallthrough
default:
return nil, errors.Newf("Cloud provider %s not supported yet", p.CloudProvider)
}
fmt.Printf("Start to connect to oss endpoind: %s\n", endpoint)
client, err := minio.New(endpoint, opts)
if err != nil {
fmt.Println("new client failed: ", err.Error())
return nil, err
}

fmt.Println("Connection successful!")

ok, err := client.BucketExists(ctx, p.BucketName)
if err != nil {
fmt.Printf("check bucket %s exists failed: %s\n", p.BucketName, err.Error())
return nil, err
}
if !ok {
Expand Down
71 changes: 71 additions & 0 deletions oss/tencent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package oss

import (
"github.com/cockroachdb/errors"
"github.com/minio/minio-go/v7"
minioCred "github.com/minio/minio-go/v7/pkg/credentials"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
)

func processMinioTencentOptions(p MinioClientParam, opts *minio.Options) error {
if p.UseIAM {
credProvider, err := NewTencentCredentialProvider()
if err != nil {
return err
}
opts.Creds = minioCred.New(credProvider)
} else {
opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "")
}
opts.BucketLookup = minio.BucketLookupDNS
return nil
}

// TencentCredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider
// also implements transport
type TencentCredentialProvider struct {
// tencentCreds doesn't provide a way to get the expired time, so we use the cache to check if it's expired
// when tencentCreds.GetSecretId is different from the cache, we know it's expired
akCache string
tencentCreds common.CredentialIface
}

func NewTencentCredentialProvider() (minioCred.Provider, error) {
provider, err := common.DefaultTkeOIDCRoleArnProvider()
if err != nil {
return nil, errors.Wrap(err, "failed to create tencent credential provider")
}

cred, err := provider.GetCredential()
if err != nil {
return nil, errors.Wrap(err, "failed to get tencent credential")
}
return &TencentCredentialProvider{tencentCreds: cred}, nil
}

// Retrieve returns nil if it successfully retrieved the value.
// Error is returned if the value were not obtainable, or empty.
// according to the caller minioCred.Credentials.Get(),
// it already has a lock, so we don't need to worry about concurrency
func (c *TencentCredentialProvider) Retrieve() (minioCred.Value, error) {
ret := minioCred.Value{}
ak := c.tencentCreds.GetSecretId()
ret.AccessKeyID = ak
c.akCache = ak

sk := c.tencentCreds.GetSecretKey()
ret.SecretAccessKey = sk

securityToken := c.tencentCreds.GetToken()
ret.SessionToken = securityToken
return ret, nil
}

// IsExpired returns if the credentials are no longer valid, and need
// to be retrieved.
// according to the caller minioCred.Credentials.IsExpired(),
// it already has a lock, so we don't need to worry about concurrency
func (c TencentCredentialProvider) IsExpired() bool {
ak := c.tencentCreds.GetSecretId()
return ak != c.akCache
}
4 changes: 4 additions & 0 deletions states/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
var ak, sk string
var useIAM string
var useSSL string
var region string

for _, config := range configurations {
switch config.GetKey() {
Expand All @@ -81,6 +82,8 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str
addr = config.GetValue()
case "minio.port":
port = config.GetValue()
case "minio.region":
region = config.GetValue()
case "minio.bucketname":
bucketName = config.GetValue()
case "minio.rootpath":
Expand All @@ -101,6 +104,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str

mp := oss.MinioClientParam{
CloudProvider: cloudProvider,
Region: region,
Addr: addr,
Port: port,
AK: ak,
Expand Down

0 comments on commit 6692403

Please sign in to comment.