Skip to content

Commit

Permalink
Merge branch 'main' into feat-iam-roles-anywhere-kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
yaron2 authored Nov 20, 2024
2 parents 64da4cb + e2b27d3 commit 5f56afd
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 14 deletions.
5 changes: 4 additions & 1 deletion bindings/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,8 @@ func (d *DynamoDB) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
}

func (d *DynamoDB) Close() error {
return d.authProvider.Close()
if d.authProvider != nil {
return d.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ func (a *AWSKinesis) Close() error {
close(a.closeCh)
}
a.wg.Wait()
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}

func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*string, error) {
Expand Down
5 changes: 4 additions & 1 deletion bindings/aws/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
}

func (s *AWSS3) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}

func (s *AWSS3) Operations() []bindings.OperationKind {
Expand Down
5 changes: 4 additions & 1 deletion bindings/aws/ses/ses.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,8 @@ func (a *AWSSES) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMa
}

func (a *AWSSES) Close() error {
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/sns/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,8 @@ func (a *AWSSNS) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
}

func (a *AWSSNS) Close() error {
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func (a *AWSSQS) Close() error {
close(a.closeCh)
}
a.wg.Wait()
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}

func (a *AWSSQS) parseSQSMetadata(meta bindings.Metadata) (*sqsMetadata, error) {
Expand Down
5 changes: 5 additions & 0 deletions bindings/postgres/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package postgres

import (
"errors"
"time"

"github.com/dapr/components-contrib/common/authentication/aws"
Expand Down Expand Up @@ -53,5 +54,9 @@ func (m *psqlMetadata) InitWithMetadata(meta map[string]string) error {
return err
}

if m.Timeout < 1*time.Second {
return errors.New("invalid value for 'timeout': must be greater than 1s")
}

return nil
}
88 changes: 88 additions & 0 deletions bindings/postgres/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postgres

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMetadata(t *testing.T) {
t.Run("missing connection string", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{}

err := m.InitWithMetadata(props)
require.Error(t, err)
require.ErrorContains(t, err, "connection string")
})

t.Run("has connection string", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
})

t.Run("default timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
assert.Equal(t, 20*time.Second, m.Timeout)
})

t.Run("invalid timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "NaN",
}

err := m.InitWithMetadata(props)
require.Error(t, err)
})

t.Run("positive timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "42",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
assert.Equal(t, 42*time.Second, m.Timeout)
})

t.Run("zero timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "0",
}

err := m.InitWithMetadata(props)
require.Error(t, err)
})
}
11 changes: 10 additions & 1 deletion bindings/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,20 @@ func (p *Postgres) Init(ctx context.Context, meta bindings.Metadata) error {

// This context doesn't control the lifetime of the connection pool, and is
// only scoped to postgres creating resources at init.
p.db, err = pgxpool.NewWithConfig(ctx, poolConfig)
connCtx, connCancel := context.WithTimeout(ctx, m.Timeout)
defer connCancel()
p.db, err = pgxpool.NewWithConfig(connCtx, poolConfig)
if err != nil {
return fmt.Errorf("unable to connect to the DB: %w", err)
}

pingCtx, pingCancel := context.WithTimeout(ctx, m.Timeout)
defer pingCancel()
err = p.db.Ping(pingCtx)
if err != nil {
return fmt.Errorf("failed to ping the DB: %w", err)
}

return nil
}

Expand Down
45 changes: 45 additions & 0 deletions bindings/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package postgres

import (
"context"
"errors"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -62,6 +63,10 @@ func TestPostgresIntegration(t *testing.T) {
t.SkipNow()
}

t.Run("Test init configurations", func(t *testing.T) {
testInitConfiguration(t, url)
})

// live DB test
b := NewPostgres(logger.NewLogger("test")).(*Postgres)
m := bindings.Metadata{Base: metadata.Base{Properties: map[string]string{"connectionString": url}}}
Expand Down Expand Up @@ -131,6 +136,46 @@ func TestPostgresIntegration(t *testing.T) {
})
}

// testInitConfiguration tests valid and invalid config settings.
func testInitConfiguration(t *testing.T, connectionString string) {
logger := logger.NewLogger("test")
tests := []struct {
name string
props map[string]string
expectedErr error
}{
{
name: "Empty",
props: map[string]string{},
expectedErr: errors.New("missing connection string"),
},
{
name: "Valid connection string",
props: map[string]string{"connectionString": connectionString},
expectedErr: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPostgres(logger).(*Postgres)
defer p.Close()

metadata := bindings.Metadata{
Base: metadata.Base{Properties: tt.props},
}

err := p.Init(context.Background(), metadata)
if tt.expectedErr == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, tt.expectedErr, err)
}
})
}
}

func assertResponse(t *testing.T, res *bindings.InvokeResponse, err error) {
require.NoError(t, err)
assert.NotNil(t, res)
Expand Down
5 changes: 4 additions & 1 deletion pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,10 @@ func (s *snsSqs) Close() error {
s.subscriptionManager.Close()
}

return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}

func (s *snsSqs) Features() []pubsub.Feature {
Expand Down
5 changes: 4 additions & 1 deletion secretstores/aws/parameterstore/parameterstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,8 @@ func (s *ssmSecretStore) GetComponentMetadata() (metadataInfo metadata.MetadataM
}

func (s *ssmSecretStore) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion secretstores/aws/secretmanager/secretmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,8 @@ func (s *smSecretStore) GetComponentMetadata() (metadataInfo metadata.MetadataMa
}

func (s *smSecretStore) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion state/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ func (d *StateStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
}

func (d *StateStore) Close() error {
return d.authProvider.Close()
if d.authProvider != nil {
return d.authProvider.Close()
}
return nil
}

func (d *StateStore) getDynamoDBMetadata(meta state.Metadata) (*dynamoDBMetadata, error) {
Expand Down
15 changes: 14 additions & 1 deletion state/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/dapr/components-contrib/contenttype"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsonrw"
Expand Down Expand Up @@ -528,7 +530,18 @@ func (m *MongoDB) doTransaction(sessCtx mongo.SessionContext, operations []state
var err error
switch req := o.(type) {
case state.SetRequest:
err = m.setInternal(sessCtx, &req)
{
isJSON := (len(req.Metadata) > 0 && req.Metadata[metadata.ContentType] == contenttype.JSONContentType)
if isJSON {
if bytes, ok := req.Value.([]byte); ok {
err = json.Unmarshal(bytes, &req.Value)
if err != nil {
break
}
}
}
err = m.setInternal(sessCtx, &req)
}
case state.DeleteRequest:
err = m.deleteInternal(sessCtx, &req)
}
Expand Down
4 changes: 2 additions & 2 deletions state/postgresql/v2/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) (err error)
}

connCtx, connCancel := context.WithTimeout(ctx, p.metadata.Timeout)
defer connCancel()
p.db, err = pgxpool.NewWithConfig(connCtx, config)
connCancel()
if err != nil {
err = fmt.Errorf("failed to connect to the database: %w", err)
return err
}

pingCtx, pingCancel := context.WithTimeout(ctx, p.metadata.Timeout)
defer pingCancel()
err = p.db.Ping(pingCtx)
pingCancel()
if err != nil {
err = fmt.Errorf("failed to ping the database: %w", err)
return err
Expand Down
Loading

0 comments on commit 5f56afd

Please sign in to comment.