diff --git a/pkg/server/events.go b/pkg/server/events.go index 6837acf0a..9bf11a791 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) { } // start starts the event monitor which monitors and handles all container events. It returns -// a channel for the caller to wait for the event monitor to stop. start must be called after -// subscribe. -func (em *eventMonitor) start() (<-chan struct{}, error) { +// an error channel for the caller to wait for stop errors from the event monitor. +// start must be called after subscribe. +func (em *eventMonitor) start() <-chan error { + errCh := make(chan error) if em.ch == nil || em.errCh == nil { - return nil, errors.New("event channel is nil") + panic("event channel is nil") } - closeCh := make(chan struct{}) backOffCheckCh := em.backOff.start() go func() { + defer close(errCh) for { select { case e := <-em.ch: @@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { em.backOff.enBackOff(cID, evt) } case err := <-em.errCh: - logrus.WithError(err).Error("Failed to handle event stream") - close(closeCh) + // Close errCh in defer directly if there is no error. + if err != nil { + logrus.WithError(err).Errorf("Failed to handle event stream") + errCh <- err + } return case <-backOffCheckCh: cIDs := em.backOff.getExpiredContainers() @@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) { } } }() - return closeCh, nil + return errCh } // stop stops the event monitor. It will close the event channel. diff --git a/pkg/server/service.go b/pkg/server/service.go index 7fdeb61de..86ec5bb73 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -19,6 +19,7 @@ package server import ( "fmt" "io" + "net/http" "path/filepath" "time" @@ -179,10 +180,7 @@ func (c *criService) Run() error { // Start event handler. logrus.Info("Start event monitor") - eventMonitorCloseCh, err := c.eventMonitor.start() - if err != nil { - return errors.Wrap(err, "failed to start event monitor") - } + eventMonitorErrCh := c.eventMonitor.start() // Start snapshot stats syncer, it doesn't need to be stopped. logrus.Info("Start snapshots syncer") @@ -195,27 +193,32 @@ func (c *criService) Run() error { // Start streaming server. logrus.Info("Start streaming server") - streamServerCloseCh := make(chan struct{}) + streamServerErrCh := make(chan error) go func() { - if err := c.streamServer.Start(true); err != nil { + defer close(streamServerErrCh) + if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed { logrus.WithError(err).Error("Failed to start streaming server") + streamServerErrCh <- err } - close(streamServerCloseCh) }() // Set the server as initialized. GRPC services could start serving traffic. c.initialized.Set() + var eventMonitorErr, streamServerErr error // Stop the whole CRI service if any of the critical service exits. select { - case <-eventMonitorCloseCh: - case <-streamServerCloseCh: + case eventMonitorErr = <-eventMonitorErrCh: + case streamServerErr = <-streamServerErrCh: } if err := c.Close(); err != nil { return errors.Wrap(err, "failed to stop cri service") } - - <-eventMonitorCloseCh + // If the error is set above, err from channel must be nil here, because + // the channel is supposed to be closed. Or else, we wait and set it. + if err := <-eventMonitorErrCh; err != nil { + eventMonitorErr = err + } logrus.Info("Event monitor stopped") // There is a race condition with http.Server.Serve. // When `Close` is called at the same time with `Serve`, `Close` @@ -227,18 +230,27 @@ func (c *criService) Run() error { // is fixed. const streamServerStopTimeout = 2 * time.Second select { - case <-streamServerCloseCh: + case err := <-streamServerErrCh: + if err != nil { + streamServerErr = err + } logrus.Info("Stream server stopped") case <-time.After(streamServerStopTimeout): logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout) } + if eventMonitorErr != nil { + return errors.Wrap(eventMonitorErr, "event monitor error") + } + if streamServerErr != nil { + return errors.Wrap(streamServerErr, "stream server error") + } return nil } -// Stop stops the CRI service. +// Close stops the CRI service. +// TODO(random-liu): Make close synchronous. func (c *criService) Close() error { logrus.Info("Stop CRI service") - // TODO(random-liu): Make event monitor stop synchronous. c.eventMonitor.stop() if err := c.streamServer.Stop(); err != nil { return errors.Wrap(err, "failed to stop stream server")