Skip to content

Commit

Permalink
Merge pull request containerd#10375 from linxiulei/podstats
Browse files Browse the repository at this point in the history
cri: get pid count from container metrics
  • Loading branch information
fuweid authored Jul 1, 2024
2 parents 3b2a14b + f6e731c commit 1fb1882
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 59 deletions.
2 changes: 1 addition & 1 deletion internal/cri/server/container_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ func (c *criService) ContainerStats(ctx context.Context, in *runtime.ContainerSt
if err != nil {
return nil, fmt.Errorf("failed to decode container metrics: %w", err)
}
return &runtime.ContainerStatsResponse{Stats: cs}, nil
return &runtime.ContainerStatsResponse{Stats: cs.stats}, nil
}
88 changes: 59 additions & 29 deletions internal/cri/server/container_stats_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (c *criService) ListContainerStats(
ctx context.Context,
in *runtime.ListContainerStatsRequest,
) (*runtime.ListContainerStatsResponse, error) {
css, err := c.listContainerStats(ctx, in)
if err != nil {
return nil, fmt.Errorf("failed to fetch containers and stats: %w", err)
}
return c.toCRIContainerStats(css), nil
}

func (c *criService) listContainerStats(
ctx context.Context,
in *runtime.ListContainerStatsRequest,
) ([]containerStats, error) {
request, containers, err := c.buildTaskMetricsRequest(in)
if err != nil {
return nil, fmt.Errorf("failed to build metrics request: %w", err)
Expand All @@ -51,14 +62,20 @@ func (c *criService) ListContainerStats(
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics for tasks: %w", err)
}
criStats, err := c.toCRIContainerStats(ctx, resp.Metrics, containers)
css, err := c.toContainerStats(ctx, resp.Metrics, containers)
if err != nil {
return nil, fmt.Errorf("failed to convert to cri containerd stats format: %w", err)
}
return criStats, nil
return css, nil
}

type metricsHandler func(containerstore.Metadata, *types.Metric) (*runtime.ContainerStats, error)
type containerStats struct {
stats *runtime.ContainerStats
// pids is only valid in linux platform
pids uint64
}

type metricsHandler func(containerstore.Metadata, *types.Metric) (containerStats, error)

// Returns a function to be used for transforming container metrics into the right format.
// Uses the platform the given sandbox advertises to implement its logic. If the platform is
Expand Down Expand Up @@ -86,28 +103,28 @@ func (c *criService) getMetricsHandler(ctx context.Context, sandboxID string) (m

switch p.OS {
case "windows":
return func(meta containerstore.Metadata, stats *types.Metric) (*runtime.ContainerStats, error) {
return func(meta containerstore.Metadata, stats *types.Metric) (containerStats, error) {
return c.windowsContainerMetrics(meta, stats, snapshotter)
}, nil
case "linux":
return func(meta containerstore.Metadata, stats *types.Metric) (*runtime.ContainerStats, error) {
return func(meta containerstore.Metadata, stats *types.Metric) (containerStats, error) {
return c.linuxContainerMetrics(meta, stats, snapshotter)
}, nil
default:
return nil, fmt.Errorf("container metrics for platform %+v: %w", p, errdefs.ErrNotImplemented)
}
}

func (c *criService) toCRIContainerStats(
func (c *criService) toContainerStats(
ctx context.Context,
stats []*types.Metric,
containers []containerstore.Container,
) (*runtime.ListContainerStatsResponse, error) {
) ([]containerStats, error) {
statsMap := make(map[string]*types.Metric)
for _, stat := range stats {
statsMap[stat.ID] = stat
}
containerStats := new(runtime.ListContainerStatsResponse)
css := []containerStats{}

// Unfortunately if no filter was passed we're asking for every containers stats which
// generally belong to multiple different pods, who all might have different platforms.
Expand Down Expand Up @@ -143,17 +160,25 @@ func (c *criService) toCRIContainerStats(
return nil, fmt.Errorf("failed to decode container metrics for %q: %w", cntr.ID, err)
}

if cs.Cpu != nil && cs.Cpu.UsageCoreNanoSeconds != nil {
if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil {
// this is a calculated value and should be computed for all OSes
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.Cpu.Timestamp))
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to get usage nano cores, containerID: %s: %w", cntr.Metadata.ID, err)
}
cs.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
}
containerStats.Stats = append(containerStats.Stats, cs)
css = append(css, cs)
}
return containerStats, nil
return css, nil
}

func (c *criService) toCRIContainerStats(css []containerStats) *runtime.ListContainerStatsResponse {
containerStats := new(runtime.ListContainerStatsResponse)
for _, cs := range css {
containerStats.Stats = append(containerStats.Stats, cs.stats)
}
return containerStats
}

func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
Expand Down Expand Up @@ -275,7 +300,7 @@ func (c *criService) windowsContainerMetrics(
meta containerstore.Metadata,
stats *types.Metric,
snapshotter string,
) (*runtime.ContainerStats, error) {
) (containerStats, error) {
var cs runtime.ContainerStats
var usedBytes, inodesUsed uint64
sn, err := c.GetSnapshot(meta.ID, snapshotter)
Expand Down Expand Up @@ -303,11 +328,11 @@ func (c *criService) windowsContainerMetrics(
if stats != nil {
s, err := typeurl.UnmarshalAny(stats.Data)
if err != nil {
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err)
}
wstats := s.(*wstats.Statistics).GetWindows()
if wstats == nil {
return nil, errors.New("windows stats is empty")
return containerStats{}, errors.New("windows stats is empty")
}
if wstats.Processor != nil {
cs.Cpu = &runtime.CpuUsage{
Expand All @@ -324,16 +349,16 @@ func (c *criService) windowsContainerMetrics(
}
}
}
return &cs, nil
return containerStats{&cs, 0}, nil
}

func (c *criService) linuxContainerMetrics(
meta containerstore.Metadata,
stats *types.Metric,
snapshotter string,
) (*runtime.ContainerStats, error) {
) (containerStats, error) {
var cs runtime.ContainerStats
var usedBytes, inodesUsed uint64
var usedBytes, inodesUsed, pids uint64
sn, err := c.GetSnapshot(meta.ID, snapshotter)
// If snapshotstore doesn't have cached snapshot information
// set WritableLayer usage to zero
Expand Down Expand Up @@ -361,32 +386,37 @@ func (c *criService) linuxContainerMetrics(
switch {
case typeurl.Is(stats.Data, (*cg1.Metrics)(nil)):
data = &cg1.Metrics{}
if err := typeurl.UnmarshalTo(stats.Data, data); err != nil {
return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err)
}
pids = data.(*cg1.Metrics).GetPids().GetCurrent()
case typeurl.Is(stats.Data, (*cg2.Metrics)(nil)):
data = &cg2.Metrics{}
case typeurl.Is(stats.Data, (*wstats.Statistics)(nil)):
data = &wstats.Statistics{}
if err := typeurl.UnmarshalTo(stats.Data, data); err != nil {
return containerStats{}, fmt.Errorf("failed to extract container metrics: %w", err)
}
pids = data.(*cg2.Metrics).GetPids().GetCurrent()
default:
return nil, errors.New("cannot convert metric data to cgroups.Metrics or windows.Statistics")
}

if err := typeurl.UnmarshalTo(stats.Data, data); err != nil {
return nil, fmt.Errorf("failed to extract container metrics: %w", err)
return containerStats{}, errors.New("cannot convert metric data to cgroups.Metrics")
}

cpuStats, err := c.cpuContainerStats(meta.ID, false /* isSandbox */, data, protobuf.FromTimestamp(stats.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
return containerStats{}, fmt.Errorf("failed to obtain cpu stats: %w", err)
}
cs.Cpu = cpuStats

memoryStats, err := c.memoryContainerStats(meta.ID, data, protobuf.FromTimestamp(stats.Timestamp))
if err != nil {
return nil, fmt.Errorf("failed to obtain memory stats: %w", err)
return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err)
}
cs.Memory = memoryStats
if err != nil {
return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err)
}
}

return &cs, nil
return containerStats{&cs, pids}, nil
}

// getWorkingSet calculates workingset memory from cgroup memory stats.
Expand Down
6 changes: 5 additions & 1 deletion internal/cri/server/container_stats_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,18 @@ func TestListContainerStats(t *testing.T) {
if tt.before != nil {
tt.before()
}
got, err := c.toCRIContainerStats(tt.args.ctx, tt.args.stats, tt.args.containers)
css, err := c.toContainerStats(tt.args.ctx, tt.args.stats, tt.args.containers)
if tt.after != nil {
tt.after()
}
if (err != nil) != tt.wantErr {
t.Errorf("ListContainerStats() error = %v, wantErr %v", err, tt.wantErr)
return
}
var got *runtime.ListContainerStatsResponse
if err == nil {
got = c.toCRIContainerStats(css)
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ListContainerStats() = %v, want %v", got, tt.want)
}
Expand Down
36 changes: 8 additions & 28 deletions internal/cri/server/sandbox_stats_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,40 +84,20 @@ func (c *criService) podSandboxStats(
}
}

listContainerStatsRequest := &runtime.ListContainerStatsRequest{Filter: &runtime.ContainerStatsFilter{PodSandboxId: meta.ID}}
css, err := c.listContainerStats(ctx, listContainerStatsRequest)
if err != nil {
return nil, fmt.Errorf("failed to obtain container stats during podSandboxStats call: %w", err)
}
var pidCount uint64
for _, cntr := range c.containerStore.List() {
if cntr.SandboxID != sandbox.ID {
continue
}

state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
continue
}

task, err := cntr.Container.Task(ctx, nil)
if err != nil {
return nil, err
}

processes, err := task.Pids(ctx)
if err != nil {
return nil, err
}
pidCount += uint64(len(processes))

for _, cs := range css {
pidCount += cs.pids
podSandboxStats.Linux.Containers = append(podSandboxStats.Linux.Containers, cs.stats)
}
podSandboxStats.Linux.Process = &runtime.ProcessUsage{
Timestamp: timestamp.UnixNano(),
ProcessCount: &runtime.UInt64Value{Value: pidCount},
}

listContainerStatsRequest := &runtime.ListContainerStatsRequest{Filter: &runtime.ContainerStatsFilter{PodSandboxId: meta.ID}}
resp, err := c.ListContainerStats(ctx, listContainerStatsRequest)
if err != nil {
return nil, fmt.Errorf("failed to obtain container stats during podSandboxStats call: %w", err)
}
podSandboxStats.Linux.Containers = resp.GetStats()
}

return podSandboxStats, nil
Expand Down

0 comments on commit 1fb1882

Please sign in to comment.