Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #797 from Random-Liu/cherrypick-#794
Browse files Browse the repository at this point in the history
Generate fatal error when cri plugin fail to start.
  • Loading branch information
Random-Liu authored May 31, 2018
2 parents 4b0acec + 5b39aca commit 8845f14
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 22 deletions.
20 changes: 12 additions & 8 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
}

// start starts the event monitor which monitors and handles all container events. It returns
// a channel for the caller to wait for the event monitor to stop. start must be called after
// subscribe.
func (em *eventMonitor) start() (<-chan struct{}, error) {
// an error channel for the caller to wait for stop errors from the event monitor.
// start must be called after subscribe.
func (em *eventMonitor) start() <-chan error {
errCh := make(chan error)
if em.ch == nil || em.errCh == nil {
return nil, errors.New("event channel is nil")
panic("event channel is nil")
}
closeCh := make(chan struct{})
backOffCheckCh := em.backOff.start()
go func() {
defer close(errCh)
for {
select {
case e := <-em.ch:
Expand All @@ -144,8 +145,11 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
em.backOff.enBackOff(cID, evt)
}
case err := <-em.errCh:
logrus.WithError(err).Error("Failed to handle event stream")
close(closeCh)
// Close errCh in defer directly if there is no error.
if err != nil {
logrus.WithError(err).Errorf("Failed to handle event stream")
errCh <- err
}
return
case <-backOffCheckCh:
cIDs := em.backOff.getExpiredContainers()
Expand All @@ -162,7 +166,7 @@ func (em *eventMonitor) start() (<-chan struct{}, error) {
}
}
}()
return closeCh, nil
return errCh
}

// stop stops the event monitor. It will close the event channel.
Expand Down
40 changes: 26 additions & 14 deletions pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package server
import (
"fmt"
"io"
"net/http"
"path/filepath"
"time"

Expand Down Expand Up @@ -179,10 +180,7 @@ func (c *criService) Run() error {

// Start event handler.
logrus.Info("Start event monitor")
eventMonitorCloseCh, err := c.eventMonitor.start()
if err != nil {
return errors.Wrap(err, "failed to start event monitor")
}
eventMonitorErrCh := c.eventMonitor.start()

// Start snapshot stats syncer, it doesn't need to be stopped.
logrus.Info("Start snapshots syncer")
Expand All @@ -195,27 +193,32 @@ func (c *criService) Run() error {

// Start streaming server.
logrus.Info("Start streaming server")
streamServerCloseCh := make(chan struct{})
streamServerErrCh := make(chan error)
go func() {
if err := c.streamServer.Start(true); err != nil {
defer close(streamServerErrCh)
if err := c.streamServer.Start(true); err != nil && err != http.ErrServerClosed {
logrus.WithError(err).Error("Failed to start streaming server")
streamServerErrCh <- err
}
close(streamServerCloseCh)
}()

// Set the server as initialized. GRPC services could start serving traffic.
c.initialized.Set()

var eventMonitorErr, streamServerErr error
// Stop the whole CRI service if any of the critical service exits.
select {
case <-eventMonitorCloseCh:
case <-streamServerCloseCh:
case eventMonitorErr = <-eventMonitorErrCh:
case streamServerErr = <-streamServerErrCh:
}
if err := c.Close(); err != nil {
return errors.Wrap(err, "failed to stop cri service")
}

<-eventMonitorCloseCh
// If the error is set above, err from channel must be nil here, because
// the channel is supposed to be closed. Or else, we wait and set it.
if err := <-eventMonitorErrCh; err != nil {
eventMonitorErr = err
}
logrus.Info("Event monitor stopped")
// There is a race condition with http.Server.Serve.
// When `Close` is called at the same time with `Serve`, `Close`
Expand All @@ -227,18 +230,27 @@ func (c *criService) Run() error {
// is fixed.
const streamServerStopTimeout = 2 * time.Second
select {
case <-streamServerCloseCh:
case err := <-streamServerErrCh:
if err != nil {
streamServerErr = err
}
logrus.Info("Stream server stopped")
case <-time.After(streamServerStopTimeout):
logrus.Errorf("Stream server is not stopped in %q", streamServerStopTimeout)
}
if eventMonitorErr != nil {
return errors.Wrap(eventMonitorErr, "event monitor error")
}
if streamServerErr != nil {
return errors.Wrap(streamServerErr, "stream server error")
}
return nil
}

// Stop stops the CRI service.
// Close stops the CRI service.
// TODO(random-liu): Make close synchronous.
func (c *criService) Close() error {
logrus.Info("Stop CRI service")
// TODO(random-liu): Make event monitor stop synchronous.
c.eventMonitor.stop()
if err := c.streamServer.Stop(); err != nil {
return errors.Wrap(err, "failed to stop stream server")
Expand Down

0 comments on commit 8845f14

Please sign in to comment.