Skip to content

Commit

Permalink
Merge pull request #174 from foomo/fix/update-mongo-iterators
Browse files Browse the repository at this point in the history
fix: update mongo iterators
  • Loading branch information
franklinkim authored Feb 23, 2023
2 parents ad6df3c + e8d7c6c commit 43e2926
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
33 changes: 23 additions & 10 deletions persistence/mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,12 @@ func (c *Collection) Find(ctx context.Context, filter, results interface{}, opts
} else if err != nil {
return err
}
defer CloseCursor(ctx, cursor, &err)
err = cursor.All(ctx, results)
return err

if err = cursor.All(ctx, results); err != nil {
return err
}

return cursor.Err()
}

func (c *Collection) FindOne(ctx context.Context, filter, result interface{}, opts ...*options.FindOneOptions) error {
Expand All @@ -367,6 +370,7 @@ func (c *Collection) FindOne(ctx context.Context, filter, result interface{}, op
} else if res.Err() != nil {
return res.Err()
}

return res.Decode(result)
}

Expand All @@ -377,37 +381,46 @@ func (c *Collection) FindIterate(ctx context.Context, filter interface{}, handle
} else if err != nil {
return err
}
defer CloseCursor(ctx, cursor, &err)

defer CloseCursor(cursor)

for cursor.Next(ctx) {
if err := handler(cursor.Decode); err != nil {
return err
}
}
return err

return cursor.Err()
}

func (c *Collection) Aggregate(ctx context.Context, pipeline mongo.Pipeline, results interface{}, opts ...*options.AggregateOptions) error {
cursor, err := c.collection.Aggregate(ctx, pipeline, opts...)
if err != nil {
return err
}
defer CloseCursor(ctx, cursor, &err)
err = cursor.All(ctx, results)
return err

if err = cursor.All(ctx, results); err != nil {
return err
}

return cursor.Err()
}

func (c *Collection) AggregateIterate(ctx context.Context, pipeline mongo.Pipeline, handler IterateHandlerFn, opts ...*options.AggregateOptions) error {
cursor, err := c.collection.Aggregate(ctx, pipeline, opts...)
if err != nil {
return err
}
defer CloseCursor(ctx, cursor, &err)

defer CloseCursor(cursor)

for cursor.Next(ctx) {
if err := handler(cursor.Decode); err != nil {
return err
}
}
return nil

return cursor.Err()
}

// Count returns the count of documents
Expand Down
8 changes: 4 additions & 4 deletions persistence/mongo/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package keelmongo
import (
"context"

"github.com/foomo/keel/log"
"go.mongodb.org/mongo-driver/mongo"
)

// CloseCursor with defer
func CloseCursor(ctx context.Context, cursor *mongo.Cursor, err *error) {
cErr := cursor.Close(ctx)
if *err == nil {
*err = cErr
func CloseCursor(cursor *mongo.Cursor) {
if err := cursor.Close(context.Background()); err != nil {
log.WithError(nil, err).Error("failed to close cursor")
}
}

0 comments on commit 43e2926

Please sign in to comment.