-
Notifications
You must be signed in to change notification settings - Fork 16
/
document_subscriptions.go
223 lines (187 loc) · 7.08 KB
/
document_subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package ravendb
import (
"io"
"reflect"
"sync"
)
// DocumentSubscriptions allows subscribing to changes in the store
type DocumentSubscriptions struct {
store *DocumentStore
subscriptions map[io.Closer]bool
mu sync.Mutex // protects subscriptions
}
func newDocumentSubscriptions(store *DocumentStore) *DocumentSubscriptions {
return &DocumentSubscriptions{
store: store,
subscriptions: map[io.Closer]bool{},
}
}
// Create creates a data subscription in a database. The subscription will expose all documents that match the specified subscription options for a given type.
func (s *DocumentSubscriptions) Create(options *SubscriptionCreationOptions, database string) (string, error) {
if options == nil {
return "", newIllegalArgumentError("Cannot create a subscription if options is nil")
}
if options.Query == "" {
return "", newIllegalArgumentError("Cannot create a subscription if Query is empty string")
}
if database == "" {
database = s.store.GetDatabase()
}
requestExecutor := s.store.GetRequestExecutor(database)
command := newCreateSubscriptionCommand(s.store.GetConventions(), options, "")
if err := requestExecutor.ExecuteCommand(command, nil); err != nil {
return "", err
}
return command.Result.Name, nil
}
// CreateForType creates a data subscription in a database. The subscription will
// expose all documents that match the specified subscription options for a given type.
func (s *DocumentSubscriptions) CreateForType(clazz reflect.Type, options *SubscriptionCreationOptions, database string) (string, error) {
if options == nil {
options = &SubscriptionCreationOptions{}
}
creationOptions := &SubscriptionCreationOptions{
Name: options.Name,
ChangeVector: options.ChangeVector,
Query: options.Query,
}
opts := s.ensureCriteria(creationOptions, clazz, false)
return s.Create(opts, database)
}
// CreateForRevisions creates a data subscription in a database. The subscription will
// expose all documents that match the specified subscription options for a given type.
func (s *DocumentSubscriptions) CreateForRevisions(clazz reflect.Type, options *SubscriptionCreationOptions, database string) (string, error) {
if options == nil {
options = &SubscriptionCreationOptions{}
}
creationOptions := &SubscriptionCreationOptions{
Name: options.Name,
ChangeVector: options.ChangeVector,
Query: options.Query,
}
opts := s.ensureCriteria(creationOptions, clazz, true)
return s.Create(opts, database)
}
func (s *DocumentSubscriptions) ensureCriteria(criteria *SubscriptionCreationOptions, clazz reflect.Type, revisions bool) *SubscriptionCreationOptions {
if criteria == nil {
criteria = &SubscriptionCreationOptions{}
}
collectionName := s.store.GetConventions().getCollectionName(clazz)
if criteria.Query == "" {
if revisions {
criteria.Query = "from " + collectionName + " (Revisions = true) as doc"
} else {
criteria.Query = "from " + collectionName + " as doc"
}
}
return criteria
}
// GetSubscriptionWorker opens a subscription and starts pulling documents since
// a last processed document for that subscription.
// The connection options determine client and server cooperation rules like
// document batch sizes or a timeout in a matter of which a client
// needs to acknowledge that batch has been processed. The acknowledgment
// is sent after all documents are processed by subscription's handlers.
func (s *DocumentSubscriptions) GetSubscriptionWorker(clazz reflect.Type, options *SubscriptionWorkerOptions, database string) (*SubscriptionWorker, error) {
if err := s.store.assertInitialized(); err != nil {
return nil, err
}
if options == nil {
return nil, newIllegalStateError("Cannot open a subscription if options are null")
}
subscription, err := NewSubscriptionWorker(clazz, options, false, s.store, database)
if err != nil {
return nil, err
}
fn := func(worker *SubscriptionWorker) {
s.mu.Lock()
delete(s.subscriptions, worker)
s.mu.Unlock()
}
subscription.onClosed = fn
s.mu.Lock()
s.subscriptions[subscription] = true
s.mu.Unlock()
return subscription, nil
}
// GetSubscriptionWorkerForRevisions opens a subscription and starts pulling documents
// since a last processed document for that subscription.
// The connection options determine client and server cooperation rules like document
// batch sizes or a timeout in a matter of which a client
// needs to acknowledge that batch has been processed. The acknowledgment is sent
// after all documents are processed by subscription's handlers.
func (s *DocumentSubscriptions) GetSubscriptionWorkerForRevisions(clazz reflect.Type, options *SubscriptionWorkerOptions, database string) (*SubscriptionWorker, error) {
subscription, err := NewSubscriptionWorker(clazz, options, true, s.store, database)
if err != nil {
return nil, err
}
fn := func(sender *SubscriptionWorker) {
s.mu.Lock()
delete(s.subscriptions, sender)
s.mu.Unlock()
}
subscription.onClosed = fn
s.mu.Lock()
s.subscriptions[subscription] = true
s.mu.Unlock()
return subscription, nil
}
// GetSubscriptions downloads a list of all existing subscriptions in a database.
func (s *DocumentSubscriptions) GetSubscriptions(start int, take int, database string) ([]*SubscriptionState, error) {
if database == "" {
database = s.store.GetDatabase()
}
requestExecutor := s.store.GetRequestExecutor(database)
command := newGetSubscriptionsCommand(start, take)
if err := requestExecutor.ExecuteCommand(command, nil); err != nil {
return nil, err
}
return command.Result, nil
}
// Delete deletes a subscription.
func (s *DocumentSubscriptions) Delete(name string, database string) error {
if database == "" {
database = s.store.GetDatabase()
}
requestExecutor := s.store.GetRequestExecutor(database)
command := newDeleteSubscriptionCommand(name)
return requestExecutor.ExecuteCommand(command, nil)
}
// GetSubscriptionState returns subscription definition and it's current state
func (s *DocumentSubscriptions) GetSubscriptionState(subscriptionName string, database string) (*SubscriptionState, error) {
if subscriptionName == "" {
return nil, newIllegalArgumentError("SubscriptionName cannot be null")
}
if database == "" {
database = s.store.GetDatabase()
}
requestExecutor := s.store.GetRequestExecutor(database)
command := newGetSubscriptionStateCommand(subscriptionName)
if err := requestExecutor.ExecuteCommand(command, nil); err != nil {
return nil, err
}
return command.Result, nil
}
// Close closes subscriptions
func (s *DocumentSubscriptions) Close() error {
if len(s.subscriptions) == 0 {
return nil
}
var err error
for subscription := range s.subscriptions {
err2 := subscription.Close()
if err2 != nil {
err = err2
}
}
return err
}
// DropConnection forces server to close current client subscription connection to the server
func (s *DocumentSubscriptions) DropConnection(name string, database string) error {
if database == "" {
database = s.store.GetDatabase()
}
requestExecutor := s.store.GetRequestExecutor(database)
command := newDropSubscriptionConnectionCommand(name)
return requestExecutor.ExecuteCommand(command, nil)
}