Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(k8s): Make RunContainer detect/report more error conditions like image pull errors #331

Draft
wants to merge 32 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2929949
feat(k8s): add eventInformer to podTracker
cognifloyd Apr 28, 2022
be144f4
feat(k8s): ignore event deletion
cognifloyd Apr 28, 2022
7602600
feat(k8s): begin handling event stream
cognifloyd Apr 28, 2022
457a664
refactor: create eventInformer from eventInformerFactory
cognifloyd Apr 29, 2022
7d674e8
refactor: rename selector=>labelSelector
cognifloyd Apr 29, 2022
71c9b34
enhance: register eventInformerFactory on podTracker
cognifloyd Apr 29, 2022
c3e4b50
enhance: add podTracker.inspectContainerEvent
cognifloyd Apr 29, 2022
3e89df6
enhance: add signal for running container
cognifloyd Apr 29, 2022
8ba7ad5
enhance: only mark containers as running/terminated if it is the corr…
cognifloyd Apr 29, 2022
038bf97
enhance(k8s): exit WaitContainer if build is canceled
cognifloyd Apr 29, 2022
b5ab8eb
enhance(k8s): add containerTracker.Events() function
cognifloyd Apr 29, 2022
57011c1
tests: fix inspectContainerStatuses test
cognifloyd Apr 29, 2022
639b219
bugfix(k8s): Make sure image pull errors are detected
cognifloyd Apr 29, 2022
092840b
refactor(k8s): use channels to signal imagePull success/errors
cognifloyd Apr 29, 2022
e3d6d93
fix: comment typos
cognifloyd Apr 29, 2022
aa1078e
enhance: capture ImagePull errors from ContainerStatuses as well
cognifloyd Apr 29, 2022
9c64abd
enhance(k8s): handle more image pull event types
cognifloyd Apr 29, 2022
9686c23
tests(k8s): fix tests for RunContainer
cognifloyd May 3, 2022
dd189e0
tests(k8s): test RunContainer and WaitContainer with canceled build
cognifloyd May 3, 2022
ca14e32
tests(k8s): test AssembleBuild with canceled build
cognifloyd May 4, 2022
2a6d109
tests(k8s): test RunContainer with PodTracker failure (increase cover…
cognifloyd May 4, 2022
08c219c
tests(k8s): test inspectContainerStatuses with Running or ImagePullError
cognifloyd May 4, 2022
40da3f8
chore: prune some comments
cognifloyd May 4, 2022
4726072
tests: fix inspectContainerStatuses test with an errgroup
cognifloyd May 4, 2022
0d2f315
tests: test RunContainer with an ImagePullError
cognifloyd May 4, 2022
ccf9fb2
tests: test getTrackedPodEvent
cognifloyd May 4, 2022
f76339d
tests: test podTracker.HandleEventAdd, podTracker.HandleEventUpdate
cognifloyd May 4, 2022
e6876d1
refactor: drop unused Events function
cognifloyd May 4, 2022
ecceeac
tests: test inspectContainerEvent image pull events
cognifloyd May 4, 2022
b945f33
tests: test inspectContainerEvent edge cases
cognifloyd May 4, 2022
9ec81fa
tests: test more pull policies in SetupContainer
cognifloyd May 4, 2022
65cbe59
chore: delete dead code.
cognifloyd May 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runtime/kubernetes/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
close(c.PodTracker.Ready)

// wait for the PodTracker caches to populate before creating the pipeline pod.
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced, c.PodTracker.EventSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

Expand Down
40 changes: 33 additions & 7 deletions runtime/kubernetes/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,10 @@ func TestKubernetes_StreamBuild(t *testing.T) {
func TestKubernetes_AssembleBuild(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
pipeline *pipeline.Build
name string
failure bool
cancelBuild bool
pipeline *pipeline.Build
// k8sPod is the pod that the mock Kubernetes client will return
k8sPod *v1.Pod
// enginePod is the pod under construction in the Runtime Engine
Expand Down Expand Up @@ -491,6 +492,22 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
k8sPod: _pod,
enginePod: _pod,
},
{
name: "stages-build canceled",
failure: true,
cancelBuild: true,
pipeline: _stages,
k8sPod: &v1.Pod{},
enginePod: _stagesPod,
},
{
name: "steps-build canceled",
failure: true,
cancelBuild: true,
pipeline: _steps,
k8sPod: &v1.Pod{},
enginePod: _pod,
},
}

// run tests
Expand All @@ -508,16 +525,25 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
_engine.containersLookup[ctn.Name] = i
}

// setup test context
ctx, done := context.WithCancel(context.Background())
defer done()

// StreamBuild and AssembleBuild coordinate their work, so, emulate
// executor.StreamBuild which calls runtime.StreamBuild concurrently.
go func() {
err := _engine.StreamBuild(context.Background(), test.pipeline)
if err != nil {
t.Errorf("unable to start PodTracker via StreamBuild")
if test.cancelBuild {
// simulate a build timeout
done()
} else {
err := _engine.StreamBuild(context.Background(), test.pipeline)
if err != nil {
t.Errorf("unable to start PodTracker via StreamBuild")
}
}
}()

err = _engine.AssembleBuild(context.Background(), test.pipeline)
err = _engine.AssembleBuild(ctx, test.pipeline)

if test.failure {
if err == nil {
Expand Down
173 changes: 171 additions & 2 deletions runtime/kubernetes/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,38 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

// These are known kubernetes event Reasons.
const (
// nolint: godot // commented code is not a sentence
// known scheduler event reasons can be found here:
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/scheduler/schedule_one.go
//reasonFailedScheduling = "FailedScheduling"
//reasonScheduled = "Scheduled"

// known kubelet event reasons are listed here:
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/events/event.go

// kubelet image event reasons.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to just use their constants or wrap them at least so we know when they change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They have not packaged the code so that it can be used externally.
https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go

If I do go get github.com/kubernetes/kubernetes I get an error module declares its path as: k8s.io/kubernetes
If I do go get k8s.io/kubernetes I get an error that k8s.io/[email protected] requires k8s.io/[email protected]: reading k8s.io/api/go.mod at revision v0.0.0: unknown revision v0.0.0

They seem to carefully curate what can be imported by external projects, so we can't import this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, just checking. I just noticed when I went to the code they were public on the package. So, thought they might be importable.

Copy link
Contributor

@jbrockopp jbrockopp May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought I'd provide a bit of clarity on this subject...

The tl;dr is you can vendor from k8s.io/kubernetes but its not pretty or straightforward 😅

Here's the reason why go get k8s.io/kubernetes fails:

kubernetes/kubernetes#79384 (comment)

And to actually vendor from k8s.io/kubernetes, you have to add a replace directive for all nested packages:

kubernetes/kubernetes#79384 (comment)

If we want, there are people who've scripted the approach to this so updating the version is easier:

kubernetes/kubernetes#79384 (comment)

To see a real world example of what this looks like in the go.mod file:

https://github.com/kubernetes-sigs/cloud-provider-huaweicloud/blob/9589bc854c29e399476479f9c5aa9599b2064da5/go.mod#L18-L40

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eww... All that work just to reuse some constants... Not worth it imo.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed 😄

reasonPulling = "Pulling"
reasonPulled = "Pulled"
reasonFailed = "Failed" // Warning: image, container, pod
reasonInspectFailed = "InspectFailed" // Warning
reasonErrImageNeverPull = "ErrImageNeverPull" // Warning
reasonBackOff = "BackOff" // Normal: image, container

// nolint: godot // commented code is not a sentence
// kubelet container event reasons.
//reasonCreated = "Created"
//reasonStarted = "Started"
//reasonKilling = "Killing"
//reasonPreempting = "Preempting"
//reasonExceededGracePeriod = "ExceededGracePeriod"
// kubelet pod event reasons.
//reasonFailedKillPod = "FailedKillPod"
//reasonFailedCreatePodContainer = "FailedCreatePodContainer"
//reasonNetworkNotReady = "NetworkNotReady"
)

// InspectContainer inspects the pipeline container.
func (c *client) InspectContainer(ctx context.Context, ctn *pipeline.Container) error {
c.Logger.Tracef("inspecting container %s", ctn.ID)
Expand Down Expand Up @@ -81,6 +113,12 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *p
return err
}

// get the containerTracker for this container
ctnTracker, ok := c.PodTracker.Containers[ctn.ID]
if !ok {
return fmt.Errorf("containerTracker missing for %s", ctn.ID)
}

// set the pod container image to the parsed step image
c.Pod.Spec.Containers[c.containersLookup[ctn.ID]].Image = _image

Expand All @@ -98,7 +136,25 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *p
return err
}

return nil
// make sure the container starts (watch for image pull errors or similar)
for {
select {
case <-ctx.Done():
// build was canceled. give up
return nil
case <-ctnTracker.Running:
// hooray it is running
return nil
case event := <-ctnTracker.ImagePullErrors:
return fmt.Errorf(
"failed to run container %s in %s: [%s] %s",
ctn.ID,
c.Pod.ObjectMeta.Name,
event.Reason,
event.Message,
)
}
}
}

// SetupContainer prepares the image for the pipeline container.
Expand Down Expand Up @@ -322,7 +378,14 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err
}

// wait for the container terminated signal
<-tracker.Terminated
select {
case <-tracker.Terminated:
// container is terminated
break
case <-ctx.Done():
// build was canceled
break
}

return nil
}
Expand Down Expand Up @@ -354,6 +417,12 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) {
// cst.LastTerminationState has details about the kubernetes/pause image's exit.
// cst.RestartCount is 1 at exit due to switch from kubernetes/pause to final image.

if cst.Image != tracker.Image {
// we don't care if the pause image has terminated or is running
p.Logger.Tracef("container %s expected image %s, got %s", cst.Name, tracker.Image, cst.Image)
continue
}

// check if the container is in a terminated state
//
// https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState
Expand All @@ -364,6 +433,106 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) {
// let WaitContainer know the container is terminated
close(tracker.Terminated)
})
} else if cst.State.Running != nil {
tracker.runningOnce.Do(func() {
p.Logger.Debugf("container running: %s in pod %s, %v", cst.Name, p.TrackedPod, cst)

// let RunContainer know the container is running
close(tracker.Running)
})
} else if cst.State.Waiting != nil &&
(cst.State.Waiting.Reason == reasonBackOff || cst.State.Waiting.Reason == reasonFailed) &&
strings.Contains(cst.State.Waiting.Message, tracker.Image) {
// inspectContainerStatuses should return as quickly as possible
// writing to the channel can block when nothing is receiving,
// so fire it off in a goroutine.
go func() {
// imitate an event to make sure we catch it.
tracker.ImagePullErrors <- &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
},
InvolvedObject: v1.ObjectReference{
Kind: "Pod",
Name: p.Name,
Namespace: p.Namespace,
FieldPath: fmt.Sprintf("spec.containers{%s}", tracker.Name),
},
Reason: cst.State.Waiting.Reason,
Message: cst.State.Waiting.Message,
}
}()
}
}
}

// inspectContainerEvent gathers container info from an event.
func (p *podTracker) inspectContainerEvent(event *v1.Event) {
if !strings.HasPrefix(event.InvolvedObject.FieldPath, "spec.containers") {
// event is not for a container
return
}

// the FieldPath format is "spec.containers{container-name}" for named containers
containerName := strings.TrimPrefix(event.InvolvedObject.FieldPath, "spec.containers{")
containerName = strings.TrimSuffix(containerName, "}")

if containerName == event.InvolvedObject.FieldPath {
// the FieldPath is probably the indexed "spec.containers[2]" format,
// which is only used for unnamed containers,
// but all of our containers are named.
p.Logger.Debugf("ignoring unnamed container, got pod fieldPath %s", event.InvolvedObject.FieldPath)

return
}

// get the containerTracker for this container
tracker, ok := p.Containers[containerName]
if !ok {
// unknown container (probably a sidecar injected by an admissions controller)
p.Logger.Debugf("ignoring untracked container %s from pod %s", containerName, p.TrackedPod)

return
}

p.Logger.Tracef("container event for %s: [%s] %s", tracker.Name, event.Reason, event.Message)

// check if the event mentions the target image.
// If the relevant messages does not include our image, then
// either it is for "kubernetes/pause:latest", which we don't care about,
// or it is a generic message that is basically useless like:
// event.Reason => event.Message
// Failed => Error: ErrImagePull
// BackOff => Error: ImagePullBackOff
// Many of these generic messages come from this part of kubelet:
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/kuberuntime/kuberuntime_container.go
if strings.Contains(event.Message, tracker.Image) {
switch event.Reason {
// examples: event.Reason => event.Message
// The image related messages come from the image manager in kubelet:
// https://github.com/kubernetes/kubernetes/blob/v1.23.6/pkg/kubelet/images/image_manager.go
case reasonFailed, reasonBackOff, reasonInspectFailed, reasonErrImageNeverPull:
// Failed => Failed to pull image "image:tag": <containerd message>
// BackOff => Back-off pulling image "image:tag"
// InspectFailed => Failed to apply default image tag "<image>": couldn't parse image reference "<image>": <docker error>
// InspectFailed => Failed to inspect image "<image>": <docker error>
// ErrImageNeverPull => Container image "image:tag" is not present with pull policy of Never
tracker.ImagePullErrors <- event
return
case reasonPulled:
// Pulled => Successfully pulled image "image:tag" in <time>
// Pulled => Container image "image:tag" already present on machine
tracker.imagePulledOnce.Do(func() {
p.Logger.Debugf("container image pulled: %s in pod %s, %v", tracker.Name, p.TrackedPod, event.Message)

// let RunContainer know the container image was pulled
close(tracker.ImagePulled)
})
case reasonPulling:
// Pulling => Pulling image "image:tag"
return
default:
return
}
}
}
Loading