-
Notifications
You must be signed in to change notification settings - Fork 0
/
http_client.go
139 lines (121 loc) · 2.93 KB
/
http_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package xcache
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sync"
"time"
"xcache/consistenthash"
"xcache/signalflight"
"xcache/xcachepb"
servicediscover "xcache/service_discover"
"github.com/golang/protobuf/proto"
)
const (
defalutBasePath = "/_xcache/"
defalutReplicas = 50
)
type HttpClient struct {
mu sync.RWMutex
peers *consistenthash.Map
httpGetter map[string]*httpGetter
registry *servicediscover.Registry
registryAddr string
}
func NewHttpClient(replicas int, fn consistenthash.Hash, timeout time.Duration, addr string) *HttpClient {
return &HttpClient{
peers: consistenthash.New(replicas, fn),
httpGetter: make(map[string]*httpGetter),
registry: servicediscover.New(timeout),
registryAddr: addr,
}
}
func (p *HttpClient) Run(discoveryCycle time.Duration) {
go func() {
if err := http.ListenAndServe(p.registryAddr, p.registry); err != nil {
log.Fatalln(err)
}
}()
go func() {
c := servicediscover.NewGeeRegistryDiscovery(fmt.Sprintf("http://%s", p.registryAddr), discoveryCycle)
ticker := time.NewTicker(discoveryCycle)
c.Refresh()
p.SetPeers(c.Servers()...)
for {
<-ticker.C
c.Refresh()
p.SetPeers(c.Servers()...)
}
}()
}
func (p *HttpClient) SetPeers(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defalutReplicas, nil)
p.peers.Add(peers...)
p.httpGetter = make(map[string]*httpGetter)
for _, peer := range peers {
p.httpGetter[peer] = &httpGetter{baseURL: peer + defalutBasePath, loader: new(signalflight.Group)}
}
}
//实现PeerPicker接口
func (p *HttpClient) PeerPicker(key string) (PeerGetter, bool) {
p.mu.RLock()
defer p.mu.RUnlock()
if peer := p.peers.Get(key); peer != "" {
return p.httpGetter[peer], true
}
return nil, false
}
type httpGetter struct {
baseURL string
loader *signalflight.Group
}
var client *http.Client
func init() {
tr := http.DefaultTransport.(*http.Transport)
tr2 := tr.Clone()
tr2.MaxConnsPerHost = 20
client = &http.Client{
Timeout: time.Second * 2,
Transport: tr,
}
}
//实现PeerGetter接口
func (h *httpGetter) Get(in *xcachepb.Request, out *xcachepb.Response) error {
u := fmt.Sprintf(
"%s%s/%s",
h.baseURL,
url.QueryEscape(in.Group),
url.QueryEscape(in.Key),
)
//保护远程结点,提高性能
v, err := h.loader.Do(in.Group+in.Key, func() (value interface{}, err error) {
res, err := client.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
if err = proto.Unmarshal(bytes, out); err != nil {
return nil, err
}
return out, nil
})
if err != nil {
return err
}
out = v.(*xcachepb.Response)
return nil
}