From 29299490a5f48420d74597ddb60b50bd1b172198 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 17:28:01 -0500 Subject: [PATCH 01/32] feat(k8s): add eventInformer to podTracker --- runtime/kubernetes/build.go | 2 +- runtime/kubernetes/pod_tracker.go | 33 +++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/runtime/kubernetes/build.go b/runtime/kubernetes/build.go index ad94681f..78e436cf 100644 --- a/runtime/kubernetes/build.go +++ b/runtime/kubernetes/build.go @@ -219,7 +219,7 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error { close(c.PodTracker.Ready) // wait for the PodTracker caches to populate before creating the pipeline pod. - if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok { + if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced, c.PodTracker.EventSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 221e881f..5e6b15f0 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -49,6 +49,8 @@ type podTracker struct { informerDone context.CancelFunc // podInformer watches the given pod, caches the results, and makes them available in podLister podInformer informers.PodInformer + // eventInformer watches events for the given pod, caches the results, and makes them available in eventLister + eventInformer informers.EventInformer // PodLister helps list Pods. All objects returned here must be treated as read-only. PodLister listers.PodLister @@ -56,6 +58,12 @@ type podTracker struct { // This is useful for determining if caches have synced. PodSynced cache.InformerSynced + // EventLister helps list Events. All objects returned here must be treated as read-only. + EventLister listers.EventLister + // EventSynced is a function that can be used to determine if an informer has synced. 
+ // This is useful for determining if caches have synced. + EventSynced cache.InformerSynced + // Containers maps the container name to a containerTracker Containers map[string]*containerTracker @@ -143,6 +151,21 @@ func (p *podTracker) getTrackedPod(obj interface{}) *v1.Pod { return pod } +// HandleEventAdd is an AddFunc for cache.ResourceEventHandlerFuncs for Events. +func (p *podTracker) HandleEventAdd(newObj interface{}) { + // TODO: do something with the (possible) event +} + +// HandleEventUpdate is an UpdateFunc for cache.ResourceEventHandlerFuncs for Events. +func (p *podTracker) HandleEventUpdate(oldObj, newObj interface{}) { + // TODO: do something with the (possible) event(s) +} + +// HandleEventDelete is an DeleteFunc for cache.ResourceEventHandlerFuncs for Events. +func (p *podTracker) HandleEventDelete(oldObj interface{}) { + // TODO: do something with the (possible) event +} + // Start kicks off the API calls to start populating the cache. // There is no need to run this in a separate goroutine (ie go podTracker.Start(ctx)). 
func (p *podTracker) Start(ctx context.Context) { @@ -213,6 +236,7 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po }), ) podInformer := informerFactory.Core().V1().Pods() + eventInformer := informerFactory.Core().V1().Events() // initialize podTracker tracker := podTracker{ @@ -222,6 +246,9 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po podInformer: podInformer, PodLister: podInformer.Lister(), PodSynced: podInformer.Informer().HasSynced, + eventInformer: eventInformer, + EventLister: eventInformer.Lister(), + EventSynced: eventInformer.Informer().HasSynced, Ready: make(chan struct{}), } @@ -232,6 +259,12 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po DeleteFunc: tracker.HandlePodDelete, }) + eventInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: tracker.HandleEventAdd, + UpdateFunc: tracker.HandleEventUpdate, + DeleteFunc: tracker.HandleEventDelete, + }) + return &tracker, nil } From be144f4ecb84c9686b93734daa344ab684324ab0 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 17:41:59 -0500 Subject: [PATCH 02/32] feat(k8s): ignore event deletion We have nothing to do when that happens. --- runtime/kubernetes/pod_tracker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 5e6b15f0..ce12999f 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -162,9 +162,8 @@ func (p *podTracker) HandleEventUpdate(oldObj, newObj interface{}) { } // HandleEventDelete is an DeleteFunc for cache.ResourceEventHandlerFuncs for Events. -func (p *podTracker) HandleEventDelete(oldObj interface{}) { - // TODO: do something with the (possible) event -} +//func (p *podTracker) HandleEventDelete(oldObj interface{}) { +//} // Start kicks off the API calls to start populating the cache. 
// There is no need to run this in a separate goroutine (ie go podTracker.Start(ctx)). @@ -262,7 +261,8 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po eventInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: tracker.HandleEventAdd, UpdateFunc: tracker.HandleEventUpdate, - DeleteFunc: tracker.HandleEventDelete, + // events get deleted after some time, which we ignore. + //DeleteFunc: tracker.HandleEventDelete, }) return &tracker, nil From 7602600df0789396001806b8f019e8f5839ddf54 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 18:16:03 -0500 Subject: [PATCH 03/32] feat(k8s): begin handling event stream --- runtime/kubernetes/pod_tracker.go | 55 +++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index ce12999f..b7cc70b1 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -154,17 +154,72 @@ func (p *podTracker) getTrackedPod(obj interface{}) *v1.Pod { // HandleEventAdd is an AddFunc for cache.ResourceEventHandlerFuncs for Events. func (p *podTracker) HandleEventAdd(newObj interface{}) { // TODO: do something with the (possible) event + newEvent := p.getTrackedPodEvent(newObj) + if newEvent == nil { + // not valid or not for our tracked pod + return + } + + //if event.InvolvedObject.FieldPath == "spec.container{%s}" { + // + //} + + p.Logger.Tracef("handling event add event for %s: %v", p.TrackedPod, newEvent) } // HandleEventUpdate is an UpdateFunc for cache.ResourceEventHandlerFuncs for Events. 
func (p *podTracker) HandleEventUpdate(oldObj, newObj interface{}) { // TODO: do something with the (possible) event(s) + oldEvent := p.getTrackedPodEvent(oldObj) + newEvent := p.getTrackedPodEvent(newObj) + + if oldEvent == nil || newEvent == nil { + // not valid or not for our tracked pod + return + } + + p.Logger.Tracef("handling event update event for %s: %v; %v", p.TrackedPod, oldEvent, newEvent) } // HandleEventDelete is an DeleteFunc for cache.ResourceEventHandlerFuncs for Events. //func (p *podTracker) HandleEventDelete(oldObj interface{}) { +// oldEvent := p.getTrackedPodEvent(oldObj) +// if newEvent == nil { +// // not valid or not for our tracked pod +// return +// } +// +// p.Logger.Tracef("handling event delete event for %s", p.TrackedPod) //} +// getTrackedPodEvent tries to convert the obj into an Event and makes sure it is for the tracked Pod. +// This should only be used by the funcs of cache.ResourceEventHandlerFuncs. +func (p *podTracker) getTrackedPodEvent(obj interface{}) *v1.Event { + var ( + event *v1.Event + ok bool + ) + + if event, ok = obj.(*v1.Event); !ok { + p.Logger.Errorf("error decoding event, invalid type") + return nil + } + + eventObjectName := event.InvolvedObject.Namespace + "/" + event.InvolvedObject.Name + if event.InvolvedObject.Kind != "Pod" || eventObjectName != p.TrackedPod { + p.Logger.Errorf( + "unexpected event for %s (%s), expected %s (Pod)", + eventObjectName, + event.InvolvedObject.Kind, + p.TrackedPod, + ) + + return nil + } + + return event +} + // Start kicks off the API calls to start populating the cache. // There is no need to run this in a separate goroutine (ie go podTracker.Start(ctx)). 
func (p *podTracker) Start(ctx context.Context) { From 457a664da7108bc5900c5aa3dbdd672d52ae48cc Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 19:34:46 -0500 Subject: [PATCH 04/32] refactor: create eventInformer from eventInformerFactory we have to use different list options, so they can't share the same factory. --- runtime/kubernetes/pod_tracker.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index b7cc70b1..09c12989 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -280,6 +280,11 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po return nil, err } + // create fieldSelector for watching the pod events + fieldSelector := fields.Set{ + "involvedObject.name": fields.EscapeValue(pod.ObjectMeta.Name), + }.AsSelector() + // create filtered Informer factory which is commonly used for k8s controllers informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( clientset, @@ -290,7 +295,17 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po }), ) podInformer := informerFactory.Core().V1().Pods() - eventInformer := informerFactory.Core().V1().Events() + + // events do not have labels like the pods do, so we need a separate Informer + eventInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + clientset, + defaultResync, + kubeinformers.WithNamespace(pod.ObjectMeta.Namespace), + kubeinformers.WithTweakListOptions(func(listOptions *metav1.ListOptions) { + listOptions.FieldSelector = fieldSelector.String() + }), + ) + eventInformer := eventInformerFactory.Core().V1().Events() // initialize podTracker tracker := podTracker{ From 7d674e8cc21b6be58a9b70158afad0fb941e664d Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 19:35:24 -0500 Subject: [PATCH 05/32] refactor: rename selector=>labelSelector --- 
runtime/kubernetes/pod_tracker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 09c12989..4c650130 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -270,8 +270,8 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po log.Tracef("creating PodTracker for pod %s", trackedPod) - // create label selector for watching the pod - selector, err := labels.NewRequirement( + // create labelSelector for watching the pod + labelSelector, err := labels.NewRequirement( "pipeline", selection.Equals, []string{fields.EscapeValue(pod.ObjectMeta.Name)}, @@ -291,7 +291,7 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po defaultResync, kubeinformers.WithNamespace(pod.ObjectMeta.Namespace), kubeinformers.WithTweakListOptions(func(listOptions *metav1.ListOptions) { - listOptions.LabelSelector = selector.String() + listOptions.LabelSelector = labelSelector.String() }), ) podInformer := informerFactory.Core().V1().Pods() From 71c9b34359c8ab365466bd0df188c731ca14993a Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Thu, 28 Apr 2022 19:39:38 -0500 Subject: [PATCH 06/32] enhance: register eventInformerFactory on podTracker --- runtime/kubernetes/pod_tracker.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 4c650130..8270503a 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -45,7 +45,9 @@ type podTracker struct { // informerFactory is used to create Informers and Listers informerFactory kubeinformers.SharedInformerFactory - // informerDone is a function used to stop the informerFactory + // eventInformerFactory is used to create Informers and Listers for events + eventInformerFactory kubeinformers.SharedInformerFactory + // 
informerDone is a function used to stop informerFactory and eventInformerFactory informerDone context.CancelFunc // podInformer watches the given pod, caches the results, and makes them available in podLister podInformer informers.PodInformer @@ -230,6 +232,7 @@ func (p *podTracker) Start(ctx context.Context) { // Start method is non-blocking and runs all registered informers in a dedicated goroutine. p.informerFactory.Start(informerCtx.Done()) + p.eventInformerFactory.Start(informerCtx.Done()) } // Stop shuts down any informers (e.g. stop watching APIs). @@ -309,16 +312,17 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po // initialize podTracker tracker := podTracker{ - Logger: log, - TrackedPod: trackedPod, - informerFactory: informerFactory, - podInformer: podInformer, - PodLister: podInformer.Lister(), - PodSynced: podInformer.Informer().HasSynced, - eventInformer: eventInformer, - EventLister: eventInformer.Lister(), - EventSynced: eventInformer.Informer().HasSynced, - Ready: make(chan struct{}), + Logger: log, + TrackedPod: trackedPod, + informerFactory: informerFactory, + podInformer: podInformer, + PodLister: podInformer.Lister(), + PodSynced: podInformer.Informer().HasSynced, + eventInformerFactory: eventInformerFactory, + eventInformer: eventInformer, + EventLister: eventInformer.Lister(), + EventSynced: eventInformer.Informer().HasSynced, + Ready: make(chan struct{}), } // register event handler funcs in podInformer From c3e4b50f9bb49b9b6a30ebbdcf54859d86108443 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 11:18:23 -0500 Subject: [PATCH 07/32] enhance: add podTracker.inspectContainerEvent --- runtime/kubernetes/container.go | 35 +++++++++++++++++++++++++++++++ runtime/kubernetes/pod_tracker.go | 20 +++++++++++++----- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index 28e0932d..4585722a 100644 --- 
a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -367,3 +367,38 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) { } } } + +// inspectContainerEvent gathers container info from an event. +func (p *podTracker) inspectContainerEvent(event *v1.Event) { + if !strings.HasPrefix(event.InvolvedObject.FieldPath, "spec.containers") { + // event is not for a container + return + } + + // the FieldPath format is "spec.containers{container-name}" for named containers + containerName := strings.TrimPrefix(event.InvolvedObject.FieldPath, "spec.containers{") + containerName = strings.TrimSuffix(containerName, "}") + + if containerName == event.InvolvedObject.FieldPath { + // the FieldPath is probably the indexed "spec.containers[2]" format, + // which is only used for unnamed containers, + // but all of our containers are named. + p.Logger.Debugf("ignoring unnamed container, got pod fieldPath %s", event.InvolvedObject.FieldPath) + + return + } + + // get the containerTracker for this container + tracker, ok := p.Containers[containerName] + if !ok { + // unknown container (probably a sidecar injected by an admissions controller) + p.Logger.Debugf("ignoring untracked container %s from pod %s", containerName, p.TrackedPod) + + return + } + + // TODO: save the event on the tracker somehow, + // or send a signal that triggers reloading the events + // for this container + p.Logger.Tracef("container event for %s: [%s] %s", tracker.Name, event.Reason, event.Message) +} diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 8270503a..d3dec5e6 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -162,11 +162,14 @@ func (p *podTracker) HandleEventAdd(newObj interface{}) { return } - //if event.InvolvedObject.FieldPath == "spec.container{%s}" { - // - //} + p.Logger.Tracef( + "handling %s event add event for %s: fieldPath=%v", + newEvent.Type, // Normal, Warning + p.TrackedPod, + 
newEvent.InvolvedObject.FieldPath, + ) - p.Logger.Tracef("handling event add event for %s: %v", p.TrackedPod, newEvent) + p.inspectContainerEvent(newEvent) } // HandleEventUpdate is an UpdateFunc for cache.ResourceEventHandlerFuncs for Events. @@ -180,7 +183,14 @@ func (p *podTracker) HandleEventUpdate(oldObj, newObj interface{}) { return } - p.Logger.Tracef("handling event update event for %s: %v; %v", p.TrackedPod, oldEvent, newEvent) + p.Logger.Tracef( + "handling %s event update event for %s: fieldPath=%v", + newEvent.Type, // Normal, Warning + p.TrackedPod, + newEvent.InvolvedObject.FieldPath, + ) + + p.inspectContainerEvent(newEvent) } // HandleEventDelete is an DeleteFunc for cache.ResourceEventHandlerFuncs for Events. From 3e89df69018fbbb3a1c5d5d8577588dc437b0b8f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 11:52:40 -0500 Subject: [PATCH 08/32] enhance: add signal for running container --- runtime/kubernetes/container.go | 7 +++++++ runtime/kubernetes/container_test.go | 1 + runtime/kubernetes/pod_tracker.go | 5 +++++ 3 files changed, 13 insertions(+) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index 4585722a..b8cfdec1 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -364,6 +364,13 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) { // let WaitContainer know the container is terminated close(tracker.Terminated) }) + } else if cst.State.Running != nil { + tracker.runningOnce.Do(func() { + p.Logger.Debugf("container running: %s in pod %s, %v", cst.Name, p.TrackedPod, cst) + + // let WaitContainer know the container is terminated + close(tracker.Running) + }) } } } diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index 74369658..baf6fa20 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -631,6 +631,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { 
t.Run(test.name, func(t *testing.T) { ctnTracker := containerTracker{ Name: test.ctnName, + Running: make(chan struct{}), Terminated: make(chan struct{}), } podTracker := podTracker{ diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index d3dec5e6..355148f3 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -28,6 +28,10 @@ import ( type containerTracker struct { // Name is the name of the container Name string + // runningOnce ensures that the Terminated channel only gets closed once. + runningOnce sync.Once + // Running will be closed once the container reaches a terminal state. + Running chan struct{} // terminatedOnce ensures that the Terminated channel only gets closed once. terminatedOnce sync.Once // Terminated will be closed once the container reaches a terminal state. @@ -265,6 +269,7 @@ func (p *podTracker) TrackContainers(containers []v1.Container) { for _, ctn := range containers { p.Containers[ctn.Name] = &containerTracker{ Name: ctn.Name, + Running: make(chan struct{}), Terminated: make(chan struct{}), } } From 8ba7ad5da32a30dee61b7edf5eaa715b465587e7 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 12:00:04 -0500 Subject: [PATCH 09/32] enhance: only mark containers as running/terminated if it is the correct image --- runtime/kubernetes/container.go | 6 ++++++ runtime/kubernetes/container_test.go | 6 ++++++ runtime/kubernetes/pod_tracker.go | 4 ++++ 3 files changed, 16 insertions(+) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index b8cfdec1..91a0a2d9 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -354,6 +354,12 @@ func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) { // cst.LastTerminationState has details about the kubernetes/pause image's exit. // cst.RestartCount is 1 at exit due to switch from kubernetes/pause to final image. 
+ if cst.Image != tracker.Image { + // we don't care if the pause image has terminated or is running + p.Logger.Tracef("container %s expected image %s, got %s", cst.Name, tracker.Image, cst.Image) + continue + } + // check if the container is in a terminated state // // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index baf6fa20..5225b111 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -549,6 +549,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { name string trackedPod string ctnName string + ctnImage string terminated bool pod *v1.Pod }{ @@ -556,6 +557,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { name: "container is terminated", trackedPod: "test/github-octocat-1", ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", terminated: true, pod: _pod, }, @@ -563,6 +565,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { name: "pod is pending", trackedPod: "test/github-octocat-1", ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", terminated: false, pod: &v1.Pod{ ObjectMeta: _pod.ObjectMeta, @@ -577,6 +580,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { name: "container is running", trackedPod: "test/github-octocat-1", ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", terminated: false, pod: &v1.Pod{ ObjectMeta: _pod.ObjectMeta, @@ -599,6 +603,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { name: "pod has an untracked container", trackedPod: "test/github-octocat-1", ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", terminated: true, pod: &v1.Pod{ ObjectMeta: _pod.ObjectMeta, @@ -631,6 +636,7 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctnTracker := containerTracker{ 
Name: test.ctnName, + Image: test.ctnImage, Running: make(chan struct{}), Terminated: make(chan struct{}), } diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index 355148f3..cda23f3e 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -28,6 +28,9 @@ import ( type containerTracker struct { // Name is the name of the container Name string + // Image is the final image of the container + Image string + // runningOnce ensures that the Terminated channel only gets closed once. runningOnce sync.Once // Running will be closed once the container reaches a terminal state. @@ -269,6 +272,7 @@ func (p *podTracker) TrackContainers(containers []v1.Container) { for _, ctn := range containers { p.Containers[ctn.Name] = &containerTracker{ Name: ctn.Name, + Image: ctn.Image, Running: make(chan struct{}), Terminated: make(chan struct{}), } From 038bf97ff09985e115797a6b4bc3dd22ba32e0a6 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 13:41:44 -0500 Subject: [PATCH 10/32] enhance(k8s): exit WaitContainer if build is canceled --- runtime/kubernetes/container.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index 91a0a2d9..f16b0d32 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -322,7 +322,14 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err } // wait for the container terminated signal - <-tracker.Terminated + select { + case <-tracker.Terminated: + // container is terminated + break + case <-ctx.Done(): + // build was canceled + break + } return nil } From b5ab8eb7e93d2b8e95e0c305d11e44ca359349b7 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 13:44:02 -0500 Subject: [PATCH 11/32] enhance(k8s): add containerTracker.Events() function This needs access to the PodTracker's EventLister, so we create the function when
initializing the containerTracker. --- runtime/kubernetes/container_test.go | 3 +++ runtime/kubernetes/pod_tracker.go | 36 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index 5225b111..66f5fee3 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -639,6 +639,9 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { Image: test.ctnImage, Running: make(chan struct{}), Terminated: make(chan struct{}), + Events: func() ([]*v1.Event, error) { + return nil, nil + }, } podTracker := podTracker{ Logger: logger, diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go index cda23f3e..dc942aac 100644 --- a/runtime/kubernetes/pod_tracker.go +++ b/runtime/kubernetes/pod_tracker.go @@ -40,6 +40,10 @@ type containerTracker struct { // Terminated will be closed once the container reaches a terminal state. Terminated chan struct{} // TODO: collect streaming logs here before TailContainer is called + + // Events is a function that returns a list of kubernetes events + // related to the tracked container. + Events func() ([]*v1.Event, error) } // podTracker contains Informers used to watch and synchronize local k8s caches. @@ -47,6 +51,9 @@ type containerTracker struct { type podTracker struct { // https://pkg.go.dev/github.com/sirupsen/logrus#Entry Logger *logrus.Entry + + // Namespace is the Namespace of the tracked pod + Namespace string // TrackedPod is the Namespace/Name of the tracked pod TrackedPod string @@ -275,6 +282,34 @@ func (p *podTracker) TrackContainers(containers []v1.Container) { Image: ctn.Image, Running: make(chan struct{}), Terminated: make(chan struct{}), + Events: func() ([]*v1.Event, error) { + // EventLister only offers a labelSelector, + // but we need a fieldSelector for events, + // so filter all pod events to get just the events + // for this container. 
+ var ctnEvents []*v1.Event + + // get all tracked pod events. + allEvents, err := p.EventLister. + Events(p.Namespace). + List(labels.Set{}.AsSelector()) + if err != nil { + return ctnEvents, err + } + + ctnFieldPath := fmt.Sprintf("spec.containers{%s}", ctn.Name) + + for _, event := range allEvents { + // skip events for other containers + if event.InvolvedObject.FieldPath != ctnFieldPath { + continue + } + + ctnEvents = append(ctnEvents, event) + } + + return ctnEvents, nil + }, } } } @@ -332,6 +367,7 @@ func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Po // initialize podTracker tracker := podTracker{ Logger: log, + Namespace: pod.ObjectMeta.Namespace, TrackedPod: trackedPod, informerFactory: informerFactory, podInformer: podInformer, From 57011c12dc16c41afb89fd5ec387de5571ea201f Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 13:51:32 -0500 Subject: [PATCH 12/32] tests: fix inspectContainerStatuses test --- runtime/kubernetes/container_test.go | 57 ++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index 66f5fee3..e177e494 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -590,7 +590,56 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ { - Name: "step-github-octocat-1-clone", + Name: "step-github-octocat-1-clone", + Image: "target/vela-git:v0.4.0", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + }, + { + name: "container is still running pause image", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", + terminated: false, + pod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + 
ContainerStatuses: []v1.ContainerStatus{ + { + Name: "step-github-octocat-1-clone", + Image: pauseImage, + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + }, + { + name: "container is still running pause image", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + ctnImage: "target/vela-git:v0.4.0", + terminated: false, + pod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "step-github-octocat-1-clone", + Image: image.Parse(pauseImage), State: v1.ContainerState{ Running: &v1.ContainerStateRunning{}, }, @@ -613,7 +662,8 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ { - Name: "step-github-octocat-1-clone", + Name: "step-github-octocat-1-clone", + Image: "target/vela-git:v0.4.0", State: v1.ContainerState{ Terminated: &v1.ContainerStateTerminated{ Reason: "Completed", @@ -622,7 +672,8 @@ func Test_podTracker_inspectContainerStatuses(t *testing.T) { }, }, { - Name: "injected-by-admissions-controller", + Name: "injected-by-admissions-controller", + Image: "target/vela-git:v0.4.0", State: v1.ContainerState{ Running: &v1.ContainerStateRunning{}, }, From 639b2198acafb09e33dee23868aec3f1c9f1665b Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Fri, 29 Apr 2022 14:21:35 -0500 Subject: [PATCH 13/32] bugfix(k8s): Make sure image pull errors are detected --- runtime/kubernetes/container.go | 70 +++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index f16b0d32..66cb3dd8 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -81,6 +81,12 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *p return err } + // get the containerTracker 
for this container + ctnTracker, ok := c.PodTracker.Containers[ctn.ID] + if !ok { + return fmt.Errorf("containerTracker missing for %s", ctn.ID) + } + // set the pod container image to the parsed step image c.Pod.Spec.Containers[c.containersLookup[ctn.ID]].Image = _image @@ -98,6 +104,70 @@ func (c *client) RunContainer(ctx context.Context, ctn *pipeline.Container, b *p return err } + var ( + events []*v1.Event + imagePulled bool + ) + + // make sure the container starts (watch for image pull errors or similar) + for { + select { + case <-ctx.Done(): + // build was canceled. give up + return nil + case <-ctnTracker.Running: + // hooray it is running + return nil + default: + } + + // no need to search for image pull events + if imagePulled { + continue + } + + events, err = ctnTracker.Events() + if err != nil { + return err + } + + for _, event := range events { + // check if the event mentions the target image + if !(strings.Contains(event.Message, ctn.Image) || strings.Contains(event.Message, _image)) { + // if the relevant messages does not include our image + // it is probably for "kubernetes/pause:latest" + // or it is a generic message that is basically useless like: + // event.Reason => event.Message + // Failed => Error: ErrImagePull + // BackOff => Error: ImagePullBackOff + continue + } + + switch event.Reason { + // examples: event.Reason => event.Message + case "Failed", "BackOff": + // Failed => Failed to pull image "image:tag": + // BackOff => Back-off pulling image "image:tag" + return fmt.Errorf("failed to run container %s in %s: %s", ctn.ID, c.Pod.ObjectMeta.Name, event.Message) + case "Pulled": + // Pulled => Successfully pulled image "image:tag" in