Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jrasell committed Jan 2, 2025
1 parent 618135e commit b2450ae
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 23 deletions.
70 changes: 47 additions & 23 deletions drivers/docker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package docker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -89,13 +90,18 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration, compute
return recvCh, nil
}

// collectStats starts collecting resource usage stats of a docker container
// collectStats starts collecting resource usage stats of a Docker container
// and does this until the context or the tasks handler done channel is closed.
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration, compute cpustats.Compute) {
// Close the destination sender whenever this function returns.
defer destCh.close()

// retry tracks the number of retries the collection has been through since
// the last successful Docker API call. This is used to calculate the
// backoff time for the collection ticker.
var retry uint64

// The ticker paces the collection loop; it is reset after every collection
// attempt made through collectDockerStats below.
ticker, cancel := helper.NewSafeTicker(interval)
defer cancel()
var stats *containerapi.Stats

for {
select {
Expand All @@ -104,30 +110,48 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
case <-h.doneCh:
return
case <-ticker.C:
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot. This streaming stats can be reused over time,
// but require synchronization, which restricts the interval for the metrics.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
// One collection attempt per tick; the outcome selects the delay before
// the next tick.
stats, err := h.collectDockerStats(ctx)
switch err {
case nil:
// Success: publish the converted usage, return to the regular interval
// and clear the retry counter.
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
ticker.Reset(interval)
retry = 0
default:
// Failure: log the error and schedule the next tick using the backoff
// computed from the current retry count, then count this retry.
h.logger.Error("error collecting stats from container", "error", err)
ticker.Reset(helper.Backoff(statsCollectorBackoffBaseline, statsCollectorBackoffLimit, retry))
retry++
}
}
}
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
statsReader.Body.Close()
if err != nil && err != io.EOF {
h.logger.Error("error decoding stats data from container", "error", err)
return
}
// collectDockerStats performs the stats collection from the Docker API. It is
// split into its own function for the purpose of aiding testing.
//
// It returns the decoded stats on success. An error is returned when the API
// call fails, the response body is nil, the body cannot be decoded, or the
// decoded stats are nil. io.EOF from either the API call or the decoder is not
// treated as a failure on its own.
func (h *taskHandle) collectDockerStats(ctx context.Context) (*containerapi.Stats, error) {

if stats == nil {
h.logger.Error("error decoding stats data: stats were nil")
return
}
var stats *containerapi.Stats

resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
// NOTE(review): the final argument is false here, whereas the earlier call
// made from collectStats passed true; presumably this is the client's stream
// flag -- confirm against the Docker client API.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, false)
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to collect stats: %w", err)
}

// Ensure the body is not nil to avoid potential panics. The statsReader
// itself cannot be nil, so there is no need to check this.
if statsReader.Body == nil {
return nil, errors.New("error decoding stats data: no reader body")
}

err = json.NewDecoder(statsReader.Body).Decode(&stats)
// Close the body as soon as decoding has finished; the close error is
// intentionally discarded.
_ = statsReader.Body.Close()
if err != nil && err != io.EOF {
return nil, fmt.Errorf("failed to decode Docker response: %w", err)
}

// The decoder can leave stats nil (for example when it returned io.EOF,
// which is tolerated above), so reject that case explicitly.
if stats == nil {
return nil, errors.New("error decoding stats data: stats were nil")
}

return stats, nil
}
49 changes: 49 additions & 0 deletions drivers/docker/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package docker

import (
"context"
"runtime"
"sync"
"testing"
"time"

containerapi "github.com/docker/docker/api/types/container"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cpustats"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/drivers/docker/util"
"github.com/shoenig/test/must"
)
Expand Down Expand Up @@ -112,3 +115,49 @@ func TestDriver_DockerUsageSender(t *testing.T) {
destCh.close()
destCh.send(res)
}

// Test_taskHandle_collectDockerStats runs a single stats collection against a
// running Docker container and verifies that the CPU and memory fields used
// by DockerStatsToTaskResourceUsage are populated in the response.
func Test_taskHandle_collectDockerStats(t *testing.T) {
	ci.Parallel(t)
	testutil.DockerCompatible(t)

	// Start a Docker container and wait for it to be running, so we can
	// guarantee stats generation.
	driverCfg, dockerTaskConfig, _ := dockerTask(t)

	must.NoError(t, driverCfg.EncodeConcreteDriverConfig(dockerTaskConfig))

	_, driverHarness, handle, cleanup := dockerSetup(t, driverCfg, nil)
	defer cleanup()
	must.NoError(t, driverHarness.WaitUntilStarted(driverCfg.ID, 5*time.Second))

	// Generate a context, so the test doesn't hang on Docker problems and
	// execute a single collection of the stats.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	dockerStats, err := handle.collectDockerStats(ctx)
	must.NoError(t, err)
	must.NotNil(t, dockerStats)

	// Ensure all the stats we use for calculating CPU percentages within
	// DockerStatsToTaskResourceUsage are present and non-zero. Each field is
	// asserted exactly once for both the current and the previous sample.
	must.NonZero(t, dockerStats.CPUStats.CPUUsage.TotalUsage)
	must.NonZero(t, dockerStats.CPUStats.CPUUsage.UsageInKernelmode)
	must.NonZero(t, dockerStats.CPUStats.CPUUsage.UsageInUsermode)

	must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.TotalUsage)
	must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.UsageInKernelmode)
	must.NonZero(t, dockerStats.PreCPUStats.CPUUsage.UsageInUsermode)

	// System usage is only populated on Linux machines.
	if runtime.GOOS == "linux" {
		must.NonZero(t, dockerStats.CPUStats.SystemUsage)
		must.NonZero(t, dockerStats.PreCPUStats.SystemUsage)
	}

	// Ensure the core memory stats are present and where desired, non-zero.
	must.NonZero(t, dockerStats.MemoryStats.Usage)
	must.MapContainsKey(t, dockerStats.MemoryStats.Stats, "file_mapped")
}

0 comments on commit b2450ae

Please sign in to comment.