diff --git a/CHANGELOG.md b/CHANGELOG.md index c854a559..4a02e4b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - cleaned up shutdown logic for outbound sender [#205](https://github.com/xmidt-org/caduceus/pull/205) - added resetting queue depth and current workers gauges to outbound sender [#205](https://github.com/xmidt-org/caduceus/pull/205) - removed queueEmpty variable from outbound sender [#209](https://github.com/xmidt-org/caduceus/pull/209) +- fixed outbound sender's long running dispatcher() goroutine to not exit when a cutoff occurs [#210](https://github.com/xmidt-org/caduceus/pull/210) ## [v0.2.7] - pared down logging, especially debugging logs [#196](https://github.com/xmidt-org/caduceus/pull/196) diff --git a/outboundSender.go b/outboundSender.go index 2062fa3d..fb08a23d 100644 --- a/outboundSender.go +++ b/outboundSender.go @@ -440,6 +440,10 @@ func (obs *CaduceusOutboundSender) isValidTimeWindow(now, dropUntil, deliverUnti return true } +// Empty is called on cutoff or shutdown and swaps out the current queue for +// a fresh one, counting any current messages in the queue as dropped. +// It should never close a queue, as a queue not referenced anywhere will be +// cleaned up by the garbage collector without needing to be closed. func (obs *CaduceusOutboundSender) Empty(droppedCounter metrics.Counter) { droppedMsgs := obs.queue.Load().(chan *wrp.Message) obs.queue.Store(make(chan *wrp.Message, obs.queueSize)) @@ -459,9 +463,31 @@ func (obs *CaduceusOutboundSender) dispatcher() { Loop: for { + // Always pull a new queue in case we have been cutoff or are shutting + // down. msgQueue := obs.queue.Load().(chan *wrp.Message) select { + // The dispatcher cannot get stuck blocking here forever (caused by an + // empty queue that is replaced and then Queue() starts adding to the + // new queue) because: + // - queue is only replaced on cutoff and shutdown + // - on cutoff, the first queue is always full so we will definitely + // get a message, drop it because we're cut off, then get the new + // queue and block until the cut off ends and Queue() starts queueing + // messages again. + // - on graceful shutdown, the queue is closed and then the dispatcher + // will send all messages, then break the loop, gather workers, and + // exit. + // - on non graceful shutdown, the queue is closed and then replaced + // with a new, empty queue that is also closed. + // - If the first queue is empty, we immediately break the loop, + // gather workers, and exit. + // - If the first queue has messages, we drop a message as expired + // pull in the new queue which is empty and closed, break the + // loop, gather workers, and exit. case msg, ok = <-msgQueue: + // This is only true when a queue is empty and closed, which for us + // only happens on Shutdown(). if !ok { break Loop } @@ -469,6 +495,8 @@ Loop: obs.mutex.RLock() urls = obs.urls // Move to the next URL to try 1st the next time. + // This is okay because we run a single dispatcher and it's the + // only one updating this field. obs.urls = obs.urls.Next() deliverUntil := obs.deliverUntil dropUntil := obs.dropUntil @@ -480,7 +508,7 @@ Loop: if now.Before(dropUntil) { obs.droppedCutoffCounter.Add(1.0) - break Loop + continue } if now.After(deliverUntil) { obs.droppedExpiredCounter.Add(1.0) @@ -609,7 +637,9 @@ func (obs *CaduceusOutboundSender) send(urls *ring.Ring, secret, acceptType stri obs.deliveryCounter.With("url", obs.id, "code", code, "event", event).Add(1.0) } -// queueOverflow handles the logic of what to do when a queue overflows +// queueOverflow handles the logic of what to do when a queue overflows: +// cutting off the webhook for a time and sending a cut off notification +// to the failure URL. func (obs *CaduceusOutboundSender) queueOverflow() { obs.mutex.Lock() if time.Now().Before(obs.dropUntil) { @@ -629,6 +659,8 @@ func (obs *CaduceusOutboundSender) queueOverflow() { obs.cutOffCounter.Add(1.0) + // We empty the queue but don't close the channel, because we're not + // shutting down. obs.Empty(obs.droppedCutoffCounter) msg, err := json.Marshal(failureMsg)