Skip to content

Commit

Permalink
[YUNIKORN-1994] fix release containers metrix in queue
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Sep 25, 2023
1 parent 57544a6 commit 1e6ecba
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/metrics/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type CoreQueueMetrics interface {
IncQueueApplicationsCompleted()
IncAllocatedContainer()
IncReleasedContainer()
AddReleasedContainers(value int)
SetQueueGuaranteedResourceMetrics(resourceName string, value float64)
SetQueueMaxResourceMetrics(resourceName string, value float64)
SetQueueAllocatedResourceMetrics(resourceName string, value float64)
Expand Down
4 changes: 4 additions & 0 deletions pkg/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func (m *QueueMetrics) IncReleasedContainer() {
m.appMetrics.With(prometheus.Labels{"state": "released"}).Inc()
}

func (m *QueueMetrics) AddReleasedContainers(value int) {
m.appMetrics.With(prometheus.Labels{"state": "released"}).Add(float64(value))
}

func (m *QueueMetrics) SetQueueGuaranteedResourceMetrics(resourceName string, value float64) {
m.ResourceMetrics.With(prometheus.Labels{"state": "guaranteed", "resource": resourceName}).Set(value)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ func TestReleasedContainers(t *testing.T) {
verifyAppMetrics(t, "released")
}

func TestAddReleasedContainers(t *testing.T) {
cqm = getQueueMetrics()
defer unregisterQueueMetrics(t)

cqm.AddReleasedContainers(1)
verifyAppMetrics(t, "released")
}

func TestQueueGuaranteedResourceMetrics(t *testing.T) {
cqm = getQueueMetrics()
defer unregisterQueueMetrics(t)
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func (pc *PartitionContext) removeNode(nodeID string) ([]*objects.Allocation, []
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) {
released := make([]*objects.Allocation, 0)
confirmed := make([]*objects.Allocation, 0)
releasedContainerByQueue := make(map[string]int)
// walk over all allocations still registered for this node
for _, alloc := range node.GetAllAllocations() {
allocID := alloc.GetUUID()
Expand Down Expand Up @@ -768,8 +769,6 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object
log.Log(log.SchedPartition).Warn("failed to release resources from queue",
zap.String("appID", alloc.GetApplicationID()),
zap.Error(err))
} else {
metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
}
// remove preempted resources
if alloc.IsPreempted() {
Expand All @@ -781,12 +780,16 @@ func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*object

// the allocation is removed so add it to the list that we return
released = append(released, alloc)
releasedContainerByQueue[queue.GetQueuePath()]++
log.Log(log.SchedPartition).Info("allocation removed from node",
zap.String("nodeID", node.NodeID),
zap.String("allocationId", allocID))
}
// track the number of allocations: decrement the released allocation AND increment with the confirmed
pc.updateAllocationCount(len(confirmed) - len(released))
for queuePath, count := range releasedContainerByQueue {
metrics.GetQueueMetrics(queuePath).AddReleasedContainers(count)
}
return released, confirmed
}

Expand Down Expand Up @@ -1356,8 +1359,6 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
zap.String("appID", appID),
zap.String("allocationId", uuid),
zap.Error(err))
} else {
metrics.GetQueueMetrics(queue.GetQueuePath()).IncReleasedContainer()
}
}
if resources.StrictlyGreaterThanZero(totalPreempting) {
Expand All @@ -1371,6 +1372,7 @@ func (pc *PartitionContext) removeAllocation(release *si.AllocationRelease) ([]*
}
// track the number of allocations, when we replace the result is no change
pc.updateAllocationCount(-len(released))
metrics.GetQueueMetrics(queue.GetQueuePath()).AddReleasedContainers(len(released))

// if the termination type is timeout, we don't notify the shim, because it's
// originated from that side
Expand Down

0 comments on commit 1e6ecba

Please sign in to comment.