diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
index b058132703..efe7ad69bf 100644
--- a/.git-blame-ignore-revs
+++ b/.git-blame-ignore-revs
@@ -1,3 +1,5 @@
43b8eb5ca60b708c6bc86a6a0c92df2d657af2c1
690830eb1aae03017aa43627128b4860e60eeab1
bb6544918fc3105a834782eb3c170711d5ba206c
+63a7921da1c1814f9d856a6375f84ba392c01abb
+9bd0175869cd66376457bdecf9e00583cfb98301
diff --git a/build.gradle b/build.gradle
index f93565cda1..7b2d649346 100644
--- a/build.gradle
+++ b/build.gradle
@@ -149,14 +149,6 @@ def parser = new XmlSlurper()
parser.setFeature("http://apache.org/xml/features/disallow-doctype-decl", false)
parser.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
-// We remove square brackets from test names, which occur when using a DataProvider,
-// because occasionally the number in the brackets is non-deterministic (unknown why)
-// and when that occurs, the test-retry plugin gets confused and cannot match the
-// attempts together.
-def removeSquareBrackets(String testName) {
- return testName.replaceFirst('\\[[0-9]+\\]', '')
-}
-
configurations {
alpnAgent {
}
@@ -431,16 +423,14 @@ subprojects {
}
beforeTest { descriptor ->
- def testName = removeSquareBrackets(descriptor.displayName)
def out = services.get(StyledTextOutputFactory).create("an-ouput")
- out.style(Style.Normal).println("$descriptor.className > $testName STARTED")
+ out.style(Style.Normal).println("$descriptor.className > $descriptor.displayName STARTED")
}
afterTest { descriptor, result ->
def totalTime = result.endTime - result.startTime
def prettyTime = totalTime < 1000 ? "$totalTime ms" : "${totalTime / 1000} s"
- def testName = removeSquareBrackets(descriptor.displayName)
def out = services.get(StyledTextOutputFactory).create("an-ouput")
def style = result.resultType == TestResult.ResultType.SUCCESS
@@ -455,7 +445,7 @@ subprojects {
? 'FAILED '
: 'SKIPPED '
- out.style(Style.Normal).text("$descriptor.className > $testName ")
+ out.style(Style.Normal).text("$descriptor.className > $descriptor.displayName ")
.style(style).text(status)
.style(Style.Normal).println("($prettyTime)")
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
index c2523cc4b4..6d437e77b1 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
@@ -163,10 +163,10 @@ private void updateOffsetRecord(GUID guid, Segment segment, OffsetRecord offsetR
* that). It is redundant that we store the same debug values once per partition. In the future,
* if we want to eliminate this redundancy, we could move the per-producer debug info to another
* data structure, though that would increase bookkeeping complexity. This is expected to be a
- * minor overhead, and therefore it appears to be a premature to optimize this now.
+ * minor overhead, and therefore it appears to be premature to optimize this now.
*/
- state.aggregates = segment.getAggregates();
- state.debugInfo = segment.getDebugInfo();
+ state.aggregates = CollectionUtils.substituteEmptyMap(segment.getAggregates());
+ state.debugInfo = CollectionUtils.substituteEmptyMap(segment.getDebugInfo());
}
state.checksumType = segment.getCheckSumType().getValue();
/**
diff --git a/gradle/spotbugs/exclude.xml b/gradle/spotbugs/exclude.xml
index 0c4a6d06cd..58c45f7ee8 100644
--- a/gradle/spotbugs/exclude.xml
+++ b/gradle/spotbugs/exclude.xml
@@ -251,7 +251,6 @@
-
diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/ArrayBasedPrimitiveIntegerSet.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/ArrayBasedPrimitiveIntegerSet.java
deleted file mode 100644
index 6095750738..0000000000
--- a/internal/venice-client-common/src/main/java/com/linkedin/venice/serializer/ArrayBasedPrimitiveIntegerSet.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package com.linkedin.venice.serializer;
-
-import java.util.AbstractSet;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-
-/**
- * This class is not thread-safe; it's optimized for GC by maintaining a primitive int array without boxing,
- * but it's slow in add(), contains(), remove(), etc.
- */
-public class ArrayBasedPrimitiveIntegerSet extends AbstractSet implements Set {
- private static final int[] EMPTY = new int[0];
- private int size;
- private int[] elements = EMPTY;
-
- public ArrayBasedPrimitiveIntegerSet() {
- super();
- }
-
- public ArrayBasedPrimitiveIntegerSet(int initialCapacity) {
- elements = new int[initialCapacity];
- size = 0;
- }
-
- public ArrayBasedPrimitiveIntegerSet(int[] backingArray, int initialSize) {
- if (initialSize > backingArray.length) {
- throw new IllegalArgumentException(
- "initialSize (" + initialSize + ") can not be bigger than the actual size of" + " the backingArray ("
- + backingArray.length + ")");
- }
- elements = backingArray;
- size = initialSize;
- }
-
- public int size() {
- return size;
- }
-
- public boolean isEmpty() {
- return size == 0;
- }
-
- public boolean contains(Object o) {
- if (!(o instanceof Integer)) {
- throw new ClassCastException("Input object is not an integer");
- }
- Integer other = (Integer) o;
- return contains((int) other);
- }
-
- /**
- * To explicitly use this GC optimized API, declare variable type as ArrayBasedPrimitiveIntegerSet instead of Set.
- */
- public boolean contains(int other) {
- int s = size();
- for (int i = 0; i < s; i++) {
- if (elements[i] == other) {
- return true;
- }
- }
- return false;
- }
-
- public Iterator iterator() {
- return new PrimitiveIntegerSetIterator(this);
- }
-
- // Modification Operations
- public boolean add(Integer e) {
- if (e == null) {
- throw new NullPointerException();
- }
- return add((int) e);
- }
-
- /**
- * To explicitly use this GC optimized API, declare variable type as ArrayBasedPrimitiveIntegerSet instead of Set.
- */
- public boolean add(int e) {
- int s = size();
- for (int i = 0; i < s; i++) {
- if (elements[i] == e) {
- return false;
- }
- }
- // new element
- if (size == elements.length) {
- int[] newElements = new int[(size * 3) / 2 + 1];
- System.arraycopy(elements, 0, newElements, 0, size);
- elements = newElements;
- }
- elements[size++] = e;
- return true;
- }
-
- public boolean remove(Object o) {
- if (o == null) {
- throw new NullPointerException();
- }
- if (!(o instanceof Integer)) {
- throw new ClassCastException("Input object is not an integer");
- }
- Integer e = (Integer) o;
- return remove((int) e);
- }
-
- /**
- * To explicitly use this GC optimized API, declare variable type as ArrayBasedPrimitiveIntegerSet instead of Set.
- */
- public boolean remove(int e) {
- int s = size();
- if (s == 0) {
- return false;
- }
- for (int i = 0; i < s; i++) {
- if (elements[i] == e) {
- // O(1) swap
- elements[i] = elements[s - 1];
- this.size -= 1;
- return true;
- }
- }
- return false;
- }
-
- /**
- * Removes all of the elements from this set (optional operation).
- * The set will be empty after this call returns.
- *
- * @throws UnsupportedOperationException if the clear method
- * is not supported by this set
- */
- public void clear() {
- // keep the backing array
- this.size = 0;
- }
-
- // Comparison and hashing
-
- @Override
- public int hashCode() {
- int hashCode = 1;
- for (int i = 0; i < this.size; i++) {
- hashCode = 31 * hashCode + Integer.hashCode(elements[i]);
- }
- return hashCode;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof ArrayBasedPrimitiveIntegerSet)) {
- return false;
- }
- ArrayBasedPrimitiveIntegerSet other = (ArrayBasedPrimitiveIntegerSet) obj;
- return Arrays.equals(other.elements, this.elements) && other.size == this.size;
- }
-
- private int[] getBackingArray() {
- return this.elements;
- }
-
- public static class PrimitiveIntegerSetIterator implements Iterator {
- final private ArrayBasedPrimitiveIntegerSet set;
- final private int[] backingArray;
- final private int size;
- private int index;
- private int lastReturnedElement;
-
- public PrimitiveIntegerSetIterator(ArrayBasedPrimitiveIntegerSet set) {
- this.set = set;
- this.backingArray = set.getBackingArray();
- this.size = set.size();
- this.index = 0;
- }
-
- public boolean hasNext() {
- return index < size;
- }
-
- public Integer next() {
- return nextPrimitiveInteger();
- }
-
- public int nextPrimitiveInteger() {
- if (index >= size) {
- throw new NoSuchElementException("No more elements in this iterator");
- }
- lastReturnedElement = backingArray[index++];
- return lastReturnedElement;
- }
-
- public void remove() {
- if (index == 0) {
- throw new IllegalStateException("next method has not been called");
- }
- set.remove(lastReturnedElement);
- }
- }
-}
diff --git a/internal/venice-common/build.gradle b/internal/venice-common/build.gradle
index 07930d899d..43c979cc51 100644
--- a/internal/venice-common/build.gradle
+++ b/internal/venice-common/build.gradle
@@ -27,6 +27,7 @@ dependencies {
implementation libraries.avroUtilCompatHelper
implementation libraries.bouncyCastle
+ implementation libraries.caffeine
implementation libraries.classgraph
implementation libraries.commonsCodec
implementation libraries.commonsIo // IntelliJ gets confused when running tests unless we explicitly depend on a recent version of commons-io
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
index b9384a7053..8e33f87a04 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
@@ -3,6 +3,8 @@
import static com.linkedin.venice.kafka.validation.SegmentStatus.END_OF_FINAL_SEGMENT;
import static com.linkedin.venice.kafka.validation.SegmentStatus.NOT_STARTED;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.exceptions.validation.UnsupportedMessageTypeException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
@@ -16,6 +18,7 @@
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.utils.CollectionUtils;
+import it.unimi.dsi.fastutil.objects.Object2ObjectArrayMap;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
@@ -39,6 +42,16 @@
*/
@NotThreadsafe
public class Segment {
+ /**
+ * This cache is to reduce the size on heap of debug info, which is very repetitive in nature. Using this cache, each
+ * unique CharSequence should exist only once on the heap, with many segments referring to it.
+ *
+ * We use weak values so that if there are no more segments referencing a given entry, it will also be cleared from
+ * the cache, and thus avoid a mem leak.
+ */
+ private static final Cache<CharSequence, CharSequence> DEDUPED_DEBUG_INFO =
+ Caffeine.newBuilder().weakValues().build();
+
// Immutable state
private final int partition;
private final int segmentNumber;
@@ -78,7 +91,7 @@ public Segment(
this.ended = false;
this.finalSegment = false;
this.newSegment = true;
- this.debugInfo = debugInfo;
+ this.debugInfo = getDedupedDebugInfo(debugInfo);
this.aggregates = aggregates;
}
@@ -101,7 +114,7 @@ public Segment(int partition, ProducerPartitionState state) {
this.ended = segmentStatus.isTerminal();
this.finalSegment = segmentStatus == END_OF_FINAL_SEGMENT;
this.newSegment = false;
- this.debugInfo = CollectionUtils.substituteEmptyMap(state.getDebugInfo());
+ this.debugInfo = getDedupedDebugInfo(state.getDebugInfo());
this.aggregates = CollectionUtils.substituteEmptyMap(state.getAggregates());
this.registered = state.isRegistered;
this.lastRecordProducerTimestamp = state.messageTimestamp;
@@ -117,6 +130,10 @@ public Segment(Segment segment) {
this.ended = segment.ended;
this.finalSegment = segment.finalSegment;
this.newSegment = false;
+ /**
+ * N.B. No need to call {@link #getDedupedDebugInfo(Map)} here since we assume the other {@link Segment} instance we
+ * are copying from was already deduped, having come from one of the other constructors.
+ */
this.debugInfo = segment.debugInfo;
this.aggregates = segment.aggregates;
this.registered = segment.registered;
@@ -373,4 +390,21 @@ public SegmentStatus getStatus() {
return SegmentStatus.IN_PROGRESS;
}
}
+
+ private Map<CharSequence, CharSequence> getDedupedDebugInfo(Map<CharSequence, CharSequence> original) {
+ if (original == null || original.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ /**
+ * The {@link Object2ObjectArrayMap} has an O(N) performance on lookups, but we don't care about the performance
+ * of the debug info map, so it is fine. The main concern is to make it as compact as possible, which this
+ * implementation achieves by minimizing per-element overhead (e.g. there is no {@link HashMap.Node} wrapping each
+ * entry).
+ */
+ Map<CharSequence, CharSequence> deduped = new Object2ObjectArrayMap<>(original.size());
+ for (Map.Entry<CharSequence, CharSequence> entry: original.entrySet()) {
+ deduped.put(DEDUPED_DEBUG_INFO.get(entry.getKey(), k -> k), DEDUPED_DEBUG_INFO.get(entry.getValue(), k -> k));
+ }
+ return deduped;
+ }
}
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/validation/SegmentTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/validation/SegmentTest.java
index 02186a0d73..9f7e6a76f7 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/kafka/validation/SegmentTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/kafka/validation/SegmentTest.java
@@ -1,6 +1,7 @@
package com.linkedin.venice.kafka.validation;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
@@ -15,7 +16,14 @@
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
+import com.linkedin.venice.utils.Utils;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.testng.annotations.Test;
@@ -126,4 +134,46 @@ public void test() {
assertEquals(segmentWithMD5Checksum1.getFinalCheckSum(), segmentWithMD5Checksum2.getFinalCheckSum());
}
+
+ @Test
+ public void testDebugInfoDeduping() {
+ Map<CharSequence, CharSequence> debugInfo1 = Utils.getDebugInfo();
+ Map<CharSequence, CharSequence> debugInfo2 = Utils.getDebugInfo();
+ assertEquals(
+ debugInfo1,
+ debugInfo2,
+ "We should get equal debug info when calling Utils.getDebugInfo() multiple times.");
+
+ Segment segment1 = new Segment(0, 0, 0, CheckSumType.MD5, debugInfo1, Collections.emptyMap());
+ Segment segment2 = new Segment(1, 0, 0, CheckSumType.MD5, debugInfo2, Collections.emptyMap());
+ assertEquals(
+ segment1.getDebugInfo(),
+ segment2.getDebugInfo(),
+ "The debug info of the two segments should still be equal.");
+
+ List<String> allProperties = Arrays.asList("host", "JDK major version", "path", "pid", "user", "version");
+ Set<String> nonSingletonProperties = new HashSet<>(Arrays.asList("JDK major version", "path", "pid"));
+ for (String property: allProperties) {
+ CharSequence rawValue1 = debugInfo1.get(property);
+ CharSequence rawValue2 = debugInfo2.get(property);
+ if (nonSingletonProperties.contains(property)) {
+ assertFalse(
+ rawValue1 == rawValue2,
+ "The identity of the elements inside the debug info map are not expected to be the same; property: "
+ + property + ", rawValue1: " + rawValue1 + ", rawValue2: " + rawValue2);
+ }
+ assertEquals(
+ rawValue1,
+ rawValue2,
+ "The content of the elements inside the debug info map are expected to be equal; property: " + property
+ + ", rawValue1: " + rawValue1 + ", rawValue2: " + rawValue2);
+
+ CharSequence dedupedValue1 = segment1.getDebugInfo().get(property);
+ CharSequence dedupedValue2 = segment2.getDebugInfo().get(property);
+ assertTrue(
+ dedupedValue1 == dedupedValue2,
+ "The identity of the elements inside the debug info maps should be deduped; property: " + property
+ + ", dedupedValue1: " + dedupedValue1 + ", dedupedValue2: " + dedupedValue2);
+ }
+ }
}