Skip to content

Commit

Permalink
MEDIUM: watcher: fix race condition & plumbing stop for test
Browse files Browse the repository at this point in the history
  • Loading branch information
gitforbit authored and amelhusic committed May 15, 2020
1 parent f8963ae commit 1093717
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
31 changes: 25 additions & 6 deletions consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ type Watcher struct {
certCAPool *x509.CertPool
leaf *certLeaf

update chan struct{}
log Logger
update chan struct{}
shutdownCh chan struct{}
log Logger
}

// New builds a new watcher
Expand All @@ -70,10 +71,11 @@ func New(service string, consul *api.Client, log Logger) *Watcher {
service: service,
consul: consul,

C: make(chan Config),
upstreams: make(map[string]*upstream),
update: make(chan struct{}, 1),
log: log,
C: make(chan Config),
upstreams: make(map[string]*upstream),
update: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
log: log,
}
}

Expand Down Expand Up @@ -182,9 +184,11 @@ func (w *Watcher) startUpstream(up api.Upstream) {
go func() {
index := uint64(0)
for {
w.lock.Lock()
if u.done {
return
}
w.lock.Unlock()
nodes, meta, err := w.consul.Health().Connect(up.DestinationName, "", true, &api.QueryOptions{
Datacenter: up.Datacenter,
WaitTime: 10 * time.Minute,
Expand Down Expand Up @@ -224,6 +228,7 @@ func (w *Watcher) watchLeaf() {
var lastIndex uint64
first := true
for {
w.notifyShutdownCh()
cert, meta, err := w.consul.Agent().ConnectCALeaf(w.serviceName, &api.QueryOptions{
WaitTime: 10 * time.Minute,
WaitIndex: lastIndex,
Expand Down Expand Up @@ -264,6 +269,7 @@ func (w *Watcher) watchService(service string, handler func(first bool, srv *api
hash := ""
first := true
for {
w.notifyShutdownCh()
srv, meta, err := w.consul.Agent().Service(service, &api.QueryOptions{
WaitHash: hash,
WaitTime: 10 * time.Minute,
Expand Down Expand Up @@ -294,6 +300,7 @@ func (w *Watcher) watchCA() {
first := true
var lastIndex uint64
for {
w.notifyShutdownCh()
caList, meta, err := w.consul.Agent().ConnectCARoots(&api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: 10 * time.Minute,
Expand Down Expand Up @@ -416,3 +423,15 @@ func (w *Watcher) notifyChanged() {
default:
}
}

func (w *Watcher) Stop() {
close(w.shutdownCh)
}

func (w *Watcher) notifyShutdownCh() {
select {
case <-w.shutdownCh:
return
default:
}
}
1 change: 1 addition & 0 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func startConnectService(t *testing.T, sd *lib.Shutdown, client *api.Client, reg
errs <- err
}
}()
watcher.Stop()

sourceHap := haproxy.New(client, watcher.C, haproxy.Options{
EnableIntentions: true,
Expand Down

0 comments on commit 1093717

Please sign in to comment.