-
Notifications
You must be signed in to change notification settings - Fork 3
/
dsn_selector.go
277 lines (241 loc) · 7.57 KB
/
dsn_selector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//
// Package dsn data layer storage address method, mainly used for mysql mongodb and other database address
package dsn
import (
"errors"
"fmt"
"net"
"sync"
"time"
"trpc.group/trpc-go/trpc-go/naming/registry"
"trpc.group/trpc-go/trpc-go/naming/selector"
)
// SeletorName dsn selector name
var SeletorName string = "dsn"
func init() {
selector.Register(SeletorName, DefaultSelector)
}
// DefaultSelector dsn default selector
var DefaultSelector = NewDsnSelector(false)
// NewDsnSelector creates a new dsn selector.
func NewDsnSelector(keepAddrUnresolved bool) *DsnSelector {
var parseAddr func(network, address string) net.Addr
if keepAddrUnresolved {
parseAddr = func(network, address string) net.Addr {
return unresolvedAddr{network: network, address: address}
}
}
return &DsnSelector{
parseAddr: parseAddr,
dsns: make(map[string]*registry.Node),
}
}
// DsnSelector returns original service name node, with memory cache
type DsnSelector struct {
parseAddr func(network, address string) net.Addr
dsns map[string]*registry.Node
lock sync.RWMutex
}
// Select selects address from dsn://user:passwd@tcp(ip:port)/db
func (s *DsnSelector) Select(serviceName string, opt ...selector.Option) (*registry.Node, error) {
if len(serviceName) == 0 {
return nil, errors.New("dsn address can not be empty")
}
s.lock.RLock()
node, ok := s.dsns[serviceName]
s.lock.RUnlock()
if ok {
return node, nil
}
s.lock.Lock()
defer s.lock.Unlock()
node, ok = s.dsns[serviceName]
if ok {
return node, nil
}
node = ®istry.Node{
ServiceName: serviceName,
Address: serviceName,
ParseAddr: s.parseAddr,
}
s.dsns[serviceName] = node
return node, nil
}
// Report dsn selector no need to report
func (s *DsnSelector) Report(node *registry.Node, cost time.Duration, err error) error {
return nil
}
// Options are ResolvableSelector options.
type Options struct {
Extractor ServiceNameExtractor
EnableParseAddr bool
}
// Option sets ResolvableSelector options.
type Option func(*Options)
// WithExtractor ...
func WithExtractor(extractor ServiceNameExtractor) Option {
return func(o *Options) {
o.Extractor = extractor
}
}
// WithEnableParseAddr ...
func WithEnableParseAddr(enable bool) Option {
return func(o *Options) {
o.EnableParseAddr = enable
}
}
// ResolvableSelector dsn-selector with address resolver
type ResolvableSelector struct {
dsns map[string]*registry.Node
lock sync.RWMutex
resolverSelectorName string
extractor ServiceNameExtractor
EnableParseAddr bool
}
// ServiceNameExtractor extracts the part of the service name in the dsn, and return the starting position and length
type ServiceNameExtractor interface {
Extract(string) (int, int, error)
}
// NewResolvableSelector selector contains address resolution, implemented by other selectors. selectorName is the
// selector name for address resolution extractor is the func to extract the service name from the dsn,
// the extracted service name is the parameter of the selector used for address resolution
// eg:
// target: mongodb+polaris://user:passwd@poloars_name
// extractor extract polaris_name from target
// polaris selector will resolve polaris_name to address
func NewResolvableSelector(selectorName string, extractor ServiceNameExtractor) selector.Selector {
return &ResolvableSelector{
dsns: make(map[string]*registry.Node),
resolverSelectorName: selectorName,
extractor: extractor,
}
}
// NewResolvableSelectorWithOpts ...
func NewResolvableSelectorWithOpts(selectorName string, opt ...Option) selector.Selector {
opts := &Options{
Extractor: &URIHostExtractor{},
EnableParseAddr: true,
}
for _, o := range opt {
o(opts)
}
return &ResolvableSelector{
dsns: make(map[string]*registry.Node),
resolverSelectorName: selectorName,
extractor: opts.Extractor,
EnableParseAddr: opts.EnableParseAddr,
}
}
// Select selects address from mongodb+polaris://user:passwd@poloars_name/db
func (s *ResolvableSelector) Select(serviceName string, opt ...selector.Option) (*registry.Node, error) {
// resolve serviceName from dsn
pos, length, err := s.extractService(serviceName)
if err != nil {
return nil, err
}
extractedServiceName := serviceName[pos : pos+length]
// selector select a available node
resolvedNode, err := s.resolveAddress(extractedServiceName, opt...)
if err != nil {
return nil, err
}
address := serviceName[:pos] + resolvedNode.Address + serviceName[pos+length:]
if len(address) == 0 {
return nil, errors.New("dsn address can not be empty")
}
return s.dsnRW(address, serviceName, resolvedNode), nil
}
func (s *ResolvableSelector) extractService(serviceName string) (int, int, error) {
if len(s.resolverSelectorName) == 0 {
return 0, 0, errors.New("resolver selector name can not be empty")
}
if s.extractor == nil {
return 0, 0, errors.New("service name extractor can not be nil")
}
pos, length, err := s.extractor.Extract(serviceName)
if err != nil {
return 0, 0, err
}
if length == 0 {
return 0, 0, fmt.Errorf("the extracted service name is empty and the dsn is %s", serviceName)
}
return pos, length, nil
}
func (s *ResolvableSelector) resolveAddress(serviceName string, opt ...selector.Option) (*registry.Node, error) {
resolver := selector.Get(s.resolverSelectorName)
if resolver == nil {
return nil, errors.New("unknown selector name " + s.resolverSelectorName)
}
return resolver.Select(serviceName, opt...)
}
func (s *ResolvableSelector) dsnRW(address, serviceName string, resolvedNode *registry.Node) *registry.Node {
s.lock.RLock()
node, ok := s.dsns[address]
s.lock.RUnlock()
if ok {
return node
}
s.lock.Lock()
defer s.lock.Unlock()
node, ok = s.dsns[address]
if ok {
return node
}
node = ®istry.Node{
ServiceName: serviceName,
Address: address,
Metadata: map[string]interface{}{
"resolved": resolvedNode,
},
}
if s.EnableParseAddr {
node.ParseAddr = func(network, address string) net.Addr {
return unresolvedAddr{resolvedNode.Network, resolvedNode.Address}
}
}
s.dsns[address] = node
return node
}
// Report dsn selector does not need to report, but the embedded selector needs to report. this func is only executed
// after Select() is successful, so the node returned by Select() needs to be checked before report
func (s *ResolvableSelector) Report(node *registry.Node, cost time.Duration, err error) error {
if node.Metadata == nil {
return errors.New("metadata can not be nil")
}
resolved, ok := node.Metadata["resolved"]
if !ok {
return errors.New("the resolved in the metadata can not be nil")
}
resolvedNode, ok := resolved.(*registry.Node)
if !ok {
return errors.New("the resolved in the metadata is illegal")
}
resolver := selector.Get(s.resolverSelectorName)
if resolver == nil {
return errors.New("unknown selector name " + s.resolverSelectorName)
}
return resolver.Report(resolvedNode, cost, err)
}
type unresolvedAddr struct {
network string
address string
}
// Network returns the unresolved original network.
func (a unresolvedAddr) Network() string {
return a.network
}
// String returns the unresolved original address.
func (a unresolvedAddr) String() string {
return a.address
}