From d430dca4acfc16b8f16368d38644cba24269e306 Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Tue, 15 Nov 2016 19:26:27 +0100 Subject: [PATCH 1/4] add spilled data size to JSON Add tracking of spilled data size on various levels (from operator to query) and expose it in JSON. --- .../presto/jdbc/TestProgressMonitor.java | 2 +- .../presto/execution/QueryStateMachine.java | 6 ++++- .../facebook/presto/execution/QueryStats.java | 14 +++++++++++- .../presto/execution/StageStateMachine.java | 6 ++++- .../facebook/presto/execution/StageStats.java | 14 +++++++++++- .../presto/operator/DriverContext.java | 5 ++++- .../facebook/presto/operator/DriverStats.java | 16 +++++++++++++- .../presto/operator/OperatorContext.java | 13 ++++++++++- .../presto/operator/OperatorStats.java | 22 ++++++++++++++++--- .../presto/operator/PipelineContext.java | 6 ++++- .../presto/operator/PipelineStats.java | 16 ++++++++++++-- .../facebook/presto/operator/TaskContext.java | 6 ++++- .../facebook/presto/operator/TaskStats.java | 20 ++++++++++++++--- .../presto/execution/TestQueryStats.java | 5 ++++- .../presto/execution/TestStageStats.java | 4 +++- .../presto/operator/TestDriverStats.java | 4 +++- .../presto/operator/TestOperatorStats.java | 9 ++++++-- .../presto/operator/TestPipelineStats.java | 4 +++- .../presto/operator/TestTaskStats.java | 4 +++- .../presto/server/TestBasicQueryInfo.java | 3 ++- 20 files changed, 153 insertions(+), 26 deletions(-) diff --git a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java index 66c21737a46f..e012ba5fd433 100644 --- a/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java +++ b/presto-jdbc/src/test/java/com/facebook/presto/jdbc/TestProgressMonitor.java @@ -75,7 +75,7 @@ private static String newQueryResults(Integer partialCancelId, Integer nextUriId nextUriId == null ? null : URI.create(format(NEXT_URI, nextUriId)), responseColumns, data, - new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null), + new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null, null), null, null, null); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index f7381354a621..e58725900649 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -303,6 +303,8 @@ public QueryInfo getQueryInfo(Optional rootStage) boolean fullyBlocked = rootStage.isPresent(); Set blockedReasons = new HashSet<>(); + long spilledBytes = 0; + boolean completeInfo = true; for (StageInfo stageInfo : getAllStages(rootStage)) { StageStats stageStats = stageInfo.getStageStats(); @@ -337,6 +339,7 @@ public QueryInfo getQueryInfo(Optional rootStage) processedInputPositions += stageStats.getProcessedInputPositions(); } completeInfo = completeInfo && stageInfo.isCompleteInfo(); + spilledBytes += stageStats.getSpilledDataSize().toBytes(); } if (rootStage.isPresent()) { @@ -382,7 +385,8 @@ public QueryInfo getQueryInfo(Optional rootStage) succinctBytes(processedInputDataSize), processedInputPositions, succinctBytes(outputDataSize), - outputPositions); + outputPositions, + succinctBytes(spilledBytes)); return new QueryInfo(queryId, session.toSessionRepresentation(), diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java index 7fe73b25d3a0..e256c99fd297 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java @@ -73,6 +73,8 @@ public class QueryStats private final DataSize outputDataSize; private final long outputPositions; + private final DataSize spilledDataSize; + @VisibleForTesting public QueryStats() { @@ -108,6 +110,7 @@ public QueryStats() this.processedInputPositions = 0; this.outputDataSize = null; this.outputPositions = 0; + this.spilledDataSize = null; } @JsonCreator @@ -151,7 +154,8 @@ public QueryStats( @JsonProperty("processedInputPositions") long processedInputPositions, @JsonProperty("outputDataSize") DataSize outputDataSize, - @JsonProperty("outputPositions") long outputPositions) + @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.executionStartTime = executionStartTime; @@ -202,6 +206,8 @@ public QueryStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -405,4 +411,10 @@ public long getOutputPositions() { return outputPositions; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java index 9198c642a9cd..1fe5072eee25 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageStateMachine.java @@ -223,6 +223,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su boolean fullyBlocked = true; Set blockedReasons = new HashSet<>(); + long spilledBytes = 0; + for (TaskInfo taskInfo : taskInfos) { TaskState taskState = taskInfo.getTaskStatus().getState(); if (taskState.isDone()) { @@ -259,6 +261,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su outputDataSize += taskStats.getOutputDataSize().toBytes(); outputPositions += taskStats.getOutputPositions(); + spilledBytes += taskStats.getSpilledDataSize().toBytes(); } StageStats stageStats = new StageStats( @@ -291,7 +294,8 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier, Su succinctBytes(processedInputDataSize), processedInputPositions, succinctBytes(outputDataSize), - outputPositions); + outputPositions, + succinctBytes(spilledBytes)); ExecutionFailureInfo failureInfo = null; if (state == FAILED) { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java b/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java index a89a9b036ef3..63320947d2db 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/StageStats.java @@ -68,6 +68,8 @@ public class StageStats private final DataSize outputDataSize; private final long outputPositions; + private final DataSize spilledDataSize; + @VisibleForTesting public StageStats() { @@ -97,6 +99,7 @@ public StageStats() this.processedInputPositions = 0; this.outputDataSize = null; this.outputPositions = 0; + this.spilledDataSize = null; } @JsonCreator @@ -134,7 +137,8 @@ public StageStats( @JsonProperty("processedInputPositions") long processedInputPositions, @JsonProperty("outputDataSize") DataSize outputDataSize, - @JsonProperty("outputPositions") long outputPositions) + @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.schedulingComplete = schedulingComplete; this.getSplitDistribution = requireNonNull(getSplitDistribution, "getSplitDistribution is null"); @@ -179,6 +183,8 @@ public StageStats( this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null"); checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -336,4 +342,10 @@ public long getOutputPositions() { return outputPositions; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java b/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java index cd6eef676fa0..001d8878d548 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/DriverContext.java @@ -82,6 +82,7 @@ public class DriverContext private final List operatorContexts = new CopyOnWriteArrayList<>(); private final boolean partitioned; + private final AtomicLong spilledBytes = new AtomicLong(); public DriverContext(PipelineContext pipelineContext, Executor executor, boolean partitioned) { @@ -207,6 +208,7 @@ public ListenableFuture reserveSystemMemory(long bytes) public ListenableFuture reserveSpill(long bytes) { + spilledBytes.getAndAdd(bytes); return pipelineContext.reserveSpill(bytes); } @@ -404,7 +406,8 @@ public DriverStats getDriverStats() processedInputPositions, outputDataSize.convertToMostSuccinctDataSize(), outputPositions, - ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats))); + ImmutableList.copyOf(transform(operatorContexts, OperatorContext::getOperatorStats)), + succinctBytes(spilledBytes.get())); } public boolean isPartitioned() diff --git a/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java b/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java index 55aa29f1fbf8..74232116b230 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/DriverStats.java @@ -64,6 +64,8 @@ public class DriverStats private final List operatorStats; + private final DataSize spilledDataSize; + public DriverStats() { this.createTime = DateTime.now(); @@ -93,6 +95,8 @@ public DriverStats() this.outputPositions = 0; this.operatorStats = ImmutableList.of(); + + this.spilledDataSize = new DataSize(0, BYTE); } @JsonCreator @@ -123,7 +127,9 @@ public DriverStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, - @JsonProperty("operatorStats") List operatorStats) + @JsonProperty("operatorStats") List operatorStats, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.startTime = startTime; @@ -155,6 +161,8 @@ public DriverStats( this.outputPositions = outputPositions; this.operatorStats = ImmutableList.copyOf(requireNonNull(operatorStats, "operatorStats is null")); + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -284,4 +292,10 @@ public List getOperatorStats() { return operatorStats; } + + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java index c0ff761abd17..1d207309c47a 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorContext.java @@ -395,7 +395,10 @@ public OperatorStats getOperatorStats() succinctBytes(memoryReservation.get()), succinctBytes(systemMemoryContext.getReservedBytes()), memoryFuture.get().isDone() ? Optional.empty() : Optional.of(WAITING_FOR_MEMORY), - info); + + info, + + succinctBytes(spillContext.getSpilledBytes())); } private long currentThreadUserTime() @@ -502,6 +505,7 @@ private class OperatorSpillContext private final DriverContext driverContext; private long reservedBytes; + private long spilledBytes; public OperatorSpillContext(DriverContext driverContext) { @@ -513,6 +517,7 @@ public void updateBytes(long bytes) { if (bytes > 0) { driverContext.reserveSpill(bytes); + spilledBytes += bytes; } else { checkArgument(reservedBytes + bytes >= 0, "tried to free %s spilled bytes from %s bytes reserved", -bytes, reservedBytes); @@ -526,7 +531,13 @@ public String toString() { return toStringHelper(this) .add("usedBytes", reservedBytes) + .add("spilledBytes", spilledBytes) .toString(); } + + public long getSpilledBytes() + { + return spilledBytes; + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java index 64d21a4cc6d4..ef6375c03076 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/OperatorStats.java @@ -62,6 +62,8 @@ public class OperatorStats private final DataSize systemMemoryReservation; private final Optional blockedReason; + private final DataSize spilledDataSize; + private final Object info; @JsonCreator @@ -95,7 +97,9 @@ public OperatorStats( @JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation, @JsonProperty("blockedReason") Optional blockedReason, - @JsonProperty("info") Object info) + @JsonProperty("info") Object info, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { checkArgument(operatorId >= 0, "operatorId is negative"); this.operatorId = operatorId; @@ -129,6 +133,8 @@ public OperatorStats( this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null"); this.blockedReason = blockedReason; + this.spilledDataSize = spilledDataSize; + this.info = info; } @@ -270,6 +276,12 @@ public Optional getBlockedReason() return blockedReason; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @Nullable @JsonProperty public Object getInfo() @@ -308,7 +320,7 @@ public OperatorStats add(Iterable operators) long memoryReservation = this.memoryReservation.toBytes(); long systemMemoryReservation = this.systemMemoryReservation.toBytes(); Optional blockedReason = this.blockedReason; - + long spilledBytes = this.spilledDataSize.toBytes(); Mergeable base = null; if (info instanceof Mergeable) { base = (Mergeable) info; @@ -343,6 +355,8 @@ public OperatorStats add(Iterable operators) blockedReason = operator.getBlockedReason(); } + spilledBytes += operator.getSpilledDataSize().toBytes(); + Object info = operator.getInfo(); if (base != null && info != null && base.getClass() == info.getClass()) { base = mergeInfo(base, info); @@ -379,7 +393,9 @@ public OperatorStats add(Iterable operators) succinctBytes(systemMemoryReservation), blockedReason, - base); + base, + + succinctBytes(spilledBytes)); } public static > Mergeable mergeInfo(Object base, Object other) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java index 53505b97a45b..39cb0b6314c8 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PipelineContext.java @@ -84,6 +84,7 @@ public class PipelineContext private final CounterStat outputPositions = new CounterStat(); private final ConcurrentMap operatorSummaries = new ConcurrentHashMap<>(); + private final AtomicLong spilledBytes = new AtomicLong(); public PipelineContext(TaskContext taskContext, Executor executor, boolean inputPipeline, boolean outputPipeline) { @@ -225,6 +226,7 @@ public synchronized ListenableFuture reserveSystemMemory(long bytes) public synchronized ListenableFuture reserveSpill(long bytes) { + spilledBytes.getAndAdd(bytes); return taskContext.reserveSpill(bytes); } @@ -451,7 +453,9 @@ public PipelineStats getPipelineStats() outputPositions, ImmutableList.copyOf(operatorSummaries.values()), - drivers); + drivers, + + succinctBytes(spilledBytes.get())); } private static boolean compareAndSet(ConcurrentMap map, K key, V oldValue, V newValue) diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java b/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java index 768ddd5aac1f..6229a46082fa 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PipelineStats.java @@ -73,6 +73,8 @@ public class PipelineStats private final List operatorSummaries; private final List drivers; + private final DataSize spilledDataSize; + @JsonCreator public PipelineStats( @JsonProperty("firstStartTime") DateTime firstStartTime, @@ -112,7 +114,8 @@ public PipelineStats( @JsonProperty("outputPositions") long outputPositions, @JsonProperty("operatorSummaries") List operatorSummaries, - @JsonProperty("drivers") List drivers) + @JsonProperty("drivers") List drivers, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.firstStartTime = firstStartTime; this.lastStartTime = lastStartTime; @@ -161,6 +164,8 @@ public PipelineStats( this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null")); this.drivers = ImmutableList.copyOf(requireNonNull(drivers, "drivers is null")); + + this.spilledDataSize = spilledDataSize; } @Nullable @@ -340,6 +345,12 @@ public List getDrivers() return drivers; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public PipelineStats summarize() { return new PipelineStats( @@ -371,6 +382,7 @@ public PipelineStats summarize() outputDataSize, outputPositions, operatorSummaries, - ImmutableList.of()); + ImmutableList.of(), + spilledDataSize); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java index d2e00c7285d1..71cad9388be8 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskContext.java @@ -78,6 +78,8 @@ public class TaskContext @GuardedBy("cumulativeMemoryLock") private long lastTaskStatCallNanos = 0; + private final AtomicLong spilledBytes = new AtomicLong(); + public TaskContext(QueryContext queryContext, TaskStateMachine taskStateMachine, Executor executor, @@ -167,6 +169,7 @@ public synchronized ListenableFuture reserveSystemMemory(long bytes) public synchronized ListenableFuture reserveSpill(long bytes) { checkArgument(bytes >= 0, "bytes is negative"); + spilledBytes.getAndAdd(bytes); return queryContext.reserveSpill(bytes); } @@ -390,6 +393,7 @@ public TaskStats getTaskStats() processedInputPositions, succinctBytes(outputDataSize), outputPositions, - pipelineStats); + pipelineStats, + succinctBytes(spilledBytes.get())); } } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java index 15373ff37ad9..b1c690b6fed8 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TaskStats.java @@ -71,6 +71,8 @@ public class TaskStats private final List pipelines; + private final DataSize spilledDataSize; + public TaskStats(DateTime createTime, DateTime endTime) { this(createTime, @@ -101,7 +103,8 @@ public TaskStats(DateTime createTime, DateTime endTime) 0, new DataSize(0, BYTE), 0, - ImmutableList.of()); + ImmutableList.of(), + new DataSize(0, BYTE)); } @JsonCreator @@ -141,7 +144,9 @@ public TaskStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, - @JsonProperty("pipelines") List pipelines) + @JsonProperty("pipelines") List pipelines, + + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.createTime = requireNonNull(createTime, "createTime is null"); this.firstStartTime = firstStartTime; @@ -190,6 +195,8 @@ public TaskStats( this.outputPositions = outputPositions; this.pipelines = ImmutableList.copyOf(requireNonNull(pipelines, "pipelines is null")); + + this.spilledDataSize = spilledDataSize; } @JsonProperty @@ -370,6 +377,12 @@ public int getRunningPartitionedDrivers() return runningPartitionedDrivers; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public TaskStats summarize() { return new TaskStats( @@ -401,6 +414,7 @@ public TaskStats summarize() processedInputPositions, outputDataSize, outputPositions, - ImmutableList.of()); + ImmutableList.of(), + spilledDataSize); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java index 7a09834abedf..ad4fe9c2c699 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java @@ -67,7 +67,8 @@ public class TestQueryStats 27, new DataSize(28, BYTE), - 29); + 29, + new DataSize(30, BYTE)); @Test public void testJson() @@ -122,5 +123,7 @@ public static void assertExpectedQueryStats(QueryStats actual) assertEquals(actual.getOutputDataSize(), new DataSize(28, BYTE)); assertEquals(actual.getOutputPositions(), 29); + + assertEquals(actual.getSpilledDataSize(), new DataSize(30, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java b/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java index e32bf12cfbbe..02c781223dbb 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestStageStats.java @@ -62,7 +62,8 @@ public class TestStageStats 22, new DataSize(23, BYTE), - 24); + 24, + new DataSize(25, BYTE)); @Test public void testJson() @@ -109,6 +110,7 @@ public static void assertExpectedStageStats(StageStats actual) assertEquals(actual.getOutputDataSize(), new DataSize(23, BYTE)); assertEquals(actual.getOutputPositions(), 24); + assertEquals(actual.getSpilledDataSize(), new DataSize(25, BYTE)); } private static DistributionSnapshot getTestDistribution(int count) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java index bbf0af793137..ab41c89af32b 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestDriverStats.java @@ -57,7 +57,8 @@ public class TestDriverStats new DataSize(17, BYTE), 18, - ImmutableList.of(TestOperatorStats.EXPECTED)); + ImmutableList.of(TestOperatorStats.EXPECTED), + new DataSize(19, BYTE)); @Test public void testJson() @@ -98,5 +99,6 @@ public static void assertExpectedDriverStats(DriverStats actual) assertEquals(actual.getOperatorStats().size(), 1); assertExpectedOperatorStats(actual.getOperatorStats().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(19, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java index 17280e57ee3a..bb49045d17e9 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestOperatorStats.java @@ -58,7 +58,8 @@ public class TestOperatorStats new DataSize(18, BYTE), new DataSize(19, BYTE), Optional.empty(), - "20"); + "20", + new DataSize(21, BYTE)); public static final OperatorStats MERGEABLE = new OperatorStats( 41, @@ -89,7 +90,8 @@ public class TestOperatorStats new DataSize(18, BYTE), new DataSize(19, BYTE), Optional.empty(), - new LongMergeable(20)); + new LongMergeable(20), + new DataSize(21, BYTE)); @Test public void testJson() @@ -131,6 +133,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual) assertEquals(actual.getMemoryReservation(), new DataSize(18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(19, BYTE)); assertEquals(actual.getInfo(), "20"); + assertEquals(actual.getSpilledDataSize(), new DataSize(21, BYTE)); } @Test @@ -164,6 +167,7 @@ public void testAdd() assertEquals(actual.getMemoryReservation(), new DataSize(3 * 18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(3 * 19, BYTE)); assertEquals(actual.getInfo(), null); + assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 21, BYTE)); } @Test @@ -197,6 +201,7 @@ public void testAddMergeable() assertEquals(actual.getMemoryReservation(), new DataSize(3 * 18, BYTE)); assertEquals(actual.getSystemMemoryReservation(), new DataSize(3 * 19, BYTE)); assertEquals(actual.getInfo(), new LongMergeable(20 * 3)); + assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 21, BYTE)); } private static class LongMergeable diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java index e03bd7dfd32e..650c54740c62 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestPipelineStats.java @@ -70,7 +70,8 @@ public class TestPipelineStats 18, ImmutableList.of(TestOperatorStats.EXPECTED), - ImmutableList.of(TestDriverStats.EXPECTED)); + ImmutableList.of(TestDriverStats.EXPECTED), + new DataSize(19, BYTE)); @Test public void testJson() @@ -123,6 +124,7 @@ public static void assertExpectedPipelineStats(PipelineStats actual) assertEquals(actual.getDrivers().size(), 1); assertExpectedDriverStats(actual.getDrivers().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(19, BYTE)); } private static DistributionSnapshot getTestDistribution(int count) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java index 70de54aa7dbd..d7fe128f780d 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestTaskStats.java @@ -64,7 +64,8 @@ public class TestTaskStats new DataSize(22, BYTE), 23, - ImmutableList.of(TestPipelineStats.EXPECTED)); + ImmutableList.of(TestPipelineStats.EXPECTED), + new DataSize(24, BYTE)); @Test public void testJson() @@ -114,5 +115,6 @@ public static void assertExpectedTaskStats(TaskStats actual) assertEquals(actual.getPipelines().size(), 1); assertExpectedPipelineStats(actual.getPipelines().get(0)); + assertEquals(actual.getSpilledDataSize(), new DataSize(24, BYTE)); } } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java index 3be861c09c0e..0461c9ca6b3e 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java @@ -82,7 +82,8 @@ public void testConstructor() DataSize.valueOf("29GB"), 30, DataSize.valueOf("31GB"), - 32), + 32, + DataSize.valueOf("33GB")), ImmutableMap.of(), ImmutableSet.of(), ImmutableMap.of(), From 7f42b13d38fbafad77cf877a64c0d8a794b29700 Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 10:08:51 +0100 Subject: [PATCH 2/4] add spilled data size to EXPLAIN ANALYZE Data size added to EXPLAIN ANALYZE output, at Fragment and Operator levels. This information is added to the "Cost" line and displayed only if the value is non-zero. --- .../presto/sql/planner/PlanPrinter.java | 44 +++++++++++++++---- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java index f38c3974b728..a4daee6c0d7a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanPrinter.java @@ -207,11 +207,13 @@ private static List getPlanNodeStats(TaskStats taskStats) // are collected from the leaf stages. Map outputPositions = new HashMap<>(); Map outputBytes = new HashMap<>(); + Map spilledBytes = new HashMap<>(); Map wallMillis = new HashMap<>(); for (PipelineStats pipelineStats : taskStats.getPipelines()) { Map pipelineOutputPositions = new HashMap<>(); Map pipelineOutputBytes = new HashMap<>(); + Map pipelineSpilledBytes = new HashMap<>(); List operatorSummaries = pipelineStats.getOperatorSummaries(); for (int i = 0; i < operatorSummaries.size(); i++) { @@ -230,25 +232,27 @@ private static List getPlanNodeStats(TaskStats taskStats) pipelineOutputPositions.put(planNodeId, operatorStats.getOutputPositions()); pipelineOutputBytes.put(planNodeId, operatorStats.getOutputDataSize().toBytes()); } + pipelineSpilledBytes.put(planNodeId, operatorStats.getSpilledDataSize().toBytes()); } for (Map.Entry entry : pipelineOutputPositions.entrySet()) { outputBytes.merge(entry.getKey(), pipelineOutputBytes.get(entry.getKey()), Long::sum); outputPositions.merge(entry.getKey(), entry.getValue(), Long::sum); + spilledBytes.merge(entry.getKey(), pipelineSpilledBytes.get(entry.getKey()), Long::sum); } } List stats = new ArrayList<>(); for (Map.Entry entry : wallMillis.entrySet()) { if (outputPositions.containsKey(entry.getKey())) { - stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), outputPositions.get(entry.getKey()), succinctDataSize(outputBytes.get(entry.getKey()), BYTE))); + stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), succinctBytes(spilledBytes.get(entry.getKey())), outputPositions.get(entry.getKey()), succinctDataSize(outputBytes.get(entry.getKey()), BYTE))); } else { // It's possible there will be no output stats because all the pipelines that we observed were non-output. // For example in a query like SELECT * FROM a JOIN b ON c = d LIMIT 1 // It's possible to observe stats after the build starts, but before the probe does // and therefore only have wall time, but no output stats - stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS))); + stats.add(new PlanNodeStats(entry.getKey(), new Duration(entry.getValue(), MILLISECONDS), succinctBytes(spilledBytes.get(entry.getKey())))); } } return stats; @@ -273,12 +277,16 @@ private static String formatFragment(Metadata metadata, Session session, PlanFra if (stageStats.isPresent()) { builder.append(indentString(1)) - .append(format("Cost: CPU %s, Input %d (%s), Output %d (%s)\n", + .append(format("Cost: CPU %s, Input %d (%s), Output %d (%s)", stageStats.get().getTotalCpuTime(), stageStats.get().getProcessedInputPositions(), stageStats.get().getProcessedInputDataSize(), stageStats.get().getOutputPositions(), stageStats.get().getOutputDataSize())); + if (isNonZero(stageStats.get().getSpilledDataSize())) { + builder.append(", Spilled " + stageStats.get().getSpilledDataSize()); + } + builder.append("\n"); } PartitioningScheme partitioningScheme = fragment.getPartitioningScheme(); @@ -323,6 +331,11 @@ private static String formatFragment(Metadata metadata, Session session, PlanFra return builder.toString(); } + private static boolean isNonZero(DataSize dataSize) + { + return dataSize != null && dataSize.getValue() != 0; + } + public static String graphvizLogicalPlan(PlanNode plan, Map types) { PlanFragment fragment = new PlanFragment( @@ -384,7 +397,11 @@ private void printStats(int indent, PlanNodeId planNodeId) outputString = "unknown"; } output.append(indentString(indent)) - .append(format("Cost: %s, Output: %s\n", fractionString, outputString)); + .append(format("Cost: %s, Output: %s", fractionString, outputString)); + if (isNonZero(stats.getSpilledDataSize())) { + output.append(", Spilled: " + stats.getSpilledDataSize()); + } + output.append("\n"); } private static String indentString(int indent) @@ -1061,23 +1078,26 @@ private static class PlanNodeStats private final Duration wallTime; private final Optional outputPositions; private final Optional outputDataSize; + private final DataSize spilledDataSize; - public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime) + public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize spilledDataSize) { - this(planNodeId, wallTime, Optional.empty(), Optional.empty()); + this(planNodeId, wallTime, spilledDataSize, Optional.empty(), Optional.empty()); + spilledDataSize = spilledDataSize; } - public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, long outputPositions, DataSize outputDataSize) + public PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize spilledDataSize, long outputPositions, DataSize outputDataSize) { - this(planNodeId, wallTime, Optional.of(outputPositions), Optional.of(outputDataSize)); + this(planNodeId, wallTime, spilledDataSize, Optional.of(outputPositions), Optional.of(outputDataSize)); } - private PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, Optional outputPositions, Optional outputDataSize) + private PlanNodeStats(PlanNodeId planNodeId, Duration wallTime, DataSize spilledDataSize, Optional outputPositions, Optional outputDataSize) { this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); this.wallTime = requireNonNull(wallTime, "wallTime is null"); this.outputPositions = outputPositions; this.outputDataSize = outputDataSize; + this.spilledDataSize = spilledDataSize; } public PlanNodeId getPlanNodeId() @@ -1100,6 +1120,11 @@ public Optional getOutputDataSize() return outputDataSize; } + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + public static PlanNodeStats merge(PlanNodeStats planNodeStats1, PlanNodeStats planNodeStats2) { checkArgument(planNodeStats1.getPlanNodeId().equals(planNodeStats2.getPlanNodeId()), "planNodeIds do not match. %s != %s", planNodeStats1.getPlanNodeId(), planNodeStats2.getPlanNodeId()); @@ -1126,6 +1151,7 @@ else if (planNodeStats1.getOutputDataSize().isPresent()) { return new PlanNodeStats( planNodeStats1.getPlanNodeId(), new Duration(planNodeStats1.getWallTime().toMillis() + planNodeStats2.getWallTime().toMillis(), MILLISECONDS), + succinctBytes(planNodeStats1.getSpilledDataSize().toBytes() + planNodeStats2.getSpilledDataSize().toBytes()), outputPositions, outputDataSize); } From fb131de1c259a271889cefb33ca983a929aed57c Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 10:11:16 +0100 Subject: [PATCH 3/4] add spilled data size to query summary debug When CLI is started with --debug then spilled data size for the entire query is being displayed (both in the running total, and the final summary). The information is added in a separate line below "Parallelism". --- .../facebook/presto/cli/StatusPrinter.java | 2 ++ .../presto/client/StatementStats.java | 23 +++++++++++++++++-- .../presto/server/StatementResource.java | 1 + 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java b/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java index 09f1eaec94f9..05ed9758629b 100644 --- a/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java +++ b/presto-cli/src/main/java/com/facebook/presto/cli/StatusPrinter.java @@ -193,6 +193,7 @@ public void printFinalInfo() reprintLine(perNodeSummary); out.println(String.format("Parallelism: %.1f", parallelism)); + out.println("Spilled: " + stats.getSpilledDataSize()); } // 0:32 [2.12GB, 15M rows] [67MB/s, 463K rows/s] @@ -280,6 +281,7 @@ private void printQueryInfo(QueryResults results) reprintLine(perNodeSummary); reprintLine(String.format("Parallelism: %.1f", parallelism)); + reprintLine("Spilled: " + stats.getSpilledDataSize()); } assert terminalWidth >= 75; diff --git a/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java b/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java index b611f9bd6284..210bcfe4401d 100644 --- a/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java +++ b/presto-client/src/main/java/com/facebook/presto/client/StatementStats.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.airlift.units.DataSize; import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; @@ -40,6 +41,7 @@ public class StatementStats private final long processedRows; private final long processedBytes; private final StageStats rootStage; + private final DataSize spilledDataSize; @JsonCreator public StatementStats( @@ -56,7 +58,8 @@ public StatementStats( @JsonProperty("wallTimeMillis") long wallTimeMillis, @JsonProperty("processedRows") long processedRows, @JsonProperty("processedBytes") long processedBytes, - @JsonProperty("rootStage") StageStats rootStage) + @JsonProperty("rootStage") StageStats rootStage, + @JsonProperty("spilledDataSize") DataSize spilledDataSize) { this.state = requireNonNull(state, "state is null"); this.queued = queued; @@ -72,6 +75,7 @@ public StatementStats( this.processedRows = processedRows; this.processedBytes = processedBytes; this.rootStage = rootStage; + this.spilledDataSize = spilledDataSize; } @NotNull @@ -160,6 +164,12 @@ public StageStats getRootStage() return rootStage; } + @JsonProperty + public DataSize getSpilledDataSize() + { + return spilledDataSize; + } + @Override public String toString() { @@ -178,6 +188,7 @@ public String toString() .add("processedRows", processedRows) .add("processedBytes", processedBytes) .add("rootStage", rootStage) + .add("spilledDataSize", spilledDataSize) .toString(); } @@ -202,6 +213,7 @@ public static class Builder private long processedRows; private long processedBytes; private StageStats rootStage; + private DataSize spilledDataSize; private Builder() {} @@ -289,6 +301,12 @@ public Builder setRootStage(StageStats rootStage) return this; } + public Builder setSpilledDataSize(DataSize spilledDataSize) + { + this.spilledDataSize = spilledDataSize; + return this; + } + public StatementStats build() { return new StatementStats( @@ -305,7 +323,8 @@ public StatementStats build() wallTimeMillis, processedRows, processedBytes, - rootStage); + rootStage, + spilledDataSize); } } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java b/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java index a0cf0a13eaca..f3c2a0a52d7b 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/StatementResource.java @@ -615,6 +615,7 @@ private static StatementStats toStatementStats(QueryInfo queryInfo) .setProcessedRows(queryStats.getRawInputPositions()) .setProcessedBytes(queryStats.getRawInputDataSize().toBytes()) .setRootStage(toStageStats(outputStage)) + .setSpilledDataSize(queryStats.getSpilledDataSize()) .build(); } From 7c1d6d5b550cf16975ef12f4c5784b469e80a43a Mon Sep 17 00:00:00 2001 From: Wojciech Biela Date: Thu, 17 Nov 2016 14:43:35 +0100 Subject: [PATCH 4/4] add spilled data size to Web UI Added spilled data size to the Query Details page in the Resource Utilization Summary section and to the Live Plan page in the Web UI just below the "Splits" line (per stage). --- .../main/resources/com/facebook/presto/server/plan.html | 1 + presto-main/src/main/resources/webapp/assets/query.js | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/presto-main/src/main/resources/com/facebook/presto/server/plan.html b/presto-main/src/main/resources/com/facebook/presto/server/plan.html index 290175ffcc45..dc37ea7c8d41 100644 --- a/presto-main/src/main/resources/com/facebook/presto/server/plan.html +++ b/presto-main/src/main/resources/com/facebook/presto/server/plan.html @@ -181,6 +181,7 @@ ) + "
Memory: " + stats.totalMemoryReservation + "
" + "
Splits: Q:" + stats.queuedDrivers + ", R:" + stats.runningDrivers + ", F:" + stats.completedDrivers + "
" + + "
Spilled: " + stats.spilledDataSize + "
" + "
" + "
Input: " + stats.processedInputDataSize + " / " + formatCount(stats.processedInputPositions) + " rows
"; diff --git a/presto-main/src/main/resources/webapp/assets/query.js b/presto-main/src/main/resources/webapp/assets/query.js index 61548b83882d..645de789030c 100644 --- a/presto-main/src/main/resources/webapp/assets/query.js +++ b/presto-main/src/main/resources/webapp/assets/query.js @@ -1160,6 +1160,14 @@ var QueryDetail = React.createClass({ { formatDataSizeBytes(query.queryStats.cumulativeMemory / 1000.0, "") + " seconds" } + + + Spilled Data + + + { query.queryStats.spilledDataSize } + +