From 81fb053c1dea65c29c13d4eaedc5474ca504c2b5 Mon Sep 17 00:00:00 2001 From: Artur Troian Date: Wed, 14 Feb 2024 12:11:17 -0500 Subject: [PATCH] chore(operator/inventory): drain sigexit channel on shutdown Signed-off-by: Artur Troian --- operator/inventory/nodes.go | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/operator/inventory/nodes.go b/operator/inventory/nodes.go index a03bf470..11488c56 100644 --- a/operator/inventory/nodes.go +++ b/operator/inventory/nodes.go @@ -79,7 +79,13 @@ func newClusterNodes(ctx context.Context, image, namespace string) *clusterNodes } func (cl *clusterNodes) Wait() error { - return cl.group.Wait() + log := fromctx.LogrFromCtx(cl.ctx).WithName("nodes") + + log.Info("waiting for nodes to finish") + err := cl.group.Wait() + log.Info("all nodes finished") + + return err } func (cl *clusterNodes) connector() error { @@ -92,12 +98,33 @@ func (cl *clusterNodes) connector() error { nctx, ncancel := context.WithCancel(ctx) defer func() { + log.Info("shutting down node connectors") + ncancel() + lctx, lcancel := context.WithCancel(context.Background()) + + go func() { + for { + select { + case <-lctx.Done(): + return + case <-cl.signaldone: + } + } + }() + for name, node := range nodes { + log.Info(fmt.Sprintf("shutting down node %s", name)) _ = node.shutdown() delete(nodes, name) + + log.Info(fmt.Sprintf("node %s has been shutdown", name)) } + + lcancel() + + log.Info("node connectors down") }() for { @@ -153,6 +180,7 @@ func (cl *clusterNodes) run() error { events := bus.Sub(topicInventoryNode) defer bus.Unsub(events) + for { select { case <-cl.ctx.Done():