Skip to content

Commit

Permalink
[producers][server][dvc] Dedupe DIV debug info (#700)
Browse files Browse the repository at this point in the history
In some use cases, the debug info in servers could take a large amount of heap.
This commit optimizes this by deduping the instances used in the various debug
maps, since they are very repetitive, and by using a more compact map from the
fastutil library, rather than a CHM.

Miscellaneous:

- Deleted the ArrayBasedPrimitiveIntegerSet, which is unused.

- Added back the test permutation number to build logs.

- Added two commits containing just big directory moves to git-blame-ignore-revs
  • Loading branch information
FelixGV authored Oct 18, 2023
1 parent 70edf83 commit a88522a
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 224 deletions.
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
43b8eb5ca60b708c6bc86a6a0c92df2d657af2c1
690830eb1aae03017aa43627128b4860e60eeab1
bb6544918fc3105a834782eb3c170711d5ba206c
63a7921da1c1814f9d856a6375f84ba392c01abb
9bd0175869cd66376457bdecf9e00583cfb98301
14 changes: 2 additions & 12 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,6 @@ def parser = new XmlSlurper()
parser.setFeature("http://apache.org/xml/features/disallow-doctype-decl", false)
parser.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)

// We remove square brackets from test names, which occur when using a DataProvider,
// because occasionally the number in the brackets is non-deterministic (unknown why)
// and when that occurs, the test-retry plugin gets confused and cannot match the
// attempts together.
def removeSquareBrackets(String testName) {
  // Strip only the first bracketed number, e.g. "myTest[3]" -> "myTest", since the
  // number appended by a DataProvider can be non-deterministic across attempts.
  String dataProviderSuffix = '\\[[0-9]+\\]'
  return testName.replaceFirst(dataProviderSuffix, '')
}

configurations {
alpnAgent {
}
Expand Down Expand Up @@ -431,16 +423,14 @@ subprojects {
}

beforeTest { descriptor ->
def testName = removeSquareBrackets(descriptor.displayName)
def out = services.get(StyledTextOutputFactory).create("an-ouput")

out.style(Style.Normal).println("$descriptor.className > $testName STARTED")
out.style(Style.Normal).println("$descriptor.className > $descriptor.displayName STARTED")
}

afterTest { descriptor, result ->
def totalTime = result.endTime - result.startTime
def prettyTime = totalTime < 1000 ? "$totalTime ms" : "${totalTime / 1000} s"
def testName = removeSquareBrackets(descriptor.displayName)
def out = services.get(StyledTextOutputFactory).create("an-ouput")

def style = result.resultType == TestResult.ResultType.SUCCESS
Expand All @@ -455,7 +445,7 @@ subprojects {
? 'FAILED '
: 'SKIPPED '

out.style(Style.Normal).text("$descriptor.className > $testName ")
out.style(Style.Normal).text("$descriptor.className > $descriptor.displayName ")
.style(style).text(status)
.style(Style.Normal).println("($prettyTime)")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ private void updateOffsetRecord(GUID guid, Segment segment, OffsetRecord offsetR
* that). It is redundant that we store the same debug values once per partition. In the future,
* if we want to eliminate this redundancy, we could move the per-producer debug info to another
* data structure, though that would increase bookkeeping complexity. This is expected to be a
* minor overhead, and therefore it appears to be a premature to optimize this now.
* minor overhead, and therefore it appears to be premature to optimize this now.
*/
state.aggregates = segment.getAggregates();
state.debugInfo = segment.getDebugInfo();
state.aggregates = CollectionUtils.substituteEmptyMap(segment.getAggregates());
state.debugInfo = CollectionUtils.substituteEmptyMap(segment.getDebugInfo());
}
state.checksumType = segment.getCheckSumType().getValue();
/**
Expand Down
1 change: 0 additions & 1 deletion gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@
<Class name="com.linkedin.venice.client.store.schemas.TestValueRecordWithMoreFields"/>
<Class name="com.linkedin.venice.fastclient.schema.TestValueSchema"/>
<Class name="com.linkedin.venice.utils.TestMockTime"/>
<Class name="com.linkedin.venice.serializer.ArrayBasedPrimitiveIntegerSet"/>
<Class name="com.linkedin.davinci.ingestion.IsolatedIngestionBackend"/>
</Or>
</Match>
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions internal/venice-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {

implementation libraries.avroUtilCompatHelper
implementation libraries.bouncyCastle
implementation libraries.caffeine
implementation libraries.classgraph
implementation libraries.commonsCodec
implementation libraries.commonsIo // IntelliJ gets confused when running tests unless we explicitly depend on a recent version of commons-io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static com.linkedin.venice.kafka.validation.SegmentStatus.END_OF_FINAL_SEGMENT;
import static com.linkedin.venice.kafka.validation.SegmentStatus.NOT_STARTED;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.exceptions.validation.UnsupportedMessageTypeException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
Expand All @@ -16,6 +18,7 @@
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.utils.CollectionUtils;
import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
Expand All @@ -39,6 +42,16 @@
*/
@NotThreadsafe
public class Segment {
/**
* This cache is to reduce the size on heap of debug info, which is very repetitive in nature. Using this cache, each
* unique CharSequence should exist only once on the heap, with many segments referring to it.
*
* We use weak values so that if there are no more segments referencing a given entry, it will also be cleared from
* the cache, and thus avoid a mem leak.
*/
private static final Cache<CharSequence, CharSequence> DEDUPED_DEBUG_INFO =
Caffeine.newBuilder().weakValues().build();

// Immutable state
private final int partition;
private final int segmentNumber;
Expand Down Expand Up @@ -78,7 +91,7 @@ public Segment(
this.ended = false;
this.finalSegment = false;
this.newSegment = true;
this.debugInfo = debugInfo;
this.debugInfo = getDedupedDebugInfo(debugInfo);
this.aggregates = aggregates;
}

Expand All @@ -101,7 +114,7 @@ public Segment(int partition, ProducerPartitionState state) {
this.ended = segmentStatus.isTerminal();
this.finalSegment = segmentStatus == END_OF_FINAL_SEGMENT;
this.newSegment = false;
this.debugInfo = CollectionUtils.substituteEmptyMap(state.getDebugInfo());
this.debugInfo = getDedupedDebugInfo(state.getDebugInfo());
this.aggregates = CollectionUtils.substituteEmptyMap(state.getAggregates());
this.registered = state.isRegistered;
this.lastRecordProducerTimestamp = state.messageTimestamp;
Expand All @@ -117,6 +130,10 @@ public Segment(Segment segment) {
this.ended = segment.ended;
this.finalSegment = segment.finalSegment;
this.newSegment = false;
/**
* N.B. No need to call {@link #getDedupedDebugInfo(Map)} here since we assume the other {@link Segment} instance we
* are copying from was already deduped, having come from one of the other constructors.
*/
this.debugInfo = segment.debugInfo;
this.aggregates = segment.aggregates;
this.registered = segment.registered;
Expand Down Expand Up @@ -373,4 +390,21 @@ public SegmentStatus getStatus() {
return SegmentStatus.IN_PROGRESS;
}
}

private Map<CharSequence, CharSequence> getDedupedDebugInfo(Map<CharSequence, CharSequence> original) {
if (original == null || original.isEmpty()) {
return Collections.emptyMap();
}
/**
* The {@link Object2ObjectArrayMap} has an O(N) performance on lookups, but we don't care about the performance
* of the debug info map, so it is fine. The main concern is to make it as compact as possible, which this
* implementation achieves by minimizing per-element overhead (e.g. there is no {@link HashMap.Node} wrapping each
* entry).
*/
Map<CharSequence, CharSequence> deduped = new Object2ObjectArrayMap<>(original.size());
for (Map.Entry<CharSequence, CharSequence> entry: original.entrySet()) {
deduped.put(DEDUPED_DEBUG_INFO.get(entry.getKey(), k -> k), DEDUPED_DEBUG_INFO.get(entry.getValue(), k -> k));
}
return deduped;
}
}
Loading

0 comments on commit a88522a

Please sign in to comment.