-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
127 lines (114 loc) · 2.45 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2018 Granitic. All rights reserved.
// Use of this source code is governed by an Apache 2.0 license that can be found in the LICENSE file at the root of this project.
package hprose_go_nats
import (
"github.com/hprose/hprose-golang/rpc"
"github.com/nats-io/go-nats"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)
// NatsServer is an hprose RPC service that receives requests through a NATS
// queue subscription and publishes each result to the request's reply subject.
type NatsServer struct {
	rpc.BaseService
	// conn is the NATS connection; init creates it and Close clears it.
	conn *nats.Conn
	// queue is the channel the queue subscription delivers messages into.
	queue chan *nats.Msg
	// signal carries the OS (or Stop/Restart) signal that ends Start.
	signal chan os.Signal
	// uri is opt.uri joined with commas, as passed to nats.Connect.
	uri string
	// opt holds the options supplied to NewServer.
	opt *NatsOption
	// contextPool recycles *rpc.BaseServiceContext values between requests.
	contextPool sync.Pool
	// workerPool runs handle for every received message.
	workerPool *rpc.WorkerPool
}
// NewServer builds a NatsServer configured by opt and returns it as an
// rpc.Server. The server URI is the comma-joined list in opt.uri. Nothing is
// connected here; the NATS connection is only opened by init. opt must not be
// nil.
func NewServer(opt *NatsOption) rpc.Server {
	ns := new(NatsServer)
	ns.opt = opt
	ns.uri = strings.Join(opt.uri, ",")
	// Hand out fresh service contexts whenever the pool is empty.
	ns.contextPool.New = func() interface{} {
		return &rpc.BaseServiceContext{}
	}
	ns.InitBaseService()
	return ns
}
// worker drains the subscription channel and hands every non-nil message to
// the worker pool. It returns once the channel has been closed.
//
// The channel and the pool are read from ns exactly once. Close resets both
// fields to nil, so re-reading them on every iteration could leave this
// goroutine blocked forever on a nil channel or calling Go on a nil pool.
func (ns *NatsServer) worker() {
	queue, pool := ns.queue, ns.workerPool
	for msg := range queue {
		if msg == nil {
			continue
		}
		msg := msg // per-iteration copy for the closure (required before Go 1.22)
		pool.Go(func() {
			ns.handle(msg)
		})
	}
}
// init lazily creates whatever the server is missing (NATS connection, worker
// pool, message channel, signal channel), subscribes the message channel to
// opt.topic in queue group opt.group, and starts the worker goroutine.
//
// It returns the error from nats.Connect or ChanQueueSubscribe, or nil on
// success. If subscribing fails, everything acquired so far is released via
// Close so a failed start does not leave a connection or worker pool behind.
func (ns *NatsServer) init() (err error) {
	if ns.conn == nil {
		if ns.conn, err = nats.Connect(ns.uri, ns.opt.options...); err != nil {
			return err
		}
	}
	if ns.workerPool == nil {
		ns.workerPool = new(rpc.WorkerPool)
		ns.workerPool.Start()
	}
	if ns.queue == nil {
		ns.queue = make(chan *nats.Msg, ns.opt.queue)
	}
	if _, err = ns.conn.ChanQueueSubscribe(ns.opt.topic, ns.opt.group, ns.queue); err != nil {
		ns.Close()
		return err
	}
	go ns.worker()
	if ns.signal == nil {
		ns.signal = make(chan os.Signal, 1)
		// SIGKILL is deliberately not listed: it cannot be caught, so
		// registering for it has no effect.
		signal.Notify(ns.signal, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
	}
	return nil
}
// handle runs one request through the embedded BaseService and publishes the
// result to the message's reply subject.
//
// The request is always processed. The reply is skipped when the message has
// no reply subject, or when the connection has already been cleared by Close;
// handlers may still be running in the worker pool at that point, and
// dereferencing the nil connection would panic.
func (ns *NatsServer) handle(msg *nats.Msg) {
	ctx := ns.contextPool.Get().(*rpc.BaseServiceContext)
	defer ns.contextPool.Put(ctx)
	ctx.InitServiceContext(ns)
	data := ns.BaseService.Handle(msg.Data, ctx)

	// NOTE(review): this snapshot is not synchronised with Close; it only
	// avoids the nil dereference. Proper locking needs a field on the struct.
	conn := ns.conn
	if conn == nil || msg.Reply == "" {
		return
	}
	// The request has already been served and this type has no error sink, so
	// a failed publish (e.g. connection closed) is intentionally dropped.
	_ = conn.Publish(msg.Reply, data)
}
// URI reports the server address string stored on the server, i.e. the value
// that init passes to nats.Connect.
func (ns *NatsServer) URI() string {
	return ns.uri
}
// Handle does nothing and always returns nil; connecting and subscribing are
// performed by init instead.
// NOTE(review): presumably present only to satisfy rpc.Server — confirm.
func (ns *NatsServer) Handle() error {
	return nil
}
// Close releases everything init acquired and resets the corresponding fields
// to nil, so it is safe to call more than once and init can be run again.
//
// Order matters: the NATS connection is closed before the message channel.
// The subscription handle is not retained, so closing the connection is the
// only way to end delivery; closing the channel while the client can still
// send into it risks a "send on closed channel" panic.
func (ns *NatsServer) Close() {
	if ns.signal != nil {
		signal.Stop(ns.signal)
		ns.signal = nil
	}
	if ns.conn != nil {
		ns.conn.Close()
		ns.conn = nil
	}
	if ns.queue != nil {
		// Closing the channel lets the worker goroutine drain and exit.
		close(ns.queue)
		ns.queue = nil
	}
	if ns.workerPool != nil {
		ns.workerPool.Stop()
		ns.workerPool = nil
	}
}
// Start initialises the server and blocks until a signal arrives on the
// server's signal channel, then closes the server.
//
// SIGHUP means "restart": the server is closed and initialised again, and
// Start keeps blocking. Any other signal ends the loop and Start returns nil.
// An initialisation error is returned immediately.
func (ns *NatsServer) Start() (err error) {
	for {
		if err = ns.init(); err != nil {
			return err
		}
		sig := <-ns.signal
		ns.Close()
		if sig != syscall.SIGHUP {
			return nil
		}
	}
}
// Restart queues SIGHUP on the server's signal channel, waking a blocked
// Start.
//
// It never blocks: it is a no-op when the signal channel does not exist (the
// server is not started, or already closed), and the request is dropped when
// a signal is already pending in the channel's buffer.
func (ns *NatsServer) Restart() {
	ch := ns.signal
	if ch == nil {
		return
	}
	select {
	case ch <- syscall.SIGHUP:
	default:
	}
}
// Stop queues SIGQUIT on the server's signal channel, waking a blocked Start.
//
// It never blocks: it is a no-op when the signal channel does not exist (the
// server is not started, or already closed), and the request is dropped when
// a signal is already pending in the channel's buffer.
func (ns *NatsServer) Stop() {
	ch := ns.signal
	if ch == nil {
		return
	}
	select {
	case ch <- syscall.SIGQUIT:
	default:
	}
}