Skip to content

Commit

Permalink
Added logging and fixed logic to stop jobs after a nomad stop and sta…
Browse files Browse the repository at this point in the history
…rt (#20)

* Added logging and fixed logic to stop jobs after a nomad stop and start

* Improved checking on recover task to include context
  • Loading branch information
Esteban Barrios authored Feb 26, 2020
1 parent b4f5d76 commit fdaa389
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 28 deletions.
77 changes: 56 additions & 21 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,17 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("failed to decode task state from handle: %v", err)
}

/*if err := handle.SetDriverState(&taskState); err != nil {
d.logger.Error("failed to recover task, error setting driver state", "error", err)
}*/
if err := handle.SetDriverState(&taskState); err != nil {
d.logger.Trace("failed to recover task, error setting driver state", "error", err)
}

d.logger.Trace("RECOVER TASK", "taskState", taskState)

var driverConfig TaskConfig
d.logger.Trace("TASKCONFIG RECOVER", "TASKCONFIG RECOVER", taskState.TaskConfig)
/*if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
return fmt.Errorf("failed to decode driver config in RESTORETASK: %v", err)
}*/
if err := taskState.TaskConfig.DecodeDriverConfig(&driverConfig); err != nil {
d.logger.Trace("failed to recover driverConfig, error setting driver state", "error", err)
}

se, err := prepareContainer(handle.Config, driverConfig)
if err != nil {
Expand All @@ -289,18 +289,23 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {

alive := se.checkContainerAlive(handle.Config)
if alive == 0 {
if err := se.startContainer(taskState.TaskConfig); err != nil {
se.destroyContainer(handle.Config)
return fmt.Errorf("unable to start container: %v", err)
}
d.tasks.Delete(handle.Config.ID)
return fmt.Errorf("unable to recover a container that is not running")
} else {
se.containerPid = alive
completeName := handle.Config.JobName + handle.Config.Name + "_" + handle.Config.AllocID
Sout, _ := se.Stdout()
Serr, _ := se.Stderr()
Sout, err := se.Stdout()
if err != nil {
d.logger.Error("Error setting stdout with", "err", err)
}
Serr, err := se.Stderr()
if err != nil {
d.logger.Error("Error setting stderr with", "err", err)
}
directory := handle.Config.TaskDir().SharedTaskDir
se.cmd = &exec.Cmd{
Args: []string{"/usr/local/bin/pot", "start", completeName},
Dir: handle.Config.AllocDir,
Dir: directory,
Path: potBIN,
Process: &os.Process{
Pid: alive,
Expand All @@ -317,6 +322,12 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
exitResult: &drivers.ExitResult{
ExitCode: 0,
Err: nil,
OOMKilled: false,
Signal: 0,
},
}

driverState := TaskState{
Expand All @@ -342,6 +353,11 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
d.logger.Trace("RECOVER TASK", "h", h)
if alive == 0 {
go h.run()
} else {
h.procState = drivers.TaskStateRunning
h.exitResult.ExitCode = h.syexec.exitCode
h.exitResult.Signal = 0
h.completedAt = time.Now()
}

go d.recoverWait(handle.Config.ID, se)
Expand All @@ -357,6 +373,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
d.logger.Trace("###########################################################################################################################################")
d.logger.Trace("########################################################STARTTASK##########################################################################")
d.logger.Trace("###########################################################################################################################################")

if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
Expand All @@ -377,6 +394,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}

se.logger = d.logger
//se.logger.Info("Checking container alive from StartTask")
alive := se.checkContainerAlive(cfg)
if alive == 0 {
d.logger.Trace("StartTask", "Container not alive", alive)
Expand Down Expand Up @@ -464,20 +482,34 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}

func (d *Driver) recoverWait(id string, se syexec) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

OuterLoop:
for {
time.Sleep(1 * time.Second)
code := se.checkContainerAlive(se.cfg)
if code == 0 {
break
select {
case <-d.ctx.Done():
break OuterLoop
case <-ticker.C:
//d.logger.Info("Checking containerAlive from Ticker recoverWait")
code := se.checkContainerAlive(se.cfg)
if code == 0 {
d.logger.Error("Container", "RecoverWait Break", se.cfg.JobName)
break OuterLoop
}
}
}

handle, _ := d.tasks.Get(id)
handle.procState = drivers.TaskStateExited
}

func (d *Driver) potWait(taskID string, se syexec) {
handle, _ := d.tasks.Get(taskID)
se.cmd.Wait()
err := se.cmd.Wait()
if err != nil {
d.logger.Error("Error exiting se.cmd.Wait in potWait", "Err", err)
}
handle.procState = drivers.TaskStateExited

}
Expand All @@ -486,6 +518,7 @@ func (d *Driver) potWait(taskID string, se syexec) {
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
d.logger.Error("WaitTask", "handle", "!ok")
return nil, drivers.ErrTaskNotFound
}

Expand All @@ -509,6 +542,7 @@ func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *dr
return
case <-ticker.C:
s := handle.TaskStatus()
//d.logger.Error("handleWait", "handle", s)
if s.State == drivers.TaskStateExited {
ch <- handle.exitResult
}
Expand All @@ -526,23 +560,24 @@ func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) e
var driverConfig TaskConfig

if err := handle.taskConfig.DecodeDriverConfig(&driverConfig); err != nil {
return fmt.Errorf("failed to decode driver config in STOPTASK: %v", err)
//return fmt.Errorf("failed to decode driver config in STOPTASK: %v", err)
d.logger.Error("unable to decode driver in STOPTASK:", err)
}

se := prepareStop(handle.taskConfig, driverConfig)

se.logger = d.logger

if err := se.stopContainer(handle.taskConfig); err != nil {
se.logger.Error("unable to stop container: %v", err)
se.logger.Error("unable to run stopContainer: %v", err)
}

se = prepareDestroy(handle.taskConfig, driverConfig)

se.logger = d.logger

if err := se.destroyContainer(handle.taskConfig); err != nil {
return fmt.Errorf("unable to destroy container: %v", err)
return fmt.Errorf("unable to run destroyContainer: %v", err)
}

return nil
Expand Down
15 changes: 8 additions & 7 deletions driver/pot.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (s *syexec) destroyContainer(commandCfg *drivers.TaskConfig) error {
}

func (s *syexec) createContainer(commandCfg *drivers.TaskConfig) error {
s.logger.Info("launching createContainer command", "log", strings.Join(s.argvCreate, " "))
s.logger.Debug("launching createContainer command", "log", strings.Join(s.argvCreate, " "))

cmd := exec.Command(potBIN, s.argvCreate...)

Expand Down Expand Up @@ -232,7 +232,7 @@ func (s *syexec) createContainer(commandCfg *drivers.TaskConfig) error {
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
s.logger.Error("ExitError ", ws.ExitStatus())
s.logger.Error("ExitError copying files", "exitStatus ", ws.ExitStatus())
return errors.New(string(output))
}
s.logger.Error("Could not get exit code for copy command ", "pot", command)
Expand All @@ -252,7 +252,7 @@ func (s *syexec) createContainer(commandCfg *drivers.TaskConfig) error {
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
s.logger.Error("ExitError ", ws.ExitStatus())
s.logger.Error("ExitError Mounting Files", "exitStatus", ws.ExitStatus())
return errors.New(string(output))
}
s.logger.Error("Could not get exit code for mount command ", "pot", command)
Expand All @@ -271,7 +271,7 @@ func (s *syexec) createContainer(commandCfg *drivers.TaskConfig) error {
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
s.logger.Error("ExitError ", ws.ExitStatus())
s.logger.Error("ExitError Mounting r-only files", "exitStatus", ws.ExitStatus())
return errors.New(string(output))
}
s.logger.Error("Could not get exit code for mounting read only container ", "pot", command)
Expand Down Expand Up @@ -369,13 +369,14 @@ func (s *syexec) checkContainerAlive(commandCfg *drivers.TaskConfig) int {
s.logger.Trace("Allocation name beeing check for liveness", "alive", potName)

psCommand := "/bin/sh /usr/local/bin/pot start " + potName
pidCommand := "pgrep -f '" + psCommand + "'"
pidCommand := "/bin/pgrep -f '" + psCommand + "'"
s.logger.Trace("Command to execute", "alive", pidCommand)
output, err := exec.Command("sh", "-c", pidCommand).Output()
s.logger.Trace("Got output", "output:", string(output), "err: ", err)
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
s.logger.Error("ExitError ", ws.ExitStatus())
s.logger.Error("ExitError checkContainerAlive", "exitStatus", ws.ExitStatus())
return 0
}
s.logger.Error("Could not get exit code for ps command ", "pot", err)
Expand Down Expand Up @@ -405,7 +406,7 @@ func (s *syexec) checkContainerExists(commandCfg *drivers.TaskConfig) int {
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
ws := exitError.Sys().(syscall.WaitStatus)
s.logger.Error("ExitError ", ws.ExitStatus())
s.logger.Error("ExitError CheckContainerExists", "exitError", ws.ExitStatus())
return 0
}
s.logger.Error("Could not get exit code for ps command ", "pot", err)
Expand Down

0 comments on commit fdaa389

Please sign in to comment.