From 66924033edb9eba25cdc211a7a6d642fa3ba7f8d Mon Sep 17 00:00:00 2001 From: congqixia Date: Thu, 22 Aug 2024 18:10:50 +0800 Subject: [PATCH] enhance: Support aliyun as oss source (#301) Aliyun OSS was banned for minio adaptation issue. This PR add it back after verification. --------- Signed-off-by: Congqi Xia --- go.mod | 4 +-- go.sum | 8 ++--- oss/aliyun.go | 85 +++++++++++++++++++++++++++++++++++++++++++++++++ oss/minio.go | 26 +++++++++++++-- oss/tencent.go | 71 +++++++++++++++++++++++++++++++++++++++++ states/minio.go | 4 +++ 6 files changed, 190 insertions(+), 8 deletions(-) create mode 100644 oss/aliyun.go create mode 100644 oss/tencent.go diff --git a/go.mod b/go.mod index d612c99b..4786a8ae 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/milvus-io/birdwatcher go 1.18 require ( - github.com/aliyun/credentials-go v1.3.6 + github.com/aliyun/credentials-go v1.3.7 github.com/apache/arrow/go/v8 v8.0.0 github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 github.com/blang/semver/v4 v4.0.0 @@ -25,7 +25,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/streamnative/pulsarctl v0.5.0 github.com/stretchr/testify v1.8.3 - github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 + github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982 go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4 go.etcd.io/etcd/server/v3 v3.5.4 diff --git a/go.sum b/go.sum index 13ede025..a93b1d28 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,8 @@ github.com/alibabacloud-go/debug v1.0.0 h1:3eIEQWfay1fB24PQIEzXAswlVJtdQok8f3EVN github.com/alibabacloud-go/debug v1.0.0/go.mod h1:8gfgZCCAC3+SCzjWtY053FrOcd4/qlH6IHTI4QyICOc= github.com/alibabacloud-go/tea v1.2.2 h1:aTsR6Rl3ANWPfqeQugPglfurloyBJY85eFy7Gc1+8oU= github.com/alibabacloud-go/tea v1.2.2/go.mod h1:CF3vOzEMAG+bR4WOql8gc2G9H3EkH3ZLAQdpmpXMgwk= -github.com/aliyun/credentials-go v1.3.6 h1:K5STbhaWjoj5Ht0juOj9mWE2lGelShHLzu5QR3cQ5X8= -github.com/aliyun/credentials-go v1.3.6/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM= +github.com/aliyun/credentials-go v1.3.7 h1:f1XaxzMlyxvcRtHBWF6W3bWHWa2q26xNDjSnujXWgfM= +github.com/aliyun/credentials-go v1.3.7/go.mod h1:1LxUuX7L5YrZUWzBrRyk0SwSdH4OmPrib8NVePL3fxM= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -802,8 +802,8 @@ github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= -github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 h1:LcUqBlKC4j15LhT303yQDX/XxyHG4haEQqbHgZZA4SY= -github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0= +github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982 h1:gxat/4F9zSOQRT2Kr9XvoakNyeWWXoLDPpdQruWfA2I= +github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.982/go.mod h1:r5r4xbfxSaeR04b166HGsBa/R4U3SueirEUpXGuw+Q0= github.com/testcontainers/testcontainers-go v0.0.10/go.mod h1:2kePcwMHd3ix/BU3cTDuhvggUgMBAit+qcWwadeMXok= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/oss/aliyun.go b/oss/aliyun.go new file mode 100644 index 00000000..e739ab06 --- /dev/null +++ b/oss/aliyun.go @@ -0,0 +1,85 @@ +package oss + +import ( + "github.com/aliyun/credentials-go/credentials" // >= v1.2.6 + "github.com/cockroachdb/errors" + "github.com/minio/minio-go/v7" + minioCred "github.com/minio/minio-go/v7/pkg/credentials" +) + +type Credential interface { + credentials.Credential +} + +func processMinioAliyunOptions(p MinioClientParam, opts *minio.Options) error { + if p.UseIAM { + credProvider, err := NewAliyunCredentialProvider() + if err != nil { + return err + } + opts.Creds = minioCred.New(credProvider) + } else { + opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "") + } + opts.BucketLookup = minio.BucketLookupDNS + return nil +} + +// CredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider +// also implements transport +type CredentialProvider struct { + // aliyunCreds doesn't provide a way to get the expire time, so we use the cache to check if it's expired + // when aliyunCreds.GetAccessKeyId is different from the cache, we know it's expired + akCache string + aliyunCreds Credential +} + +func NewAliyunCredentialProvider() (minioCred.Provider, error) { + aliyunCreds, err := credentials.NewCredential(nil) + if err != nil { + return nil, errors.Wrap(err, "failed to create aliyun credential") + } + // backend, err := minio.DefaultTransport(true) + // if err != nil { + // return nil, errors.Wrap(err, "failed to create default transport") + // } + // credentials.GetCredential() + return &CredentialProvider{aliyunCreds: aliyunCreds}, nil +} + +// Retrieve returns nil if it successfully retrieved the value. +// Error is returned if the value were not obtainable, or empty. +// according to the caller minioCred.Credentials.Get(), +// it already has a lock, so we don't need to worry about concurrency +func (c *CredentialProvider) Retrieve() (minioCred.Value, error) { + ret := minioCred.Value{} + ak, err := c.aliyunCreds.GetAccessKeyId() + if err != nil { + return ret, errors.Wrap(err, "failed to get access key id from aliyun credential") + } + ret.AccessKeyID = *ak + sk, err := c.aliyunCreds.GetAccessKeySecret() + if err != nil { + return minioCred.Value{}, errors.Wrap(err, "failed to get access key secret from aliyun credential") + } + securityToken, err := c.aliyunCreds.GetSecurityToken() + if err != nil { + return minioCred.Value{}, errors.Wrap(err, "failed to get security token from aliyun credential") + } + ret.SecretAccessKey = *sk + c.akCache = *ak + ret.SessionToken = *securityToken + return ret, nil +} + +// IsExpired returns if the credentials are no longer valid, and need +// to be retrieved. +// according to the caller minioCred.Credentials.IsExpired(), +// it already has a lock, so we don't need to worry about concurrency +func (c CredentialProvider) IsExpired() bool { + ak, err := c.aliyunCreds.GetAccessKeyId() + if err != nil { + return true + } + return *ak != c.akCache +} diff --git a/oss/minio.go b/oss/minio.go index ef73467f..fd3e8f6c 100644 --- a/oss/minio.go +++ b/oss/minio.go @@ -10,6 +10,14 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" ) +const ( + CloudProviderGCP = "gcp" + CloudProviderAWS = "aws" + CloudProviderAliyun = "aliyun" + CloudProviderAzure = "azure" + CloudProviderTencent = "tencent" +) + type MinioClientParam struct { Addr string Port string @@ -40,24 +48,38 @@ func NewMinioClient(ctx context.Context, p MinioClientParam) (*MinioClient, erro endpoint := fmt.Sprintf("%s:%s", p.Addr, p.Port) switch p.CloudProvider { - case "aws": + case CloudProviderAWS: processMinioAwsOptions(p, opts) - case "gcp": + case CloudProviderGCP: // adhoc to remove port of gcs address to let minio-go know it's gcs if strings.Contains(endpoint, GcsDefaultAddress) { endpoint = GcsDefaultAddress } processMinioGcpOptions(p, opts) + case CloudProviderAliyun: + processMinioAliyunOptions(p, opts) + case CloudProviderTencent: + // processMinioTencentOptions(p, opts) + // cos address issue WIP + fallthrough + case CloudProviderAzure: + // TODO support azure + fallthrough default: return nil, errors.Newf("Cloud provider %s not supported yet", p.CloudProvider) } + fmt.Printf("Start to connect to oss endpoind: %s\n", endpoint) client, err := minio.New(endpoint, opts) if err != nil { + fmt.Println("new client failed: ", err.Error()) return nil, err } + fmt.Println("Connection successful!") + ok, err := client.BucketExists(ctx, p.BucketName) if err != nil { + fmt.Printf("check bucket %s exists failed: %s\n", p.BucketName, err.Error()) return nil, err } if !ok { diff --git a/oss/tencent.go b/oss/tencent.go new file mode 100644 index 00000000..1b76ed51 --- /dev/null +++ b/oss/tencent.go @@ -0,0 +1,71 @@ +package oss + +import ( + "github.com/cockroachdb/errors" + "github.com/minio/minio-go/v7" + minioCred "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common" +) + +func processMinioTencentOptions(p MinioClientParam, opts *minio.Options) error { + if p.UseIAM { + credProvider, err := NewTencentCredentialProvider() + if err != nil { + return err + } + opts.Creds = minioCred.New(credProvider) + } else { + opts.Creds = minioCred.NewStaticV4(p.AK, p.SK, "") + } + opts.BucketLookup = minio.BucketLookupDNS + return nil +} + +// TencentCredentialProvider implements "github.com/minio/minio-go/v7/pkg/credentials".Provider +// also implements transport +type TencentCredentialProvider struct { + // tencentCreds doesn't provide a way to get the expired time, so we use the cache to check if it's expired + // when tencentCreds.GetSecretId is different from the cache, we know it's expired + akCache string + tencentCreds common.CredentialIface +} + +func NewTencentCredentialProvider() (minioCred.Provider, error) { + provider, err := common.DefaultTkeOIDCRoleArnProvider() + if err != nil { + return nil, errors.Wrap(err, "failed to create tencent credential provider") + } + + cred, err := provider.GetCredential() + if err != nil { + return nil, errors.Wrap(err, "failed to get tencent credential") + } + return &TencentCredentialProvider{tencentCreds: cred}, nil +} + +// Retrieve returns nil if it successfully retrieved the value. +// Error is returned if the value were not obtainable, or empty. +// according to the caller minioCred.Credentials.Get(), +// it already has a lock, so we don't need to worry about concurrency +func (c *TencentCredentialProvider) Retrieve() (minioCred.Value, error) { + ret := minioCred.Value{} + ak := c.tencentCreds.GetSecretId() + ret.AccessKeyID = ak + c.akCache = ak + + sk := c.tencentCreds.GetSecretKey() + ret.SecretAccessKey = sk + + securityToken := c.tencentCreds.GetToken() + ret.SessionToken = securityToken + return ret, nil +} + +// IsExpired returns if the credentials are no longer valid, and need +// to be retrieved. +// according to the caller minioCred.Credentials.IsExpired(), +// it already has a lock, so we don't need to worry about concurrency +func (c TencentCredentialProvider) IsExpired() bool { + ak := c.tencentCreds.GetSecretId() + return ak != c.akCache +} diff --git a/states/minio.go b/states/minio.go index 4761855f..bc46c238 100644 --- a/states/minio.go +++ b/states/minio.go @@ -72,6 +72,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str var ak, sk string var useIAM string var useSSL string + var region string for _, config := range configurations { switch config.GetKey() { @@ -81,6 +82,8 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str addr = config.GetValue() case "minio.port": port = config.GetValue() + case "minio.region": + region = config.GetValue() case "minio.bucketname": bucketName = config.GetValue() case "minio.rootpath": @@ -101,6 +104,7 @@ func (s *InstanceState) GetMinioClientFromCfg(ctx context.Context, minioAddr str mp := oss.MinioClientParam{ CloudProvider: cloudProvider, + Region: region, Addr: addr, Port: port, AK: ak,