forked from onestraw/golb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchash.go
185 lines (158 loc) · 3.28 KB
/
chash.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package chash
import (
"fmt"
"hash/crc32"
"sort"
"strings"
"sync"
)
// Peer defines a single server.
type Peer struct {
sync.RWMutex
addr string
down bool
}
// Pool is a set of Peers.
type Pool struct {
sync.RWMutex
replica int
vNodes map[uint32]*Peer
sortedHashes []uint32
nodes map[string]bool
downNum int
}
// New returns a Pool object.
func New() *Pool {
return &Pool{
replica: 20,
vNodes: map[uint32]*Peer{},
sortedHashes: []uint32{},
nodes: map[string]bool{},
downNum: 0,
}
}
func (p *Pool) vKey(name string, idx int) string {
return fmt.Sprintf("%s#%d", name, idx)
}
func (p *Pool) hash(key string) uint32 {
h := crc32.NewIEEE()
h.Write([]byte(key))
return h.Sum32()
}
func (p *Pool) String() string {
p.RLock()
defer p.RUnlock()
result := []string{}
for key := range p.nodes {
result = append(result, key)
}
sort.Strings(result)
return strings.Join(result, ", ")
}
// Size return the number of peers.
func (p *Pool) Size() int {
p.RLock()
defer p.RUnlock()
return len(p.sortedHashes) / p.replica
}
// Add adds a peer by address.
func (p *Pool) Add(addr string, args ...interface{}) {
p.Lock()
defer p.Unlock()
if _, ok := p.nodes[addr]; ok {
return
}
p.nodes[addr] = true
peer := &Peer{addr: addr, down: false}
for i := 0; i < p.replica; i++ {
h := p.hash(p.vKey(peer.addr, i))
p.vNodes[h] = peer
p.sortedHashes = append(p.sortedHashes, h)
}
sort.Slice(p.sortedHashes, func(i, j int) bool {
return p.sortedHashes[i] < p.sortedHashes[j]
})
}
// Remove deletes a peer by address.
func (p *Pool) Remove(peerAddr string) {
p.Lock()
defer p.Unlock()
if _, ok := p.nodes[peerAddr]; !ok {
return
}
deleteSortedHashes := func(target uint32) {
for idx, val := range p.sortedHashes {
if val == target {
p.sortedHashes = append(p.sortedHashes[:idx], p.sortedHashes[idx+1:]...)
}
}
}
for i := 0; i < p.replica; i++ {
h := p.hash(p.vKey(peerAddr, i))
if p.vNodes[h].down {
p.downNum--
}
delete(p.vNodes, h)
deleteSortedHashes(h)
}
}
func (p *Pool) setPeerStatus(peerAddr string, isDown bool) {
p.Lock()
defer p.Unlock()
if _, ok := p.nodes[peerAddr]; !ok {
return
}
idx := 1
h := p.hash(p.vKey(peerAddr, idx))
peer := p.vNodes[h]
if peer.down != isDown {
if isDown {
p.downNum++
} else {
p.downNum--
}
peer.Lock()
peer.down = isDown
peer.Unlock()
}
}
// DownPeer mark the peer down.
func (p *Pool) DownPeer(addr string) {
p.setPeerStatus(addr, true)
}
// UpPeer mark the peer up.
func (p *Pool) UpPeer(addr string) {
p.setPeerStatus(addr, false)
}
// Get use a key to map the backend server
// key may be a cookie or request_uri
func (p *Pool) Get(args ...interface{}) string {
if len(args) == 0 {
return ""
}
key, ok := args[0].(string)
if !ok {
return ""
}
p.RLock()
defer p.RUnlock()
if len(p.vNodes) <= 0 || p.downNum >= p.Size() {
return ""
}
h := p.hash(key)
idx := sort.Search(len(p.sortedHashes), func(i int) bool {
return p.sortedHashes[i] >= h && !p.vNodes[p.sortedHashes[i]].down
})
if idx >= len(p.sortedHashes) {
idx = 0
}
return p.vNodes[p.sortedHashes[idx]].addr
}
// CreatePool returns a Pool object.
func CreatePool(addrs []string) *Pool {
pool := New()
for _, addr := range addrs {
pool.Add(addr)
}
return pool
}