diff --git a/cluster/inventory.go b/cluster/inventory.go index e28bac72..f485b181 100644 --- a/cluster/inventory.go +++ b/cluster/inventory.go @@ -494,7 +494,7 @@ func (is *inventoryService) run(ctx context.Context, reservationsArg []*reservat t.Stop() runch = is.runCheck(rctx, state) } - } else if reserveChLocal == nil { + } else if reserveChLocal == nil && state.inventory != nil { reserveChLocal = is.reservech } } diff --git a/cluster/kube/operators/clients/inventory/client.go b/cluster/kube/operators/clients/inventory/client.go index af059b4e..9054e028 100644 --- a/cluster/kube/operators/clients/inventory/client.go +++ b/cluster/kube/operators/clients/inventory/client.go @@ -234,6 +234,7 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In } func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan<- inventoryState) (<-chan struct{}, error) { + ctx, cancel := context.WithCancel(ctx) group, ctx := errgroup.WithContext(ctx) var svcPort int32 @@ -320,6 +321,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan< group.Go(func() error { err := pf.ForwardPorts() + cancel() errch <- err return err }) @@ -353,6 +355,7 @@ func newInventoryConnector(ctx context.Context, svc *corev1.Service, invch chan< group.Go(func() error { <-ctx.Done() + cancel() sigdone <- struct{}{} @@ -373,6 +376,14 @@ func inventoryRun(ctx context.Context, endpoint string, invch chan<- inventorySt defer func() { _ = conn.Close() + + select { + case invch <- inventoryState{ + evt: inventoryEventDeleted, + }: + default: + } + }() log.Info(fmt.Sprintf("dialing inventory operator at %s", endpoint))