-
Notifications
You must be signed in to change notification settings - Fork 0
/
delay.go
149 lines (133 loc) · 2.9 KB
/
delay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package gdelay
import (
"log"
"reflect"
"sync"
"time"
)
// MaxConcurrentG is the upper bound NewDelay applies to its concurrentG
// argument, which sizes the buffered channel stored in Delay.concurrentChan.
const MaxConcurrentG = 3
// DelayParam describes one scheduled call: either a plain function (Fun with
// FuncParam) or a method looked up by name on Obj (MethodName with
// MethodParam). process uses the method form when Obj is non-nil and
// MethodName is non-empty, and the function form otherwise.
type DelayParam struct {
// Duration is the key under which DelayAdd files the entry. process compares
// that key against time.Now().Unix(), so it is treated as a Unix timestamp in
// seconds. NOTE(review): the name suggests a relative delay — confirm what
// callers actually pass.
Duration int64
// Fun is the function value invoked through reflection with FuncParam.
Fun any
// FuncParam holds the arguments passed to Fun.
FuncParam []reflect.Value
// MethodName is the name of the method looked up on Obj.
MethodName string
// Obj is the value on which MethodName is looked up and called.
Obj any
// MethodParam holds the arguments passed to the method.
MethodParam []reflect.Value
// methodCallback is not referenced anywhere in the visible code.
methodCallback func(any)
}
// Delay holds pending DelayParam entries and runs them from the one-second
// ticker loop launched by Start.
type Delay struct {
// delayEntities groups pending entries by their DelayParam.Duration key.
delayEntities map[int64][]*DelayParam
// tk is the ticker driving the loop; clearTicker stops it and resets it to nil.
tk *time.Ticker
// mu is held by process and DelayAdd while they touch delayEntities.
mu sync.Mutex
// stopChan makes the loop started by Start return when a value is received.
stopChan chan struct{}
// concurrentChan is a buffered channel used as a counting semaphore: process
// sends on it before launching each call, and the call's goroutine receives
// from it when it finishes.
concurrentChan chan int
}
// NewDelay returns a Delay that runs at most concurrentG scheduled calls at
// the same time. concurrentG is clamped to the range [1, MaxConcurrentG].
//
// The returned Delay is idle until Start is called.
func NewDelay(concurrentG int) *Delay {
	// A negative size makes make panic, and a zero-capacity channel would block
	// process forever on its first send because the matching receive only
	// happens inside the goroutine launched after that send.
	if concurrentG < 1 {
		concurrentG = 1
	}
	if concurrentG > MaxConcurrentG {
		concurrentG = MaxConcurrentG
	}
	return &Delay{
		concurrentChan: make(chan int, concurrentG),
		stopChan:       make(chan struct{}),
	}
}
// Start stops any ticker left over from a previous run, installs a new
// one-second ticker and launches the goroutine that calls process on every
// tick. The goroutine returns when a value is received on stopChan.
//
// The ticker is created before Start returns, so a clearTicker issued right
// after Start always sees it. The loop reads ticks from its own reference to
// the ticker rather than from d.tk, because clearTicker resets d.tk to nil and
// re-reading d.tk.C on the next iteration would dereference a nil pointer.
//
// Start does not check whether a loop is already running; calling it twice
// without stopping the first loop leaves two goroutines receiving on stopChan.
func (d *Delay) Start() {
	d.clearTicker()
	tk := time.NewTicker(time.Second)
	d.tk = tk

	go func() {
		// Harmless if clearTicker already stopped it.
		defer tk.Stop()
		for {
			select {
			case <-d.stopChan:
				log.Println("ticker stop")
				return
			case <-tk.C:
				log.Printf("ticker")
				d.process()
			}
		}
	}()
}
// Stop signals the ticker loop to exit, stops the ticker and drops every
// pending entry.
//
// The mutex is held while delayEntities is reset: DelayAdd checks the map for
// nil and then writes to it under the same mutex, so an unsynchronized reset
// here could make that write hit a nil map.
//
// The stop signal is a non-blocking send, so Stop never hangs when no loop is
// running. A loop that is inside process at that moment misses the signal; it
// is still silenced because its ticker is stopped.
func (d *Delay) Stop() {
	d.mu.Lock()
	defer d.mu.Unlock()

	select {
	case d.stopChan <- struct{}{}:
	default:
	}
	d.clearTicker()
	d.delayEntities = nil
}
// process is called once per tick. It removes from delayEntities every bucket
// whose key is less than or equal to the current Unix time and runs each entry
// of those buckets in its own goroutine, with at most cap(d.concurrentChan)
// entries in flight at once.
//
// The mutex is held only while the due buckets are detached. Waiting for a
// free concurrency slot happens outside the lock, so DelayAdd is not blocked
// during dispatch and a running entry may take the lock itself without
// deadlocking against process.
func (d *Delay) process() {
	now := time.Now().Unix()

	d.mu.Lock()
	var due []*DelayParam
	for callTime, delayList := range d.delayEntities {
		if now >= callTime {
			due = append(due, delayList...)
			delete(d.delayEntities, callTime)
		}
	}
	d.mu.Unlock()

	for _, task := range due {
		// Acquire a slot; runTask releases it when the call finishes.
		d.concurrentChan <- 1
		go d.runTask(task)
	}
}

// runTask invokes one entry through reflection and then releases its
// concurrency slot. A panic raised by the call is logged and contained: the
// ticker loop is independent of the worker goroutine, so it is left running.
func (d *Delay) runTask(task *DelayParam) {
	defer func() {
		if err := recover(); err != nil {
			log.Printf("%+v", err)
		}
		<-d.concurrentChan
	}()

	if task.Obj != nil && len(task.MethodName) > 0 {
		m := reflect.ValueOf(task.Obj).MethodByName(task.MethodName)
		if !m.IsValid() {
			log.Printf("method %s invalid", task.MethodName)
			return
		}
		m.Call(task.MethodParam)
		return
	}

	f := reflect.ValueOf(task.Fun)
	// Reject nil, non-function values and nil function values up front instead
	// of relying on the panic path.
	if !f.IsValid() || f.Kind() != reflect.Func || f.IsNil() {
		log.Printf("func %+v invalid", task.Fun)
		return
	}
	f.Call(task.FuncParam)
}
// DelayAdd files param under its Duration key, creating the map on first use,
// and logs the resulting set of pending entries. Entries sharing a key are
// kept in insertion order.
func (d *Delay) DelayAdd(param *DelayParam) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.delayEntities == nil {
		d.delayEntities = make(map[int64][]*DelayParam)
	}

	// A missing key yields a nil slice, which append grows into a one-element
	// slice, so first and later insertions share one code path.
	key := param.Duration
	d.delayEntities[key] = append(d.delayEntities[key], param)

	log.Printf("delay entity : %+v", d.delayEntities)
}
// AddFunc schedules a plain function call. It wraps duration, fun and
// funcParam in a DelayParam and hands it to DelayAdd.
func (d *Delay) AddFunc(duration int64, fun any, funcParam []reflect.Value) {
	entry := new(DelayParam)
	entry.Duration = duration
	entry.Fun = fun
	entry.FuncParam = funcParam
	d.DelayAdd(entry)
}
// AddMethod schedules a call to the method named methodName on obj. It wraps
// the arguments in a DelayParam and hands it to DelayAdd.
func (d *Delay) AddMethod(duration int64, obj any, methodName string, methodParam []reflect.Value) {
	entry := new(DelayParam)
	entry.Duration = duration
	entry.Obj = obj
	entry.MethodName = methodName
	entry.MethodParam = methodParam
	d.DelayAdd(entry)
}
// clearTicker stops the current ticker, if there is one, and resets d.tk to
// nil. It does nothing when no ticker is installed.
func (d *Delay) clearTicker() {
	if d.tk == nil {
		return
	}
	d.tk.Stop()
	d.tk = nil
}