-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathlungo.go
170 lines (158 loc) · 7.17 KB
/
lungo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package lungo
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
)
// IClient defines a generic client.
type IClient interface {
Connect(context.Context) error
Database(string, ...*options.DatabaseOptions) IDatabase
Disconnect(context.Context) error
ListDatabaseNames(context.Context, interface{}, ...*options.ListDatabasesOptions) ([]string, error)
ListDatabases(context.Context, interface{}, ...*options.ListDatabasesOptions) (mongo.ListDatabasesResult, error)
NumberSessionsInProgress() int
Ping(context.Context, *readpref.ReadPref) error
StartSession(...*options.SessionOptions) (ISession, error)
Timeout() *time.Duration
UseSession(context.Context, func(ISessionContext) error) error
UseSessionWithOptions(context.Context, *options.SessionOptions, func(ISessionContext) error) error
Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
}
// IDatabase defines a generic database.
type IDatabase interface {
Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)
Client() IClient
Collection(string, ...*options.CollectionOptions) ICollection
CreateCollection(context.Context, string, ...*options.CreateCollectionOptions) error
CreateView(context.Context, string, string, interface{}, ...*options.CreateViewOptions) error
Drop(context.Context) error
ListCollectionNames(context.Context, interface{}, ...*options.ListCollectionsOptions) ([]string, error)
ListCollectionSpecifications(context.Context, interface{}, ...*options.ListCollectionsOptions) ([]*mongo.CollectionSpecification, error)
ListCollections(context.Context, interface{}, ...*options.ListCollectionsOptions) (ICursor, error)
Name() string
ReadConcern() *readconcern.ReadConcern
ReadPreference() *readpref.ReadPref
RunCommand(context.Context, interface{}, ...*options.RunCmdOptions) ISingleResult
RunCommandCursor(context.Context, interface{}, ...*options.RunCmdOptions) (ICursor, error)
Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
WriteConcern() *writeconcern.WriteConcern
}
// ICollection defines a generic collection.
type ICollection interface {
Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (ICursor, error)
BulkWrite(context.Context, []mongo.WriteModel, ...*options.BulkWriteOptions) (*mongo.BulkWriteResult, error)
Clone(...*options.CollectionOptions) (ICollection, error)
CountDocuments(context.Context, interface{}, ...*options.CountOptions) (int64, error)
Database() IDatabase
DeleteMany(context.Context, interface{}, ...*options.DeleteOptions) (*mongo.DeleteResult, error)
DeleteOne(context.Context, interface{}, ...*options.DeleteOptions) (*mongo.DeleteResult, error)
Distinct(context.Context, string, interface{}, ...*options.DistinctOptions) ([]interface{}, error)
Drop(context.Context) error
EstimatedDocumentCount(context.Context, ...*options.EstimatedDocumentCountOptions) (int64, error)
Find(context.Context, interface{}, ...*options.FindOptions) (ICursor, error)
FindOne(context.Context, interface{}, ...*options.FindOneOptions) ISingleResult
FindOneAndDelete(context.Context, interface{}, ...*options.FindOneAndDeleteOptions) ISingleResult
FindOneAndReplace(context.Context, interface{}, interface{}, ...*options.FindOneAndReplaceOptions) ISingleResult
FindOneAndUpdate(context.Context, interface{}, interface{}, ...*options.FindOneAndUpdateOptions) ISingleResult
Indexes() IIndexView
InsertMany(context.Context, []interface{}, ...*options.InsertManyOptions) (*mongo.InsertManyResult, error)
InsertOne(context.Context, interface{}, ...*options.InsertOneOptions) (*mongo.InsertOneResult, error)
Name() string
ReplaceOne(context.Context, interface{}, interface{}, ...*options.ReplaceOptions) (*mongo.UpdateResult, error)
SearchIndexes() mongo.SearchIndexView
UpdateByID(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
UpdateMany(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
UpdateOne(context.Context, interface{}, interface{}, ...*options.UpdateOptions) (*mongo.UpdateResult, error)
Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (IChangeStream, error)
}
// ICursor defines a generic cursor.
type ICursor interface {
All(context.Context, interface{}) error
Close(context.Context) error
Decode(interface{}) error
Err() error
ID() int64
Next(context.Context) bool
RemainingBatchLength() int
SetBatchSize(batchSize int32)
SetComment(interface{})
SetMaxTime(time.Duration)
TryNext(context.Context) bool
}
// ISingleResult defines a generic single result
type ISingleResult interface {
Decode(interface{}) error
DecodeBytes() (bson.Raw, error)
Err() error
Raw() (bson.Raw, error)
}
// IIndexView defines a generic index view.
type IIndexView interface {
CreateMany(context.Context, []mongo.IndexModel, ...*options.CreateIndexesOptions) ([]string, error)
CreateOne(context.Context, mongo.IndexModel, ...*options.CreateIndexesOptions) (string, error)
DropAll(context.Context, ...*options.DropIndexesOptions) (bson.Raw, error)
DropOne(context.Context, string, ...*options.DropIndexesOptions) (bson.Raw, error)
List(context.Context, ...*options.ListIndexesOptions) (ICursor, error)
ListSpecifications(context.Context, ...*options.ListIndexesOptions) ([]*mongo.IndexSpecification, error)
}
// IChangeStream defines a generic change stream.
type IChangeStream interface {
Close(context.Context) error
Decode(interface{}) error
Err() error
ID() int64
Next(context.Context) bool
ResumeToken() bson.Raw
SetBatchSize(int32)
TryNext(context.Context) bool
}
// ISession defines a generic session.
type ISession interface {
ID() bson.Raw
AbortTransaction(context.Context) error
AdvanceClusterTime(bson.Raw) error
AdvanceOperationTime(*primitive.Timestamp) error
Client() IClient
ClusterTime() bson.Raw
CommitTransaction(context.Context) error
EndSession(context.Context)
OperationTime() *primitive.Timestamp
StartTransaction(...*options.TransactionOptions) error
WithTransaction(context.Context, func(ISessionContext) (interface{}, error), ...*options.TransactionOptions) (interface{}, error)
}
// ISessionContext defines a generic session context.
type ISessionContext interface {
context.Context
ISession
}
// WithSession will yield a session context to the provided callback that uses
// the specified session.
func WithSession(ctx context.Context, session ISession, fn func(ISessionContext) error) error {
switch ses := session.(type) {
case *MongoSession:
return mongo.WithSession(ensureContext(ctx), ses.Session, func(sc mongo.SessionContext) error {
return fn(&MongoSessionContext{
Context: sc,
MongoSession: &MongoSession{
Session: sc,
client: ses.client,
},
})
})
case *Session:
return fn(&SessionContext{
Context: context.WithValue(ensureContext(ctx), sessionKey{}, ses),
Session: ses,
})
default:
return fmt.Errorf("unknown session %T", session)
}
}