-
Notifications
You must be signed in to change notification settings - Fork 0
/
xcache.go
130 lines (110 loc) · 2.55 KB
/
xcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package xcache
import (
"errors"
"fmt"
"log"
"sync"
"xcache/cache"
"xcache/signalflight"
"xcache/xcachepb"
)
// Getter loads the raw bytes for a key from the underlying data source.
// Group.getLocally calls it when a key has to be loaded on this node.
type Getter interface {
// Get returns the data stored under key, or an error if it cannot be fetched.
Get(key string) ([]byte, error)
}
// GetterFunc is a function type with the same signature as Getter.Get, so a
// plain function can be used wherever a Getter is expected.
type GetterFunc func(key string) ([]byte, error)

// Get invokes the wrapped function with key and hands back its results
// unchanged.
func (f GetterFunc) Get(key string) ([]byte, error) {
	data, err := f(key)
	return data, err
}
// Group is a named cache: it answers Get from its local cache and, on a miss,
// loads the value from a remote peer or from its Getter.
type Group struct {
name string // group name; registry key in groups and the Group field of peer requests
getter Getter // loads data on this node when the key is not cached (see getLocally)
maincache *Cache // local cache consulted first by Get and filled by populateCache
peerPicker PeerPicker // optional; set by RegisterPeers and consulted by load to pick a remote peer
loader *signalflight.Group // every load goes through loader.Do to protect the data source (presumably by de-duplicating concurrent fetches of one key — confirm in signalflight)
}
// Package-level registry of the groups created through NewGroup.
var (
	// mu guards groups.
	mu sync.RWMutex
	// groups maps a group name to its Group.
	groups = map[string]*Group{}
)
// NewGroup creates a Group called name, backed by a cache of the given type
// and byte budget, and records it in the package-level registry.
//
// getter supplies the data on a cache miss and must not be nil; a nil getter
// panics. Registering a name that already exists replaces the earlier entry.
func NewGroup(name string, cacheType cache.CacheType, cacheBytes int, getter Getter) *Group {
	if getter == nil {
		panic("getter is nil")
	}

	mu.Lock()
	defer mu.Unlock()

	group := new(Group)
	group.name = name
	group.getter = getter
	group.maincache = NewCache(cacheType, cacheBytes, nil)
	group.loader = new(signalflight.Group)

	groups[name] = group
	return group
}
// GetGroup looks up the Group registered under name. It returns nil when no
// group with that name has been created.
func GetGroup(name string) *Group {
	mu.RLock()
	found := groups[name]
	mu.RUnlock()
	return found
}
// Get returns the value stored under key.
//
// An empty key is rejected with an error. Otherwise the local cache is tried
// first; on a miss the value is loaded through load.
func (g *Group) Get(key string) (value ByteView, err error) {
	if len(key) == 0 {
		return ByteView{}, errors.New("key is required")
	}

	cached, hit := g.maincache.get(key)
	if !hit {
		return g.load(key)
	}
	return cached, nil
}
// load fetches key on a cache miss, either from another node or locally.
//
// The fetch runs through g.loader.Do, keyed by key. When a peer picker is
// registered and selects a peer, that peer is asked first; a value obtained
// from a peer is returned as-is and is not added to the local cache. If there
// is no picker, no peer is selected, or the peer request fails (the failure is
// logged), the value is loaded on this node via getLocally.
func (g *Group) load(key string) (value ByteView, err error) {
	fetch := func() (interface{}, error) {
		if g.peerPicker == nil {
			return g.getLocally(key)
		}

		peer, ok := g.peerPicker.PeerPicker(key)
		switch {
		case ok:
			// Try the remote node first.
			remote, peerErr := g.getFromPeer(peer, key)
			if peerErr == nil {
				return remote, nil
			}
			log.Println("[XCache] Failed to get from peer", peerErr)
		case peer == nil:
			fmt.Println("from self")
		}

		// Remote lookup was skipped or failed: fall back to the local source.
		return g.getLocally(key)
	}

	result, err := g.loader.Do(key, fetch)
	if err != nil {
		return ByteView{}, err
	}
	return result.(ByteView), nil
}
// getLocally loads key through the group's Getter, stores a private copy of
// the bytes in the local cache, and returns that copy. A Getter error is
// returned unchanged together with an empty ByteView.
func (g *Group) getLocally(key string) (value ByteView, err error) {
	raw, getErr := g.getter.Get(key)
	if getErr != nil {
		return ByteView{}, getErr
	}

	// Clone so the cached view does not share the slice returned by the getter.
	view := ByteView{b: cloneBytes(raw)}
	g.populateCache(key, view)
	return view, nil
}
// populateCache stores value under key in the group's local cache.
func (g *Group) populateCache(key string, value ByteView) {
	local := g.maincache
	local.set(key, value)
}
// RegisterPeers installs the PeerPicker that load uses to choose a remote
// peer for a key. It panics when peers is nil. A later call replaces the
// previously registered picker.
func (g *Group) RegisterPeers(peers PeerPicker) {
	if peers != nil {
		g.peerPicker = peers
		return
	}
	// NOTE(review): the message contains a typo ("cann't"); it is kept as-is
	// because it is runtime-visible text.
	panic("peers cann't be nil")
}
// getFromPeer asks peer for key in this group. It sends a request carrying
// the group name and key, and wraps the value from the response in a ByteView
// (the bytes are not cloned). A peer error is returned unchanged together
// with an empty ByteView.
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	req := &xcachepb.Request{Group: g.name, Key: key}
	var reply xcachepb.Response
	if err := peer.Get(req, &reply); err != nil {
		return ByteView{}, err
	}
	return ByteView{b: reply.GetValue()}, nil
}