Skip to content

Commit

Permalink
MongoDB snapshots (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 authored Feb 14, 2024
1 parent 3ea8e78 commit 3e6a5e0
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 15 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ Artie Reader reads from databases to perform historical snapshots and also reads
| | Snapshot | Streaming |
|------------|----------|-----------|
| DynamoDB |||
| PostgreSQL |||
| MongoDB |||
| MySQL | 🚧 ||
| MongoDB | 🚧 ||
| PostgreSQL |||


## Running

Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ type Metrics struct {
type Source string

const (
SourceInvalid Source = "invalid"
SourceDynamo Source = "dynamodb"
SourceMongoDB Source = "mongodb"
SourcePostgreSQL Source = "postgresql"
)

type Settings struct {
Source Source `yaml:"source"`
DynamoDB *DynamoDB `yaml:"dynamodb,omitempty"`
MongoDB *MongoDB `yaml:"mongodb,omitempty"`
PostgreSQL *PostgreSQL `yaml:"postgresql,omitempty"`

Reporting *Reporting `yaml:"reporting"`
Expand Down Expand Up @@ -99,6 +102,14 @@ func (s *Settings) Validate() error {
if err := s.DynamoDB.Validate(); err != nil {
return fmt.Errorf("dynamodb validation failed: %w", err)
}
case SourceMongoDB:
if s.MongoDB == nil {
return fmt.Errorf("mongodb config is nil")
}

if err := s.MongoDB.Validate(); err != nil {
return fmt.Errorf("mongodb validation failed: %w", err)
}
case SourcePostgreSQL:
if s.PostgreSQL == nil {
return fmt.Errorf("postgres config is nil")
Expand All @@ -107,6 +118,7 @@ func (s *Settings) Validate() error {
if err := s.PostgreSQL.Validate(); err != nil {
return fmt.Errorf("postgres validation failed: %w", err)
}

}

return nil
Expand Down
52 changes: 52 additions & 0 deletions config/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package config

import (
"fmt"
"github.com/artie-labs/reader/constants"
"github.com/artie-labs/transfer/lib/stringutil"
)

type MongoDB struct {
Host string `yaml:"host"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Database string `yaml:"database"`

Collections []Collection `yaml:"collections"`
}

type Collection struct {
Name string `yaml:"name"`
BatchSize int32 `yaml:"batchSize,omitempty"`
// TODO: In the future, we should be able to support customers passing Start/End PK values.
}

func (c Collection) TopicSuffix(db string) string {
return fmt.Sprintf("%s.%s", db, c.Name)
}

func (c Collection) GetBatchSize() int32 {
if c.BatchSize == 0 {
return constants.DefaultBatchSize
}

return c.BatchSize
}

func (m MongoDB) Validate() error {
if stringutil.Empty(m.Host, m.Database, m.Username, m.Password) {
return fmt.Errorf("one of the MongoDB settings is empty: host, username, password, database")
}

if len(m.Collections) == 0 {
return fmt.Errorf("no collections passed in")
}

for _, collection := range m.Collections {
if collection.Name == "" {
return fmt.Errorf("collection name must be passed in")
}
}

return nil
}
13 changes: 13 additions & 0 deletions examples/mongodb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
source: mongodb
mongodb:
host: mongodb://localhost:27017
username: foo
password: bar
database: myFirstDatabase
collections:
- name: customers
batchSize: 1
kafka:
bootstrapServers: localhost:29092
topicPrefix: dbserver1
maxRequestSize: 4194304
9 changes: 8 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.21

require (
github.com/DataDog/datadog-go v4.8.3+incompatible
github.com/artie-labs/transfer v1.22.7
github.com/artie-labs/transfer v1.22.8
github.com/aws/aws-sdk-go v1.44.327
github.com/aws/aws-sdk-go-v2/config v1.18.19
github.com/getsentry/sentry-go v0.26.0
Expand All @@ -17,6 +17,7 @@ require (
github.com/segmentio/kafka-go v0.4.39
github.com/segmentio/kafka-go/sasl/aws_msk_iam_v2 v0.1.0
github.com/stretchr/testify v1.8.4
go.mongodb.org/mongo-driver v1.13.1
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -34,6 +35,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.18.7 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
Expand All @@ -42,11 +44,16 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/samber/slog-common v0.15.0 // indirect
github.com/twpayne/go-geom v1.5.3 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect
golang.org/x/mod v0.14.0 // indirect
Expand Down
24 changes: 22 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ github.com/alecthomas/assert/v2 v2.4.0 h1:/ZiZ0NnriAWPYYO+4eOjgzNELrFQLaHNr92mHS
github.com/alecthomas/assert/v2 v2.4.0/go.mod h1:fw5suVxB+wfYJ3291t0hRTqtGzFYdSwstnRQdaQx2DM=
github.com/alecthomas/repr v0.3.0 h1:NeYzUPfjjlqHY4KtzgKJiWd6sVq2eNUPTi34PiFGjY8=
github.com/alecthomas/repr v0.3.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4=
github.com/artie-labs/transfer v1.22.7 h1:80bDJlDFPNf7B+N9CU/RDk+zpq9dwFg343E0sdzG2iE=
github.com/artie-labs/transfer v1.22.7/go.mod h1:QCJNat9BLIK/jm+o/oC7Cwu+WKrEWILDEATyO492RsM=
github.com/artie-labs/transfer v1.22.8 h1:QiWQQEnskHwHWLhNEg0+019ci/3/WX8X18uscvDh3NY=
github.com/artie-labs/transfer v1.22.8/go.mod h1:QCJNat9BLIK/jm+o/oC7Cwu+WKrEWILDEATyO492RsM=
github.com/aws/aws-sdk-go v1.44.327 h1:ZS8oO4+7MOBLhkdwIhgtVeDzCeWOlTfKJS7EgggbIEY=
github.com/aws/aws-sdk-go v1.44.327/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go-v2 v1.16.12/go.mod h1:C+Ym0ag2LIghJbXhfXZ0YEEp49rBWowxKzJLUoob0ts=
Expand Down Expand Up @@ -70,6 +70,10 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down Expand Up @@ -134,6 +138,7 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
Expand Down Expand Up @@ -161,6 +166,8 @@ github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
Expand Down Expand Up @@ -213,12 +220,22 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/twpayne/go-geom v1.5.3 h1:UdH93XzTwpwPiAV38DJ74yg+9/YV9/WCGbKN+NmSvVA=
github.com/twpayne/go-geom v1.5.3/go.mod h1:scDv/u90MVD6K+/7cA44kQt9fD6M/n+VuLddERxWYR8=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4=
github.com/xdg/stringprep v1.0.3/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk=
go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -295,7 +312,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -314,6 +333,7 @@ golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
21 changes: 20 additions & 1 deletion lib/types.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package lib

import "github.com/artie-labs/transfer/lib/cdc/util"
import (
"github.com/artie-labs/transfer/lib/cdc/mongo"
"github.com/artie-labs/transfer/lib/cdc/util"
)

type RawMessage struct {
TopicSuffix string
PartitionKey map[string]interface{}
payload util.SchemaEventPayload
mongoPayload mongo.SchemaEventPayload

mongo bool
}

func NewRawMessage(topicSuffix string, partitionKey map[string]interface{}, payload util.SchemaEventPayload) RawMessage {
Expand All @@ -16,6 +22,19 @@ func NewRawMessage(topicSuffix string, partitionKey map[string]interface{}, payl
}
}

func NewMongoMessage(topicSuffix string, partitionKey map[string]interface{}, payload mongo.SchemaEventPayload) RawMessage {
return RawMessage{
TopicSuffix: topicSuffix,
PartitionKey: partitionKey,
mongoPayload: payload,
mongo: true,
}
}

func (r RawMessage) GetPayload() interface{} {
if r.mongo {
return r.mongoPayload
}

return r.payload
}
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/artie-labs/reader/lib/mtr"
"github.com/artie-labs/reader/sources"
"github.com/artie-labs/reader/sources/dynamodb"
"github.com/artie-labs/reader/sources/mongo"
"github.com/artie-labs/reader/sources/postgres"
)

Expand Down Expand Up @@ -48,6 +49,8 @@ func buildSource(cfg *config.Settings) (sources.Source, error) {
switch cfg.Source {
case "", config.SourceDynamo:
return dynamodb.Load(*cfg.DynamoDB)
case config.SourceMongoDB:
return mongo.Load(*cfg.MongoDB)
case config.SourcePostgreSQL:
return postgres.Load(*cfg.PostgreSQL)
}
Expand Down
82 changes: 82 additions & 0 deletions sources/mongo/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package mongo

import (
"context"
"fmt"
"github.com/artie-labs/reader/config"
"github.com/artie-labs/reader/lib"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

type collectionScanner struct {
db *mongo.Database
cfg config.MongoDB
collection config.Collection

// mutable
cursor *mongo.Cursor
done bool
}

func newIterator(db *mongo.Database, collection config.Collection, cfg config.MongoDB) *collectionScanner {
return &collectionScanner{
db: db,
cfg: cfg,
collection: collection,
}
}

func (c *collectionScanner) HasNext() bool {
return !c.done
}

func (c *collectionScanner) Next() ([]lib.RawMessage, error) {
if !c.HasNext() {
return nil, fmt.Errorf("no more rows to scan")
}

ctx := context.Background()
if c.cursor == nil {
findOptions := options.Find()
findOptions.SetBatchSize(c.collection.GetBatchSize())
cursor, err := c.db.Collection(c.collection.Name).Find(ctx, bson.D{}, findOptions)
if err != nil {
return nil, fmt.Errorf("failed to find documents: %w", err)
}

c.cursor = cursor
}

var rawMsgs []lib.RawMessage
for c.collection.GetBatchSize() > int32(len(rawMsgs)) && c.cursor.Next(ctx) {
var result bson.M
if err := c.cursor.Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode document: %w", err)
}

mgoMsg, err := parseMessage(result)
if err != nil {
return nil, fmt.Errorf("failed to parse message: %w", err)
}

rawMsg, err := mgoMsg.toRawMessage(c.collection, c.cfg.Database)
if err != nil {
return nil, fmt.Errorf("failed to create raw message: %w", err)
}

rawMsgs = append(rawMsgs, rawMsg)
}

if err := c.cursor.Err(); err != nil {
return nil, fmt.Errorf("failed to iterate over documents: %w", err)
}

// If the number of fetched documents is less than the batch size, we are done
if c.collection.GetBatchSize() > int32(len(rawMsgs)) {
c.done = true
}

return rawMsgs, nil
}
Loading

0 comments on commit 3e6a5e0

Please sign in to comment.