Merge pull request #138 from DirectXMan12/bug/fix-partial-collections
Don't discard node results on error
DirectXMan12 authored Sep 17, 2018
2 parents a98b91f + e35b903 commit 6cd5261
Showing 5 changed files with 49 additions and 22 deletions.
3 changes: 0 additions & 3 deletions cmd/metrics-server/app/start.go
@@ -195,9 +195,6 @@ func (o MetricsServerOptions) Run(stopCh <-chan struct{}) error {
// set up the general manager
manager.RegisterDurationMetrics(o.MetricResolution)
mgr := manager.NewManager(sourceManager, metricSink, o.MetricResolution)
if err != nil {
return fmt.Errorf("unable to create main manager: %v", err)
}

// inject the providers into the config
config.ProviderConfig.Node = metricsProvider
4 changes: 3 additions & 1 deletion pkg/sources/interfaces.go
@@ -60,7 +60,9 @@ type MetricsPoint struct {
// It is expected that the batch returned contains unique values (i.e. it does not return
// the same node, pod, or container as any other source).
type MetricSource interface {
// Collect fetches a batch of metrics. It may return both a partial result and an error.
// Collect fetches a batch of metrics. It may return both a partial result and an error,
// and non-nil results thus must be well-formed and meaningful even when accompanied by
// an error.
Collect(context.Context) (*MetricsBatch, error)
// Name names the metrics source for identification purposes
Name() string
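The sketch below illustrates that contract from the caller's side; it is not part of this change. collectFrom and handleBatch are hypothetical names, while MetricSource and MetricsBatch are the types from this package.

package sources

import (
	"context"
	"fmt"
)

// collectFrom shows the caller's side of the Collect contract; handleBatch is
// a hypothetical sink, not part of this change.
func collectFrom(ctx context.Context, src MetricSource, handleBatch func(*MetricsBatch)) error {
	batch, err := src.Collect(ctx)
	if batch != nil {
		// a non-nil batch must be well-formed, so it is safe to use even when err != nil
		handleBatch(batch)
	}
	if err != nil {
		return fmt.Errorf("partial collection from source %q: %v", src.Name(), err)
	}
	return nil
}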
4 changes: 4 additions & 0 deletions pkg/sources/manager.go
@@ -136,6 +136,10 @@ func (m *sourceManager) Collect(baseCtx context.Context) (*MetricsBatch, error)
srcBatch := <-responseChannel
if err != nil {
errs = append(errs, err)
// NB: partial node results are still worth saving, so
// don't skip storing results if we got an error
}
if srcBatch == nil {
continue
}

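For context, here is a rough sketch of the scatter/gather shape this hunk sits in: each source is collected in its own goroutine, errors are aggregated, and a partial batch is kept rather than dropped. fanInCollect and its result struct are illustrative stand-ins under those assumptions, not the actual sourceManager code.

package sources

import (
	"context"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// fanInCollect is a hypothetical stand-in for the manager's Collect: it fans
// collection out to one goroutine per source, then gathers batches and errors.
func fanInCollect(ctx context.Context, srcs []MetricSource) (*MetricsBatch, error) {
	type result struct {
		batch *MetricsBatch
		err   error
	}
	results := make(chan result, len(srcs))
	for _, src := range srcs {
		go func(src MetricSource) {
			batch, err := src.Collect(ctx)
			results <- result{batch: batch, err: err}
		}(src)
	}

	res := &MetricsBatch{}
	var errs []error
	for range srcs {
		r := <-results
		if r.err != nil {
			errs = append(errs, r.err)
			// deliberately no continue here: a partial batch is still worth keeping
		}
		if r.batch == nil {
			continue
		}
		res.Nodes = append(res.Nodes, r.batch.Nodes...)
		res.Pods = append(res.Pods, r.batch.Pods...)
	}
	return res, utilerrors.NewAggregate(errs)
}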
32 changes: 26 additions & 6 deletions pkg/sources/summary/summary.go
@@ -109,10 +109,29 @@ func (src *summaryMetricsSource) Collect(ctx context.Context) (*sources.MetricsB

var errs []error
errs = append(errs, src.decodeNodeStats(&summary.Node, &res.Nodes[0])...)
for i, pod := range summary.Pods {
errs = append(errs, src.decodePodStats(&pod, &res.Pods[i])...)
if len(errs) != 0 {
// if we had errors providing node metrics, discard the data point
// so that we don't incorrectly report metric values as zero.
res.Nodes = res.Nodes[:0]
}

num := 0
for _, pod := range summary.Pods {
podErrs := src.decodePodStats(&pod, &res.Pods[num])
errs = append(errs, podErrs...)
if len(podErrs) != 0 {
// NB: we explicitly want to discard pods with partial results, since
// the horizontal pod autoscaler takes special action when a pod is missing
// metrics (and zero CPU or memory does not count as "missing metrics")

// we don't care if we reuse slots in the result array,
// because they get completely overwritten in decodePodStats
continue
}
num++
}
res.Pods = res.Pods[:num]

return res, utilerrors.NewAggregate(errs)
}

@@ -130,15 +149,16 @@ func (src *summaryMetricsSource) decodeNodeStats(nodeStats *stats.NodeStats, tar
}
var errs []error
if err := decodeCPU(&target.CpuUsage, nodeStats.CPU); err != nil {
errs = append(errs, fmt.Errorf("unable to get CPU for node %q: %v", src.node.ConnectAddress, err))
errs = append(errs, fmt.Errorf("unable to get CPU for node %q, discarding data: %v", src.node.ConnectAddress, err))
}
if err := decodeMemory(&target.MemoryUsage, nodeStats.Memory); err != nil {
errs = append(errs, fmt.Errorf("unable to get memory for node %q: %v", src.node.ConnectAddress, err))
errs = append(errs, fmt.Errorf("unable to get memory for node %q, discarding data: %v", src.node.ConnectAddress, err))
}
return errs
}

func (src *summaryMetricsSource) decodePodStats(podStats *stats.PodStats, target *sources.PodMetricsPoint) []error {
// completely overwrite data in the target
*target = sources.PodMetricsPoint{
Name: podStats.PodRef.Name,
Namespace: podStats.PodRef.Namespace,
@@ -160,10 +180,10 @@ },
},
}
if err := decodeCPU(&point.CpuUsage, container.CPU); err != nil {
errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
errs = append(errs, fmt.Errorf("unable to get CPU for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
}
if err := decodeMemory(&point.MemoryUsage, container.Memory); err != nil {
errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
errs = append(errs, fmt.Errorf("unable to get memory for container %q in pod %s/%s on node %q, discarding data: %v", container.Name, target.Namespace, target.Name, src.node.ConnectAddress, err))
}

target.Containers[i] = point
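The pod loop above compacts the result slice in place: a slot is only "claimed" (num++) when a pod decodes cleanly, and is otherwise reused on the next iteration, which is safe because decodePodStats overwrites its target completely. A self-contained toy version of that pattern, with made-up types standing in for the real ones:

package main

import "fmt"

type PodMetricsPoint struct {
	Name string
	CPU  int64 // simplified stand-in for resource.Quantity
}

// decodeOne is a stand-in for decodePodStats: it overwrites *out completely
// and reports whether the raw sample was usable.
func decodeOne(name string, cpu *int64, out *PodMetricsPoint) bool {
	*out = PodMetricsPoint{Name: name}
	if cpu == nil {
		return false
	}
	out.CPU = *cpu
	return true
}

func main() {
	ten := int64(10)
	names := []string{"a", "b", "c"}
	cpus := []*int64{&ten, nil, &ten}

	out := make([]PodMetricsPoint, len(names))
	num := 0
	for i := range names {
		if !decodeOne(names[i], cpus[i], &out[num]) {
			// drop pods with partial data; the slot is reused because
			// decodeOne overwrites it completely on the next iteration
			continue
		}
		num++
	}
	out = out[:num]
	fmt.Println(out) // [{a 10} {c 10}]
}

Dropping the whole pod rather than keeping a zeroed point matters because, as the comment in the diff notes, the horizontal pod autoscaler treats "missing metrics" specially, whereas a spurious zero would be taken at face value.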
28 changes: 16 additions & 12 deletions pkg/sources/summary/summary_test.go
@@ -124,22 +124,23 @@ func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
var expectedPods []interface{}
for _, pod := range summary.Pods {
containers := make([]sources.ContainerMetricsPoint, len(pod.Containers))
missingData := false
for i, container := range pod.Containers {
var cpuUsage, memoryUsage resource.Quantity
var timestamp time.Time
if container.CPU != nil {
if container.CPU.UsageNanoCores != nil {
cpuUsage = *resource.NewScaledQuantity(int64(*container.CPU.UsageNanoCores), -9)
}
timestamp = container.CPU.Time.Time
if container.CPU == nil || container.CPU.UsageNanoCores == nil {
missingData = true
break
}
if container.Memory != nil {
if container.Memory.WorkingSetBytes != nil {
memoryUsage = *resource.NewQuantity(int64(*container.Memory.WorkingSetBytes), resource.BinarySI)
}
if timestamp.IsZero() {
timestamp = container.Memory.Time.Time
}
cpuUsage = *resource.NewScaledQuantity(int64(*container.CPU.UsageNanoCores), -9)
timestamp = container.CPU.Time.Time
if container.Memory == nil || container.Memory.WorkingSetBytes == nil {
missingData = true
break
}
memoryUsage = *resource.NewQuantity(int64(*container.Memory.WorkingSetBytes), resource.BinarySI)
if timestamp.IsZero() {
timestamp = container.Memory.Time.Time
}

containers[i] = sources.ContainerMetricsPoint{
@@ -151,6 +152,9 @@ func verifyPods(summary *stats.Summary, batch *sources.MetricsBatch) {
},
}
}
if missingData {
continue
}
expectedPods = append(expectedPods, sources.PodMetricsPoint{
Name: pod.PodRef.Name,
Namespace: pod.PodRef.Namespace,
