-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathds.go
143 lines (124 loc) · 2.74 KB
/
ds.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package kail
import (
"context"
logutil "github.com/boz/go-logutil"
"github.com/boz/kcache/types/daemonset"
"github.com/boz/kcache/types/deployment"
"github.com/boz/kcache/types/ingress"
"github.com/boz/kcache/types/job"
"github.com/boz/kcache/types/node"
"github.com/boz/kcache/types/pod"
"github.com/boz/kcache/types/replicaset"
"github.com/boz/kcache/types/replicationcontroller"
"github.com/boz/kcache/types/service"
"github.com/boz/kcache/types/statefulset"
)
type DS interface {
Pods() pod.Controller
Ready() <-chan struct{}
Done() <-chan struct{}
Close()
}
type datastore struct {
podBase pod.Controller
servicesBase service.Controller
nodesBase node.Controller
rcsBase replicationcontroller.Controller
rssBase replicaset.Controller
dssBase daemonset.Controller
deploymentsBase deployment.Controller
statefulsetBase statefulset.Controller
jobsBase job.Controller
ingressesBase ingress.Controller
pods pod.Controller
services service.Controller
nodes node.Controller
rcs replicationcontroller.Controller
rss replicaset.Controller
dss daemonset.Controller
deployments deployment.Controller
statefulsets statefulset.Controller
jobs job.Controller
ingresses ingress.Controller
readych chan struct{}
donech chan struct{}
log logutil.Log
}
type cacheController interface {
Close()
Done() <-chan struct{}
Ready() <-chan struct{}
}
func (ds *datastore) Pods() pod.Controller {
return ds.pods
}
func (ds *datastore) Ready() <-chan struct{} {
return ds.readych
}
func (ds *datastore) Done() <-chan struct{} {
return ds.donech
}
func (ds *datastore) Close() {
ds.closeAll()
}
func (ds *datastore) run(ctx context.Context) {
go func() {
select {
case <-ctx.Done():
ds.Close()
case <-ds.Done():
}
}()
go ds.waitReadyAll()
go ds.waitDoneAll()
}
func (ds *datastore) waitReadyAll() {
for _, c := range ds.controllers() {
select {
case <-c.Done():
return
case <-c.Ready():
}
}
close(ds.readych)
}
func (ds *datastore) closeAll() {
for _, c := range ds.controllers() {
c.Close()
}
}
func (ds *datastore) waitDoneAll() {
defer close(ds.donech)
for _, c := range ds.controllers() {
<-c.Done()
}
}
func (ds *datastore) controllers() []cacheController {
potential := []cacheController{
ds.podBase,
ds.servicesBase,
ds.nodesBase,
ds.rcsBase,
ds.rssBase,
ds.dssBase,
ds.deploymentsBase,
ds.statefulsetBase,
ds.ingressesBase,
ds.pods,
ds.services,
ds.nodes,
ds.rcs,
ds.rss,
ds.dss,
ds.deployments,
ds.statefulsets,
ds.ingresses,
}
var existing []cacheController
for _, c := range potential {
if c != nil {
existing = append(existing, c)
}
}
return existing
}