-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathlifecycle.go
144 lines (119 loc) · 2.88 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package lifecycle
import (
"context"
"errors"
)
// ErrRunning is the error reported by Error while shutdown has not yet been
// initiated.
var ErrRunning = errors.New("lifecycle: still running")
// Lifecycle coordinates a shutdown in three steps: a shutdown is requested
// (ShutdownRequest), the owner declares that it has begun (ShutdownInitiated),
// and the owner declares that it has finished (ShutdownCompleted).
type Lifecycle interface {
	LifecycleReader

	// ShutdownRequest returns a channel that is available for reading when
	// a shutdown has been requested. The value received is the error that
	// accompanied the request.
	ShutdownRequest() <-chan error

	// ShutdownInitiated declares that shutdown has begun and records the
	// given error as its reason. Will panic if called twice.
	ShutdownInitiated(error)

	// ShutdownCompleted declares that shutdown has completed. Will panic if
	// called twice.
	ShutdownCompleted()

	// WatchContext observes the given context and requests a shutdown, with
	// the context's error, if the context is done before the lifecycle starts
	// shutting down. It blocks until one of those two things has happened.
	WatchContext(context.Context)

	// WatchChannel requests a shutdown, with a nil error, when the given
	// channel is ready for reading. It blocks until the request has been
	// delivered or the lifecycle has started shutting down.
	WatchChannel(<-chan struct{})

	// Shutdown requests a shutdown by sending a value to the channel
	// returned by ShutdownRequest and blocks until ShutdownCompleted
	// is called.
	Shutdown(error)

	// ShutdownAsync requests a shutdown but does not wait for it to complete.
	// It still blocks until the request has been received from the
	// ShutdownRequest channel, unless shutdown has already been initiated or
	// completed.
	ShutdownAsync(error)
}
// LifecycleReader exposes read-only access to lifecycle state.
type LifecycleReader interface {
	// ShuttingDown returns a channel that is available for reading
	// after ShutdownInitiated has been called.
	ShuttingDown() <-chan struct{}

	// Done returns a channel that is available for reading
	// after ShutdownCompleted has been called.
	Done() <-chan struct{}

	// Error returns the error given to ShutdownInitiated (which may be nil),
	// or ErrRunning if shutdown has not been initiated yet.
	Error() error
}
// lifecycle is the channel-based Lifecycle implementation returned by New.
type lifecycle struct {
	// stopch carries shutdown requests and their reason; its receive side is
	// handed out by ShutdownRequest.
	stopch chan error

	// stoppingch is closed by ShutdownInitiated.
	stoppingch chan struct{}

	// stoppedch is closed by ShutdownCompleted.
	stoppedch chan struct{}

	// reason is the error given to ShutdownInitiated. Error reads it only
	// after observing that stoppingch is closed.
	reason error
}
// New returns a Lifecycle in the running state: no shutdown has been
// requested, initiated or completed. All three internal channels are
// unbuffered.
func New() Lifecycle {
	l := new(lifecycle)
	l.stopch = make(chan error)
	l.stoppingch = make(chan struct{})
	l.stoppedch = make(chan struct{})
	return l
}
// ShutdownRequest returns the receive-only side of the channel on which
// shutdown requests, together with their reason, are delivered.
func (l *lifecycle) ShutdownRequest() <-chan error {
	var requests <-chan error = l.stopch
	return requests
}
// ShutdownInitiated records err as the shutdown reason and closes the channel
// returned by ShuttingDown. A second call panics because that channel is
// already closed.
func (l *lifecycle) ShutdownInitiated(err error) {
	// Store the reason before closing: Error reads reason only after it has
	// observed the close, so this order is what makes that read see err.
	l.reason = err
	close(l.stoppingch)
}
// ShuttingDown returns a receive-only channel that becomes readable once
// ShutdownInitiated has closed it.
func (l *lifecycle) ShuttingDown() <-chan struct{} {
	var stopping <-chan struct{} = l.stoppingch
	return stopping
}
// ShutdownCompleted closes the channel returned by Done, releasing every
// caller blocked in Shutdown. A second call panics because that channel is
// already closed.
func (l *lifecycle) ShutdownCompleted() {
	close(l.stoppedch)
}
// Done returns a receive-only channel that becomes readable once
// ShutdownCompleted has closed it.
func (l *lifecycle) Done() <-chan struct{} {
	var stopped <-chan struct{} = l.stoppedch
	return stopped
}
// Error reports the shutdown reason. Until shutdown has been initiated it
// returns ErrRunning; afterwards it returns the error that was given to
// ShutdownInitiated, which may be nil.
func (l *lifecycle) Error() error {
	// Non-blocking probe: stoppingch is readable only once it has been closed.
	select {
	case <-l.stoppingch:
		// Shutdown has been initiated; report the recorded reason below.
	default:
		return ErrRunning
	}
	return l.reason
}
// Shutdown requests a shutdown with reason err and then blocks until the
// shutdown has completed. If shutdown has already been initiated the request
// is skipped and only the wait remains; if it has already completed the call
// returns immediately.
func (l *lifecycle) Shutdown(err error) {
	// Deliver the request to whoever is reading ShutdownRequest, unless the
	// shutdown is already under way or finished.
	select {
	case l.stopch <- err:
	case <-l.stoppingch:
	case <-l.stoppedch:
		// Already complete: the receive below returns at once on the
		// closed channel.
	}
	<-l.stoppedch
}
// ShutdownAsync requests a shutdown with reason err without waiting for the
// shutdown to complete. The call still blocks until one of the following
// holds: the request has been received from the ShutdownRequest channel,
// shutdown has been initiated, or shutdown has completed.
func (l *lifecycle) ShutdownAsync(err error) {
	select {
	case l.stopch <- err:
		// Request handed over to the reader of ShutdownRequest.
	case <-l.stoppingch:
		// Shutdown already under way; nothing to request.
	case <-l.stoppedch:
		// Shutdown already finished; nothing to request.
	}
}
// WatchContext blocks until either shutdown has been initiated or ctx is done
// and a shutdown request carrying ctx.Err() has been delivered on the
// ShutdownRequest channel.
func (l *lifecycle) WatchContext(ctx context.Context) {
	// Phase 1: wait for the context to finish, giving up as soon as shutdown
	// is initiated by other means.
	select {
	case <-l.stoppingch:
		return
	case <-ctx.Done():
	}

	// Phase 2: the context is done; offer its error as the shutdown request
	// until it is accepted or shutdown is initiated anyway.
	select {
	case <-l.stoppingch:
	case l.stopch <- ctx.Err():
	}
}
// WatchChannel blocks until either shutdown has been initiated or donech has
// become readable and a shutdown request with a nil error has been delivered
// on the ShutdownRequest channel.
func (l *lifecycle) WatchChannel(donech <-chan struct{}) {
	// Phase 1: wait for the watched channel, giving up as soon as shutdown is
	// initiated by other means.
	select {
	case <-l.stoppingch:
		return
	case <-donech:
	}

	// Phase 2: the watched channel fired; offer a nil-reason shutdown request
	// until it is accepted or shutdown is initiated anyway.
	select {
	case <-l.stoppingch:
	case l.stopch <- nil:
	}
}