generated from dogmatiq/template-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
171 lines (142 loc) · 4.58 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package discoverkit
import (
"sync"
"github.com/dogmatiq/configkit"
"github.com/dogmatiq/interopspec/discoverspec"
)
// DefaultGRPCPort is the default TCP port used by servers that implement the
// APIs in interopspec, in particular the DiscoverAPI.
//
// It is an untyped string constant so that it can be concatenated directly
// into a "host:port" style address.
const DefaultGRPCPort = "50555"
// Server is an implementation of discoverspec.DiscoverAPIServer.
//
// It tracks the set of applications that are currently available and reports
// changes to that set to clients via WatchApplications(). The set is modified
// by calling Available() and Unavailable().
//
// NOTE(review): the methods visible in this file all tolerate nil "available"
// and "changed" fields, so a zero-value Server appears to be usable as-is —
// confirm before relying on it.
type Server struct {
// m guards the available and changed fields. It is held while either field
// is read or replaced, but it is NOT held while the map that available
// refers to is being iterated by watchers.
m sync.Mutex
// available is the set of applications that are currently available indexed
// by their identity key.
//
// The map value itself is treated as though it is immutable. When a change
// to the available applications is made the map is cloned and the changes
// applied to the clone. Finally, the available field is updated to refer
// to the clone.
//
// This allows many goroutines to read from any given "version" of the map
// without holding any locks.
available map[string]*discoverspec.Identity
// changed is a "broadcast" channel that is closed to signal that the set of
// available applications has been replaced with a new "version".
//
// It is created lazily by snapshot() and reset to nil by update() after it
// has been closed, so a nil value means that no watcher is currently
// waiting on the present "version".
changed chan struct{}
}
// Compile-time assertion that *Server implements the DiscoverAPIServer
// interface.
var _ discoverspec.DiscoverAPIServer = (*Server)(nil)
// Available marks the given application as available.
//
// It is a no-op if an application with the same identity key is already
// marked as available. Otherwise, any in-progress WatchApplications() calls
// are signalled that the set of available applications has changed.
//
// It panics if id.Validate() returns an error.
func (s *Server) Available(id configkit.Identity) {
s.update(id, true)
}
// Unavailable marks the given application as unavailable.
//
// It is a no-op if no application with the same identity key is currently
// marked as available. Otherwise, any in-progress WatchApplications() calls
// are signalled that the set of available applications has changed.
//
// It panics if id.Validate() returns an error.
func (s *Server) Unavailable(id configkit.Identity) {
s.update(id, false)
}
// update sets the availability of the application identified by id.
//
// It panics if id is invalid. If the application's availability already
// matches the requested state, nothing is changed and no watchers are woken.
//
// Otherwise a fresh copy of the availability map is built with the change
// applied, s.available is pointed at the copy, and the current "changed"
// channel (if any) is closed to wake the watchers.
func (s *Server) update(id configkit.Identity, available bool) {
	err := id.Validate()
	if err != nil {
		panic(err)
	}

	s.m.Lock()
	defer s.m.Unlock()

	_, present := s.available[id.Key]
	if present == available {
		// Already in the requested state, there is nothing to do.
		return
	}

	// Never mutate the map that s.available currently refers to; watchers
	// may be iterating over it without holding the mutex. Work on a copy
	// instead.
	current := s.available
	replacement := make(map[string]*discoverspec.Identity, len(current))

	for key, app := range current {
		replacement[key] = app
	}

	// Apply the single change to the copy.
	if available {
		replacement[id.Key] = &discoverspec.Identity{
			Name: id.Name,
			Key:  id.Key,
		}
	} else {
		delete(replacement, id.Key)
	}

	// Publish the new "version" of the map.
	s.available = replacement

	// Wake any watchers. The field is cleared so that the next call to
	// snapshot() allocates a new channel for the new "version".
	if ch := s.changed; ch != nil {
		s.changed = nil
		close(ch)
	}
}
// snapshot returns the map of applications that are available right now,
// along with a channel that is closed once that map has been superseded by a
// newer "version".
//
// The returned map must be treated as read-only by the caller.
func (s *Server) snapshot() (map[string]*discoverspec.Identity, <-chan struct{}) {
	s.m.Lock()
	defer s.m.Unlock()

	// The broadcast channel is created on demand; update() resets it to nil
	// each time it closes the previous one.
	ch := s.changed
	if ch == nil {
		ch = make(chan struct{})
		s.changed = ch
	}

	return s.available, ch
}
// WatchApplications streams changes to the availability of Dogma applications
// to the client.
//
// On the first pass every currently available application is reported as
// available. After that, each time the set of available applications changes
// the additions and removals relative to the last reported set are sent.
//
// It blocks until the stream's context is done, in which case the context's
// error is returned, or until sending a response fails, in which case the
// send error is returned.
func (s *Server) WatchApplications(
	_ *discoverspec.WatchApplicationsRequest,
	stream discoverspec.DiscoverAPI_WatchApplicationsServer,
) error {
	// reported is the "version" of the availability map that the client has
	// already been told about. It starts out nil, so the first pass reports
	// everything.
	//
	// Diffing against a remembered map means s.Available() and
	// s.Unavailable() never have to wait for individual watchers to catch
	// up.
	var reported map[string]*discoverspec.Identity

	for {
		latest, changed := s.snapshot()

		// First announce the applications that appear in latest but not in
		// reported, then the ones that were in reported but have since
		// disappeared.
		err := s.diff(stream, true, latest, reported)
		if err == nil {
			err = s.diff(stream, false, reported, latest)
		}
		if err != nil {
			return err
		}

		ctx := stream.Context()

		select {
		case <-changed:
			// A newer "version" exists; what we just sent becomes the
			// baseline for the next diff.
			reported = latest

		case <-ctx.Done():
			// The client has disconnected, or the server has been stopped.
			return ctx.Err()
		}
	}
}
// diff writes one WatchApplicationsResponse to stream for every application
// whose key is in lhs but missing from rhs. Each response carries the given
// available flag.
//
// Either map may be nil. Responses are sent in map iteration order, which is
// unspecified. The first error returned by stream.Send() aborts the
// operation and is returned as-is.
func (s *Server) diff(
	stream discoverspec.DiscoverAPI_WatchApplicationsServer,
	available bool,
	lhs, rhs map[string]*discoverspec.Identity,
) error {
	for key, app := range lhs {
		if _, shared := rhs[key]; !shared {
			err := stream.Send(
				&discoverspec.WatchApplicationsResponse{
					Identity:  app,
					Available: available,
				},
			)
			if err != nil {
				return err
			}
		}
	}

	return nil
}