Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spilled data size in CLI and WebUI #426

Open
wants to merge 4 commits into
base: cw/spill-space-manager/2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void printFinalInfo()
reprintLine(perNodeSummary);

out.println(String.format("Parallelism: %.1f", parallelism));
out.println("Spilled: " + stats.getSpilledDataSize());
}

// 0:32 [2.12GB, 15M rows] [67MB/s, 463K rows/s]
Expand Down Expand Up @@ -280,6 +281,7 @@ private void printQueryInfo(QueryResults results)
reprintLine(perNodeSummary);

reprintLine(String.format("Parallelism: %.1f", parallelism));
reprintLine("Spilled: " + stats.getSpilledDataSize());
}

assert terminalWidth >= 75;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.DataSize;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand All @@ -40,6 +41,7 @@ public class StatementStats
private final long processedRows;
private final long processedBytes;
private final StageStats rootStage;
private final DataSize spilledDataSize;

@JsonCreator
public StatementStats(
Expand All @@ -56,7 +58,8 @@ public StatementStats(
@JsonProperty("wallTimeMillis") long wallTimeMillis,
@JsonProperty("processedRows") long processedRows,
@JsonProperty("processedBytes") long processedBytes,
@JsonProperty("rootStage") StageStats rootStage)
@JsonProperty("rootStage") StageStats rootStage,
@JsonProperty("spilledDataSize") DataSize spilledDataSize)
{
this.state = requireNonNull(state, "state is null");
this.queued = queued;
Expand All @@ -72,6 +75,7 @@ public StatementStats(
this.processedRows = processedRows;
this.processedBytes = processedBytes;
this.rootStage = rootStage;
this.spilledDataSize = spilledDataSize;
}

@NotNull
Expand Down Expand Up @@ -160,6 +164,12 @@ public StageStats getRootStage()
return rootStage;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}

@Override
public String toString()
{
Expand All @@ -178,6 +188,7 @@ public String toString()
.add("processedRows", processedRows)
.add("processedBytes", processedBytes)
.add("rootStage", rootStage)
.add("spilledDataSize", spilledDataSize)
.toString();
}

Expand All @@ -202,6 +213,7 @@ public static class Builder
private long processedRows;
private long processedBytes;
private StageStats rootStage;
private DataSize spilledDataSize;

private Builder() {}

Expand Down Expand Up @@ -289,6 +301,12 @@ public Builder setRootStage(StageStats rootStage)
return this;
}

public Builder setSpilledDataSize(DataSize spilledDataSize)
{
this.spilledDataSize = spilledDataSize;
return this;
}

public StatementStats build()
{
return new StatementStats(
Expand All @@ -305,7 +323,8 @@ public StatementStats build()
wallTimeMillis,
processedRows,
processedBytes,
rootStage);
rootStage,
spilledDataSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private static String newQueryResults(Integer partialCancelId, Integer nextUriId
nextUriId == null ? null : URI.create(format(NEXT_URI, nextUriId)),
responseColumns,
data,
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, null),
null,
null,
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ public QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
boolean fullyBlocked = rootStage.isPresent();
Set<BlockedReason> blockedReasons = new HashSet<>();

long spilledBytes = 0;

boolean completeInfo = true;
for (StageInfo stageInfo : getAllStages(rootStage)) {
StageStats stageStats = stageInfo.getStageStats();
Expand Down Expand Up @@ -337,6 +339,7 @@ public QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
processedInputPositions += stageStats.getProcessedInputPositions();
}
completeInfo = completeInfo && stageInfo.isCompleteInfo();
spilledBytes += stageStats.getSpilledDataSize().toBytes();
}

if (rootStage.isPresent()) {
Expand Down Expand Up @@ -382,7 +385,8 @@ public QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
succinctBytes(processedInputDataSize),
processedInputPositions,
succinctBytes(outputDataSize),
outputPositions);
outputPositions,
succinctBytes(spilledBytes));

return new QueryInfo(queryId,
session.toSessionRepresentation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class QueryStats
private final DataSize outputDataSize;
private final long outputPositions;

private final DataSize spilledDataSize;

@VisibleForTesting
public QueryStats()
{
Expand Down Expand Up @@ -108,6 +110,7 @@ public QueryStats()
this.processedInputPositions = 0;
this.outputDataSize = null;
this.outputPositions = 0;
this.spilledDataSize = null;
}

@JsonCreator
Expand Down Expand Up @@ -151,7 +154,8 @@ public QueryStats(
@JsonProperty("processedInputPositions") long processedInputPositions,

@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions)
@JsonProperty("outputPositions") long outputPositions,
@JsonProperty("spilledDataSize") DataSize spilledDataSize)
{
this.createTime = requireNonNull(createTime, "createTime is null");
this.executionStartTime = executionStartTime;
Expand Down Expand Up @@ -202,6 +206,8 @@ public QueryStats(
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;

this.spilledDataSize = spilledDataSize;
}

@JsonProperty
Expand Down Expand Up @@ -405,4 +411,10 @@ public long getOutputPositions()
{
return outputPositions;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su
boolean fullyBlocked = true;
Set<BlockedReason> blockedReasons = new HashSet<>();

long spilledBytes = 0;

for (TaskInfo taskInfo : taskInfos) {
TaskState taskState = taskInfo.getTaskStatus().getState();
if (taskState.isDone()) {
Expand Down Expand Up @@ -259,6 +261,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su

outputDataSize += taskStats.getOutputDataSize().toBytes();
outputPositions += taskStats.getOutputPositions();
spilledBytes += taskStats.getSpilledDataSize().toBytes();
}

StageStats stageStats = new StageStats(
Expand Down Expand Up @@ -291,7 +294,8 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier, Su
succinctBytes(processedInputDataSize),
processedInputPositions,
succinctBytes(outputDataSize),
outputPositions);
outputPositions,
succinctBytes(spilledBytes));

ExecutionFailureInfo failureInfo = null;
if (state == FAILED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class StageStats
private final DataSize outputDataSize;
private final long outputPositions;

private final DataSize spilledDataSize;

@VisibleForTesting
public StageStats()
{
Expand Down Expand Up @@ -97,6 +99,7 @@ public StageStats()
this.processedInputPositions = 0;
this.outputDataSize = null;
this.outputPositions = 0;
this.spilledDataSize = null;
}

@JsonCreator
Expand Down Expand Up @@ -134,7 +137,8 @@ public StageStats(
@JsonProperty("processedInputPositions") long processedInputPositions,

@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions)
@JsonProperty("outputPositions") long outputPositions,
@JsonProperty("spilledDataSize") DataSize spilledDataSize)
{
this.schedulingComplete = schedulingComplete;
this.getSplitDistribution = requireNonNull(getSplitDistribution, "getSplitDistribution is null");
Expand Down Expand Up @@ -179,6 +183,8 @@ public StageStats(
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;

this.spilledDataSize = spilledDataSize;
}

@JsonProperty
Expand Down Expand Up @@ -336,4 +342,10 @@ public long getOutputPositions()
{
return outputPositions;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class DriverContext

private final List<OperatorContext> operatorContexts = new CopyOnWriteArrayList<>();
private final boolean partitioned;
private final AtomicLong spilledBytes = new AtomicLong();

public DriverContext(PipelineContext pipelineContext, Executor executor, boolean partitioned)
{
Expand Down Expand Up @@ -207,6 +208,7 @@ public ListenableFuture<?> reserveSystemMemory(long bytes)

public ListenableFuture<?> reserveSpill(long bytes)
{
spilledBytes.getAndAdd(bytes);
return pipelineContext.reserveSpill(bytes);
}

Expand Down Expand Up @@ -404,7 +406,8 @@ public DriverStats getDriverStats()
processedInputPositions,
outputDataSize.convertToMostSuccinctDataSize(),
outputPositions,
ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats)));
ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats)),
succinctBytes(spilledBytes.get()));
}

public boolean isPartitioned()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class DriverStats

private final List<OperatorStats> operatorStats;

private final DataSize spilledDataSize;

public DriverStats()
{
this.createTime = DateTime.now();
Expand Down Expand Up @@ -93,6 +95,8 @@ public DriverStats()
this.outputPositions = 0;

this.operatorStats = ImmutableList.of();

this.spilledDataSize = new DataSize(0, BYTE);
}

@JsonCreator
Expand Down Expand Up @@ -123,7 +127,9 @@ public DriverStats(
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions,

@JsonProperty("operatorStats") List<OperatorStats> operatorStats)
@JsonProperty("operatorStats") List<OperatorStats> operatorStats,

@JsonProperty("spilledDataSize") DataSize spilledDataSize)
{
this.createTime = requireNonNull(createTime, "createTime is null");
this.startTime = startTime;
Expand Down Expand Up @@ -155,6 +161,8 @@ public DriverStats(
this.outputPositions = outputPositions;

this.operatorStats = ImmutableList.copyOf(requireNonNull(operatorStats, "operatorStats is null"));

this.spilledDataSize = spilledDataSize;
}

@JsonProperty
Expand Down Expand Up @@ -284,4 +292,10 @@ public List<OperatorStats> getOperatorStats()
{
return operatorStats;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,10 @@ public OperatorStats getOperatorStats()
succinctBytes(memoryReservation.get()),
succinctBytes(systemMemoryContext.getReservedBytes()),
memoryFuture.get().isDone() ? Optional.empty() : Optional.of(WAITING_FOR_MEMORY),
info);

info,

succinctBytes(spillContext.getSpilledBytes()));
}

private long currentThreadUserTime()
Expand Down Expand Up @@ -502,6 +505,7 @@ private class OperatorSpillContext
private final DriverContext driverContext;

private long reservedBytes;
private long spilledBytes;

public OperatorSpillContext(DriverContext driverContext)
{
Expand All @@ -513,6 +517,7 @@ public void updateBytes(long bytes)
{
if (bytes > 0) {
driverContext.reserveSpill(bytes);
spilledBytes += bytes;
}
else {
checkArgument(reservedBytes + bytes >= 0, "tried to free %s spilled bytes from %s bytes reserved", -bytes, reservedBytes);
Expand All @@ -526,7 +531,13 @@ public String toString()
{
return toStringHelper(this)
.add("usedBytes", reservedBytes)
.add("spilledBytes", spilledBytes)
.toString();
}

public long getSpilledBytes()
{
return spilledBytes;
}
}
}
Loading