Skip to content

Commit

Permalink
Merge branch 'main' into fix/pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
elena-kolevska authored Nov 21, 2024
2 parents 7efaf44 + f521a76 commit 6dcd1e9
Show file tree
Hide file tree
Showing 18 changed files with 270 additions and 14 deletions.
5 changes: 4 additions & 1 deletion bindings/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,8 @@ func (d *DynamoDB) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
}

func (d *DynamoDB) Close() error {
return d.authProvider.Close()
if d.authProvider != nil {
return d.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ func (a *AWSKinesis) Close() error {
close(a.closeCh)
}
a.wg.Wait()
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}

func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*string, error) {
Expand Down
5 changes: 4 additions & 1 deletion bindings/aws/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ func (s *AWSS3) Init(ctx context.Context, metadata bindings.Metadata) error {
}

func (s *AWSS3) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}

func (s *AWSS3) Operations() []bindings.OperationKind {
Expand Down
5 changes: 4 additions & 1 deletion bindings/aws/ses/ses.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,8 @@ func (a *AWSSES) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMa
}

func (a *AWSSES) Close() error {
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/sns/sns.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,8 @@ func (a *AWSSNS) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
}

func (a *AWSSNS) Close() error {
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion bindings/aws/sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func (a *AWSSQS) Close() error {
close(a.closeCh)
}
a.wg.Wait()
return a.authProvider.Close()
if a.authProvider != nil {
return a.authProvider.Close()
}
return nil
}

func (a *AWSSQS) parseSQSMetadata(meta bindings.Metadata) (*sqsMetadata, error) {
Expand Down
5 changes: 5 additions & 0 deletions bindings/postgres/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package postgres

import (
"errors"
"time"

"github.com/dapr/components-contrib/common/authentication/aws"
Expand Down Expand Up @@ -53,5 +54,9 @@ func (m *psqlMetadata) InitWithMetadata(meta map[string]string) error {
return err
}

if m.Timeout < 1*time.Second {
return errors.New("invalid value for 'timeout': must be greater than 1s")
}

return nil
}
88 changes: 88 additions & 0 deletions bindings/postgres/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postgres

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestMetadata(t *testing.T) {
t.Run("missing connection string", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{}

err := m.InitWithMetadata(props)
require.Error(t, err)
require.ErrorContains(t, err, "connection string")
})

t.Run("has connection string", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
})

t.Run("default timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
assert.Equal(t, 20*time.Second, m.Timeout)
})

t.Run("invalid timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "NaN",
}

err := m.InitWithMetadata(props)
require.Error(t, err)
})

t.Run("positive timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "42",
}

err := m.InitWithMetadata(props)
require.NoError(t, err)
assert.Equal(t, 42*time.Second, m.Timeout)
})

t.Run("zero timeout", func(t *testing.T) {
m := psqlMetadata{}
props := map[string]string{
"connectionString": "foo",
"timeout": "0",
}

err := m.InitWithMetadata(props)
require.Error(t, err)
})
}
11 changes: 10 additions & 1 deletion bindings/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,20 @@ func (p *Postgres) Init(ctx context.Context, meta bindings.Metadata) error {

// This context doesn't control the lifetime of the connection pool, and is
// only scoped to postgres creating resources at init.
p.db, err = pgxpool.NewWithConfig(ctx, poolConfig)
connCtx, connCancel := context.WithTimeout(ctx, m.Timeout)
defer connCancel()
p.db, err = pgxpool.NewWithConfig(connCtx, poolConfig)
if err != nil {
return fmt.Errorf("unable to connect to the DB: %w", err)
}

pingCtx, pingCancel := context.WithTimeout(ctx, m.Timeout)
defer pingCancel()
err = p.db.Ping(pingCtx)
if err != nil {
return fmt.Errorf("failed to ping the DB: %w", err)
}

return nil
}

Expand Down
45 changes: 45 additions & 0 deletions bindings/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package postgres

import (
"context"
"errors"
"fmt"
"os"
"testing"
Expand Down Expand Up @@ -62,6 +63,10 @@ func TestPostgresIntegration(t *testing.T) {
t.SkipNow()
}

t.Run("Test init configurations", func(t *testing.T) {
testInitConfiguration(t, url)
})

// live DB test
b := NewPostgres(logger.NewLogger("test")).(*Postgres)
m := bindings.Metadata{Base: metadata.Base{Properties: map[string]string{"connectionString": url}}}
Expand Down Expand Up @@ -131,6 +136,46 @@ func TestPostgresIntegration(t *testing.T) {
})
}

// testInitConfiguration tests valid and invalid config settings.
func testInitConfiguration(t *testing.T, connectionString string) {
logger := logger.NewLogger("test")
tests := []struct {
name string
props map[string]string
expectedErr error
}{
{
name: "Empty",
props: map[string]string{},
expectedErr: errors.New("missing connection string"),
},
{
name: "Valid connection string",
props: map[string]string{"connectionString": connectionString},
expectedErr: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPostgres(logger).(*Postgres)
defer p.Close()

metadata := bindings.Metadata{
Base: metadata.Base{Properties: tt.props},
}

err := p.Init(context.Background(), metadata)
if tt.expectedErr == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, tt.expectedErr, err)
}
})
}
}

func assertResponse(t *testing.T, res *bindings.InvokeResponse, err error) {
require.NoError(t, err)
assert.NotNil(t, res)
Expand Down
1 change: 1 addition & 0 deletions common/authentication/aws/x509.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func newX509(ctx context.Context, opts Options, cfg *aws.Config) (*x509, error)
return GetConfig(opts)
}(),
clients: newClients(),
closeCh: make(chan struct{}),
}

if err := auth.getCertPEM(ctx); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,10 @@ func (s *snsSqs) Close() error {
s.subscriptionManager.Close()
}

return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}

func (s *snsSqs) Features() []pubsub.Feature {
Expand Down
5 changes: 4 additions & 1 deletion secretstores/aws/parameterstore/parameterstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,5 +182,8 @@ func (s *ssmSecretStore) GetComponentMetadata() (metadataInfo metadata.MetadataM
}

func (s *ssmSecretStore) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion secretstores/aws/secretmanager/secretmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,8 @@ func (s *smSecretStore) GetComponentMetadata() (metadataInfo metadata.MetadataMa
}

func (s *smSecretStore) Close() error {
return s.authProvider.Close()
if s.authProvider != nil {
return s.authProvider.Close()
}
return nil
}
5 changes: 4 additions & 1 deletion state/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ func (d *StateStore) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
}

func (d *StateStore) Close() error {
return d.authProvider.Close()
if d.authProvider != nil {
return d.authProvider.Close()
}
return nil
}

func (d *StateStore) getDynamoDBMetadata(meta state.Metadata) (*dynamoDBMetadata, error) {
Expand Down
15 changes: 14 additions & 1 deletion state/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/dapr/components-contrib/contenttype"

"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/bsonrw"
Expand Down Expand Up @@ -528,7 +530,18 @@ func (m *MongoDB) doTransaction(sessCtx mongo.SessionContext, operations []state
var err error
switch req := o.(type) {
case state.SetRequest:
err = m.setInternal(sessCtx, &req)
{
isJSON := (len(req.Metadata) > 0 && req.Metadata[metadata.ContentType] == contenttype.JSONContentType)
if isJSON {
if bytes, ok := req.Value.([]byte); ok {
err = json.Unmarshal(bytes, &req.Value)
if err != nil {
break
}
}
}
err = m.setInternal(sessCtx, &req)
}
case state.DeleteRequest:
err = m.deleteInternal(sessCtx, &req)
}
Expand Down
4 changes: 2 additions & 2 deletions state/postgresql/v2/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,16 @@ func (p *PostgreSQL) Init(ctx context.Context, meta state.Metadata) (err error)
}

connCtx, connCancel := context.WithTimeout(ctx, p.metadata.Timeout)
defer connCancel()
p.db, err = pgxpool.NewWithConfig(connCtx, config)
connCancel()
if err != nil {
err = fmt.Errorf("failed to connect to the database: %w", err)
return err
}

pingCtx, pingCancel := context.WithTimeout(ctx, p.metadata.Timeout)
defer pingCancel()
err = p.db.Ping(pingCtx)
pingCancel()
if err != nil {
err = fmt.Errorf("failed to ping the database: %w", err)
return err
Expand Down
Loading

0 comments on commit 6dcd1e9

Please sign in to comment.