Skip to content

Commit

Permalink
fix: make ipamv2 metrics resilient to missing custom resource definit…
Browse files Browse the repository at this point in the history
…ions (#3029)

Signed-off-by: Evan Baker <[email protected]>
  • Loading branch information
rbtr authored Oct 1, 2024
1 parent ffcac52 commit 1edb63f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 62 deletions.
150 changes: 96 additions & 54 deletions cns/ipampool/metrics/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Azure/azure-container-networking/crd/clustersubnetstate/api/v1alpha1"
"github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

// Subnet ARM ID /subscriptions/$(SUB)/resourceGroups/$(GROUP)/providers/Microsoft.Network/virtualNetworks/$(VNET)/subnets/$(SUBNET)
Expand Down Expand Up @@ -46,14 +47,22 @@ type metaState struct {
subnetCIDR string
}

type observer struct {
ipSrc func() map[string]cns.IPConfigurationStatus
nncSrc func(context.Context) (*v1alpha.NodeNetworkConfig, error)
cssSrc func(context.Context) ([]v1alpha1.ClusterSubnetState, error)
}

// NewLegacyMetricsObserver creates a closed functional scope which can be invoked to
// observe the legacy IPAM pool metrics.
//
//nolint:lll // ignore line length
func NewLegacyMetricsObserver(ctx context.Context, ipcli func() map[string]cns.IPConfigurationStatus, nnccli func(context.Context) (*v1alpha.NodeNetworkConfig, error), csscli func(context.Context) ([]v1alpha1.ClusterSubnetState, error)) func() error {
return func() error {
return observeMetrics(ctx, ipcli, nnccli, csscli)
}
func NewLegacyMetricsObserver(ipSrc func() map[string]cns.IPConfigurationStatus, nncSrc func(context.Context) (*v1alpha.NodeNetworkConfig, error), cssSrc func(context.Context) ([]v1alpha1.ClusterSubnetState, error)) func(context.Context) error {
return (&observer{
ipSrc: ipSrc,
nncSrc: nncSrc,
cssSrc: cssSrc,
}).observeMetrics
}

// generateARMID uses the Subnet ARM ID format to populate the ARM ID with the metadata.
Expand All @@ -73,68 +82,98 @@ func generateARMID(nc *v1alpha.NetworkContainer) string {
// observeMetrics observes the IP pool and updates the metrics. Blocking.
//
//nolint:lll // ignore line length
func observeMetrics(ctx context.Context, ipcli func() map[string]cns.IPConfigurationStatus, nnccli func(context.Context) (*v1alpha.NodeNetworkConfig, error), csscli func(context.Context) ([]v1alpha1.ClusterSubnetState, error)) error {
csslist, err := csscli(ctx)
if err != nil {
return err
}
nnc, err := nnccli(ctx)
if err != nil {
return err
}
ips := ipcli()
func (o *observer) observeMetrics(ctx context.Context) error {
// The error group is used to allow individual metrics sources to fail without
// failing out the entire attempt to observe the Pool. This may happen if there is a
// transient issue with the source of the data, or if the source is not available
// (like if the CRD is not installed).
var g errgroup.Group

// Get the current state of world.
var meta metaState
for i := range csslist {
if csslist[i].Status.Exhausted {
meta.exhausted = true
break
}
}
if len(nnc.Status.NetworkContainers) > 0 {
// Set SubnetName, SubnetAddressSpace and Pod Network ARM ID values to the global subnet, subnetCIDR and subnetARM variables.
meta.subnet = nnc.Status.NetworkContainers[0].SubnetName
meta.subnetCIDR = nnc.Status.NetworkContainers[0].SubnetAddressSpace
meta.subnetARMID = generateARMID(&nnc.Status.NetworkContainers[0])
}
meta.primaryIPAddresses = make(map[string]struct{})
// Add Primary IP to Map, if not present.
// This is only for Swift i.e. if NC Type is vnet.
for i := 0; i < len(nnc.Status.NetworkContainers); i++ {
nc := nnc.Status.NetworkContainers[i]
if nc.Type == "" || nc.Type == v1alpha.VNET {
meta.primaryIPAddresses[nc.PrimaryIP] = struct{}{}
g.Go(func() error {
// Try to fetch the ClusterSubnetState, if available.
if o.cssSrc != nil {
csslist, err := o.cssSrc(ctx)
if err != nil {
return err
}
for i := range csslist {
if csslist[i].Status.Exhausted {
meta.exhausted = true
break
}
}
}
return nil
})

if nc.Type == v1alpha.VNETBlock {
primaryPrefix, err := netip.ParsePrefix(nc.PrimaryIP)
var state ipPoolState
g.Go(func() error {
// Try to fetch the NodeNetworkConfig, if available.
if o.nncSrc != nil {
nnc, err := o.nncSrc(ctx)
if err != nil {
return errors.Wrapf(err, "unable to parse ip prefix: %s", nc.PrimaryIP)
return err
}
if len(nnc.Status.NetworkContainers) > 0 {
// Set SubnetName, SubnetAddressSpace and Pod Network ARM ID values to the global subnet, subnetCIDR and subnetARM variables.
meta.subnet = nnc.Status.NetworkContainers[0].SubnetName
meta.subnetCIDR = nnc.Status.NetworkContainers[0].SubnetAddressSpace
meta.subnetARMID = generateARMID(&nnc.Status.NetworkContainers[0])
}
meta.primaryIPAddresses = make(map[string]struct{})
// Add Primary IP to Map, if not present.
// This is only for Swift i.e. if NC Type is vnet.
for i := 0; i < len(nnc.Status.NetworkContainers); i++ {
nc := nnc.Status.NetworkContainers[i]
if nc.Type == "" || nc.Type == v1alpha.VNET {
meta.primaryIPAddresses[nc.PrimaryIP] = struct{}{}
}

if nc.Type == v1alpha.VNETBlock {
primaryPrefix, err := netip.ParsePrefix(nc.PrimaryIP)
if err != nil {
return errors.Wrapf(err, "unable to parse ip prefix: %s", nc.PrimaryIP)
}
meta.primaryIPAddresses[primaryPrefix.Addr().String()] = struct{}{}
}
}
meta.primaryIPAddresses[primaryPrefix.Addr().String()] = struct{}{}
state.requestedIPs = nnc.Spec.RequestedIPCount
meta.batch = nnc.Status.Scaler.BatchSize
meta.max = nnc.Status.Scaler.MaxIPCount
}
}
return nil
})

state := ipPoolState{
secondaryIPs: int64(len(ips)),
requestedIPs: nnc.Spec.RequestedIPCount,
}
for i := range ips {
ip := ips[i]
switch ip.GetState() {
case types.Assigned:
state.allocatedToPods++
case types.Available:
state.available++
case types.PendingProgramming:
state.pendingProgramming++
case types.PendingRelease:
state.pendingRelease++
g.Go(func() error {
// Try to fetch the IPConfigurations, if available.
if o.ipSrc != nil {
ips := o.ipSrc()
state.secondaryIPs = int64(len(ips))
for i := range ips {
ip := ips[i]
switch ip.GetState() {
case types.Assigned:
state.allocatedToPods++
case types.Available:
state.available++
case types.PendingProgramming:
state.pendingProgramming++
case types.PendingRelease:
state.pendingRelease++
}
}
}
}
return nil
})

err := g.Wait()

state.currentAvailableIPs = state.secondaryIPs - state.allocatedToPods - state.pendingRelease
state.expectedAvailableIPs = state.requestedIPs - state.allocatedToPods

// Update the metrics.
labels := []string{meta.subnet, meta.subnetCIDR, meta.subnetARMID}
IpamAllocatedIPCount.WithLabelValues(labels...).Set(float64(state.allocatedToPods))
IpamAvailableIPCount.WithLabelValues(labels...).Set(float64(state.available))
Expand All @@ -153,5 +192,8 @@ func observeMetrics(ctx context.Context, ipcli func() map[string]cns.IPConfigura
} else {
IpamSubnetExhaustionState.WithLabelValues(labels...).Set(float64(SubnetIPNotExhausted))
}
if err != nil {
return errors.Wrap(err, "failed to collect all metrics")
}
return nil
}
8 changes: 4 additions & 4 deletions cns/ipampool/v2/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Monitor struct {
nncSource <-chan v1alpha.NodeNetworkConfig
started chan interface{}
once sync.Once
legacyMetricsObserver func() error
legacyMetricsObserver func(context.Context) error
}

func NewMonitor(z *zap.Logger, store ipStateStore, nnccli nodeNetworkConfigSpecUpdater, demandSource <-chan int, nncSource <-chan v1alpha.NodeNetworkConfig, cssSource <-chan v1alpha1.ClusterSubnetState) *Monitor { //nolint:lll // it's fine
Expand All @@ -59,7 +59,7 @@ func NewMonitor(z *zap.Logger, store ipStateStore, nnccli nodeNetworkConfigSpecU
cssSource: cssSource,
nncSource: nncSource,
started: make(chan interface{}),
legacyMetricsObserver: func() error { return nil },
legacyMetricsObserver: func(context.Context) error { return nil },
}
}

Expand Down Expand Up @@ -100,7 +100,7 @@ func (pm *Monitor) Start(ctx context.Context) error {
if err := pm.reconcile(ctx); err != nil {
pm.z.Error("reconcile failed", zap.Error(err))
}
if err := pm.legacyMetricsObserver(); err != nil {
if err := pm.legacyMetricsObserver(ctx); err != nil {
pm.z.Error("legacy metrics observer failed", zap.Error(err))
}
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (pm *Monitor) buildNNCSpec(request int64) v1alpha.NodeNetworkConfigSpec {
return spec
}

func (pm *Monitor) WithLegacyMetricsObserver(observer func() error) {
func (pm *Monitor) WithLegacyMetricsObserver(observer func(context.Context) error) {
pm.legacyMetricsObserver = observer
}

Expand Down
19 changes: 16 additions & 3 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func startTelemetryService(ctx context.Context) {
log.Errorf("Telemetry service failed to start: %w", err)
return
}
tb.PushData(rootCtx)
tb.PushData(ctx)
}

// Main is the entry point for CNS.
Expand Down Expand Up @@ -1349,6 +1349,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
}
}

if cnsconfig.EnableSubnetScarcity {
cacheOpts.ByObject[&cssv1alpha1.ClusterSubnetState{}] = cache.ByObject{
Namespaces: map[string]cache.Config{
"kube-system": {},
},
}
}

managerOpts := ctrlmgr.Options{
Scheme: scheme,
Metrics: ctrlmetrics.Options{BindAddress: "0"},
Expand All @@ -1374,9 +1382,13 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
cssCh := make(chan cssv1alpha1.ClusterSubnetState)
ipDemandCh := make(chan int)
if cnsconfig.EnableIPAMv2 {
cssSrc := func(context.Context) ([]cssv1alpha1.ClusterSubnetState, error) { return nil, nil }
if cnsconfig.EnableSubnetScarcity {
cssSrc = clustersubnetstate.NewClient(manager.GetClient()).List
}
nncCh := make(chan v1alpha.NodeNetworkConfig)
pmv2 := ipampoolv2.NewMonitor(z, httpRestServiceImplementation, cachedscopedcli, ipDemandCh, nncCh, cssCh)
obs := metrics.NewLegacyMetricsObserver(ctx, httpRestService.GetPodIPConfigState, cachedscopedcli.Get, clustersubnetstate.NewClient(manager.GetClient()).List)
obs := metrics.NewLegacyMetricsObserver(httpRestService.GetPodIPConfigState, cachedscopedcli.Get, cssSrc)
pmv2.WithLegacyMetricsObserver(obs)
poolMonitor = pmv2.AsV1(nncCh)
} else {
Expand Down Expand Up @@ -1462,13 +1474,14 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
// wait for the Reconciler to run once on a NNC that was made for this Node.
// the nncReadyCtx has a timeout of 15 minutes, after which we will consider
// this false and the NNC Reconciler stuck/failed, log and retry.
nncReadyCtx, _ := context.WithTimeout(ctx, 15*time.Minute) //nolint // it will time out and not leak
nncReadyCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) //nolint // it will time out and not leak
if started, err := nncReconciler.Started(nncReadyCtx); !started {
log.Errorf("NNC reconciler has not started, does the NNC exist? err: %v", err)
nncReconcilerStartFailures.Inc()
continue
}
logger.Printf("NodeNetworkConfig reconciler has started.")
cancel()
break
}

Expand Down
2 changes: 1 addition & 1 deletion crd/clustersubnetstate/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ func (c *Client) Get(ctx context.Context, key types.NamespacedName) (*v1alpha1.C

func (c *Client) List(ctx context.Context) ([]v1alpha1.ClusterSubnetState, error) {
clusterSubnetStateList := &v1alpha1.ClusterSubnetStateList{}
err := c.cli.List(ctx, clusterSubnetStateList)
err := c.cli.List(ctx, clusterSubnetStateList, client.InNamespace("kube-system"))
return clusterSubnetStateList.Items, errors.Wrap(err, "failed to list css")
}

0 comments on commit 1edb63f

Please sign in to comment.