-
Notifications
You must be signed in to change notification settings - Fork 104
/
Copy pathclient.go
119 lines (99 loc) · 3.4 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package gocb
import (
gocbcore "github.com/couchbase/gocbcore/v10"
"time"
)
// connectionManager declares what a connection manager must provide:
// connecting, opening buckets, closing, and handing out the typed provider
// for each service. Cluster.newConnectionMgr selects the concrete
// implementation (psConnectionMgr or stdConnectionMgr) from a protocol string.
type connectionManager interface {
// Connection setup, access and teardown.
// NOTE(review): required call ordering is not visible here — confirm against
// the implementations.
connect() error
openBucket(bucketName string) error
buildConfig(cluster *Cluster) error
connection(bucketName string) (*gocbcore.Agent, error)
close() error
// Provider accessors that take a bucket name.
getKvProvider(bucketName string) (kvProvider, error)
getKvBulkProvider(bucketName string) (kvBulkProvider, error)
getKvCapabilitiesProvider(bucketName string) (kvCapabilityVerifier, error)
getViewProvider(bucketName string) (viewProvider, error)
getViewIndexProvider(bucketName string) (viewIndexProvider, error)
// Provider accessors that take no bucket name.
getQueryProvider() (queryProvider, error)
getQueryIndexProvider() (queryIndexProvider, error)
getAnalyticsProvider() (analyticsProvider, error)
getAnalyticsIndexProvider() (analyticsIndexProvider, error)
getSearchProvider() (searchProvider, error)
// Further provider accessors that take a bucket name.
getHTTPProvider(bucketName string) (httpProvider, error)
getDiagnosticsProvider(bucketName string) (diagnosticsProvider, error)
getWaitUntilReadyProvider(bucketName string) (waitUntilReadyProvider, error)
getCollectionsManagementProvider(bucketName string) (collectionsManagementProvider, error)
// Management and internal provider accessors that take no bucket name.
getBucketManagementProvider() (bucketManagementProvider, error)
getSearchIndexProvider() (searchIndexProvider, error)
getSearchCapabilitiesProvider() (searchCapabilityVerifier, error)
getEventingManagementProvider() (eventingManagementProvider, error)
getUserManagerProvider() (userManagerProvider, error)
getInternalProvider() (internalProvider, error)
// Transactions: initTransactions takes the config and owning cluster;
// getTransactionsProvider returns the provider.
// NOTE(review): presumably initTransactions must be called first — confirm.
initTransactions(config TransactionsConfig, cluster *Cluster) error
getTransactionsProvider() (transactionsProvider, error)
// getMeter returns the meter wrapper held by the manager; it has no error
// return, so callers must be prepared for whatever the implementation stores.
getMeter() *meterWrapper
// Every connection manager also exposes MarkOpBeginning/MarkOpCompleted.
opController
}
// opController is implemented by types that are told when an operation starts
// and when it finishes. autoOpControl calls MarkOpBeginning before running an
// operation and defers MarkOpCompleted until it returns, so the two calls are
// always paired.
type opController interface {
// MarkOpBeginning is called once before an operation starts.
MarkOpBeginning()
// MarkOpCompleted is called once after the operation has finished.
MarkOpCompleted()
}
// providerController bundles what autoOpControl needs to run an operation
// against a provider of type P: a getter for the provider, the operation
// begin/complete hooks, and the values passed along when recording metrics.
type providerController[P any] struct {
// get returns the provider to run the operation on, or an error if it cannot
// be obtained; autoOpControl returns that error without running the operation.
get func() (P, error)
// opController supplies MarkOpBeginning/MarkOpCompleted, which autoOpControl
// calls around every operation.
opController
// Metrics-related fields
// meter receives the ValueRecord call; when nil, autoOpControl records nothing.
meter *meterWrapper
// keyspace and service are passed unchanged to meter.ValueRecord.
keyspace *keyspace
service string
}
// autoOpControl runs opFn against the provider obtained from controller,
// bracketing the whole call with the controller's MarkOpBeginning and
// MarkOpCompleted hooks.
//
// If the provider cannot be obtained, the error from controller.get is
// returned and opFn is never invoked. Otherwise opFn runs and, when operation
// is non-empty and the controller has a meter, the outcome is reported through
// meter.ValueRecord together with the start time captured just before opFn
// ran. Whenever an error is returned, the accompanying T is its zero value.
func autoOpControl[T any, P any](controller *providerController[P], operation string, opFn func(P) (T, error)) (T, error) {
	var zero T

	controller.MarkOpBeginning()
	defer controller.MarkOpCompleted()

	provider, err := controller.get()
	if err != nil {
		return zero, err
	}

	begin := time.Now()
	result, opErr := opFn(provider)

	// Deferred so the measurement is reported on the way out, after opFn has
	// finished and regardless of whether it succeeded.
	if controller.meter != nil && operation != "" {
		defer controller.meter.ValueRecord(controller.service, operation, begin, controller.keyspace, opErr)
	}

	if opErr != nil {
		return zero, opErr
	}
	return result, nil
}
// autoOpControlErrorOnly is the variant of autoOpControl for operations that
// produce no value. It adapts opFn to the value-returning signature using an
// empty struct as the result, delegates to autoOpControl, and returns only
// the resulting error.
func autoOpControlErrorOnly[P any](controller *providerController[P], operation string, opFn func(P) error) error {
	// Adapter: run opFn and pair its error with a placeholder result.
	adapter := func(p P) (struct{}, error) {
		return struct{}{}, opFn(p)
	}

	_, err := autoOpControl(controller, operation, adapter)
	return err
}
// newConnectionMgrOptions carries the values that Cluster.newConnectionMgr
// copies from its caller into the connection manager it builds.
type newConnectionMgrOptions struct {
// tracer and meter are copied into both connection manager implementations.
tracer *tracerWrapper
meter *meterWrapper
// preferredServerGroup is only copied into the default (non-"couchbase2")
// connection manager.
preferredServerGroup string
}
// newConnectionMgr constructs the connectionManager for the given protocol.
//
// The protocol "couchbase2" yields a psConnectionMgr; any other value yields a
// stdConnectionMgr. Timeouts and the retry strategy are taken from the
// Cluster, while the tracer and meter come from opts. preferredServerGroup is
// only passed to the stdConnectionMgr.
//
// A nil opts is treated as an empty newConnectionMgrOptions (nil tracer and
// meter, no preferred server group) instead of causing a nil-pointer panic.
func (c *Cluster) newConnectionMgr(protocol string, opts *newConnectionMgrOptions) connectionManager {
	if opts == nil {
		// Both branches below read fields of opts; fall back to zero values
		// rather than dereferencing nil.
		opts = &newConnectionMgrOptions{}
	}

	switch protocol {
	case "couchbase2":
		return &psConnectionMgr{
			timeouts:     c.timeoutsConfig,
			tracer:       opts.tracer,
			meter:        opts.meter,
			defaultRetry: c.retryStrategyWrapper.wrapped,
		}
	default:
		return &stdConnectionMgr{
			retryStrategyWrapper: c.retryStrategyWrapper,
			transcoder:           c.transcoder,
			timeouts:             c.timeoutsConfig,
			tracer:               opts.tracer,
			meter:                opts.meter,
			preferredServerGroup: opts.preferredServerGroup,
		}
	}
}