diff --git a/pom.xml b/pom.xml
index 4e6907047..4c6665eec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -358,6 +358,18 @@
       <version>3.7.7</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.34</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.34</version>
+      <scope>test</scope>
+    </dependency>
@@ -470,6 +482,13 @@
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.annotation</groupId>
+          <artifactId>javax.annotation-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
@@ -527,6 +546,16 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.powermock</groupId>
       <artifactId>powermock-api-mockito2</artifactId>
@@ -723,8 +752,8 @@
             <failOnWarning>true</failOnWarning>
+            <!-- Declared explicitly to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
+                 the dependency is unused, so we ignore it here-->
             <ignoredUnusedDeclaredDependencies>
               <ignoredUnusedDeclaredDependency>org.apache.commons:commons-compress</ignoredUnusedDeclaredDependency>
               <ignoredUnusedDeclaredDependency>org.apache.commons:commons-configuration2</ignoredUnusedDeclaredDependency>
@@ -818,10 +847,8 @@
2.0.1
failFast
-
+
Apache License 2.0
BSD 2-Clause License
@@ -1133,10 +1160,8 @@
-
+
             <groupId>org.codehaus.mojo</groupId>
             <artifactId>exec-maven-plugin</artifactId>
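
The JMH dependencies added above support microbenchmarks compiled in the test scope. A minimal sketch of how such a benchmark would look; the package, class name, and benchmarked body are illustrative, not taken from this PR:

    package net.snowflake.ingest.benchmarks; // hypothetical package

    import java.util.concurrent.TimeUnit;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    public class InsertRowsBenchmark {

      @Benchmark
      public StringBuilder buildRow() {
        // Stand-in for the hot path under test (e.g. a row-buffer insert);
        // returning the result prevents dead-code elimination.
        return new StringBuilder("row-").append(42);
      }
    }

jmh-generator-annprocess processes the @Benchmark annotations at compile time, which is why it is declared alongside jmh-core.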
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 6d5dce17f..71a9d501e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -16,7 +16,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
@@ -400,10 +399,10 @@ public float getSize() {
   Set<String> verifyInputColumns(
       Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
     // Map of unquoted column name -> original column name
-    Map<String, String> inputColNamesMap =
-        row.keySet().stream()
-            .collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value));
-
+    Set<String> originalKeys = row.keySet();
+    Map<String, String> inputColNamesMap = new HashMap<>();
+    originalKeys.forEach(
+        key -> inputColNamesMap.put(LiteralQuoteUtils.unquoteColumnName(key), key));
     // Check for extra columns in the row
     List<String> extraCols = new ArrayList<>();
     for (String columnName : inputColNamesMap.keySet()) {
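
The rewrite above replaces a Collectors.toMap pipeline with a plain HashMap loop on the insert hot path. The two forms agree for distinct keys; on duplicate unquoted names, Collectors.toMap would throw IllegalStateException while HashMap.put silently keeps the last key. A self-contained sketch, with unquote standing in for LiteralQuoteUtils.unquoteColumnName:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class UnquoteMapExample {
      // Illustrative stand-in for LiteralQuoteUtils.unquoteColumnName.
      static String unquote(String s) {
        return s.length() > 1 && s.startsWith("\"") && s.endsWith("\"")
            ? s.substring(1, s.length() - 1)
            : s;
      }

      public static void main(String[] args) {
        Set<String> keys = new HashSet<>(Arrays.asList("\"myCol\"", "OTHER_COL"));

        // Stream pipeline removed by this PR.
        Map<String, String> viaStream =
            keys.stream().collect(Collectors.toMap(UnquoteMapExample::unquote, k -> k));

        // Loop introduced by this PR: no collector machinery, just HashMap puts.
        Map<String, String> viaLoop = new HashMap<>();
        keys.forEach(k -> viaLoop.put(unquote(k), k));

        System.out.println(viaStream.equals(viaLoop)); // true
      }
    }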
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
index 814423c28..162e56145 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -86,6 +86,18 @@ class DataValidationUtil {
objectMapper.registerModule(module);
}
+ // Caching the powers of 10 that are used for checking the range of numbers because computing them
+ // on-demand is expensive.
+ private static final BigDecimal[] POWER_10 = makePower10Table();
+
+ private static BigDecimal[] makePower10Table() {
+ BigDecimal[] power10 = new BigDecimal[Power10.sb16Size];
+ for (int i = 0; i < Power10.sb16Size; i++) {
+ power10[i] = new BigDecimal(Power10.sb16Table[i]);
+ }
+ return power10;
+ }
+
/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
@@ -823,7 +835,11 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
- if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) {
+ BigDecimal comparand =
+ (precision >= scale) && (precision - scale) < POWER_10.length
+ ? POWER_10[precision - scale]
+ : BigDecimal.TEN.pow(precision - scale);
+ if (bigDecimalValue.abs().compareTo(comparand) >= 0) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
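
checkValueInRange rejects values whose absolute value is at least 10^(precision - scale); the change above serves that power from a precomputed table instead of calling BigDecimal.TEN.pow on every row. A standalone sketch of the same idea, using an assumed table size of 39 (exponents 0..38, Snowflake's maximum NUMBER precision) in place of Power10.sb16Size:

    import java.math.BigDecimal;

    public class PowerOfTenCache {
      // 10^0 .. 10^38; 38 is Snowflake's maximum NUMBER precision.
      private static final int SIZE = 39;
      private static final BigDecimal[] POWER_10 = new BigDecimal[SIZE];

      static {
        BigDecimal p = BigDecimal.ONE;
        for (int i = 0; i < SIZE; i++) {
          POWER_10[i] = p;
          p = p.multiply(BigDecimal.TEN);
        }
      }

      /** True iff |value| < 10^(precision - scale); the exponent is assumed non-negative here. */
      static boolean fitsInRange(BigDecimal value, int precision, int scale) {
        int exp = precision - scale;
        BigDecimal limit = exp < SIZE ? POWER_10[exp] : BigDecimal.TEN.pow(exp);
        return value.abs().compareTo(limit) < 0;
      }

      public static void main(String[] args) {
        System.out.println(fitsInRange(new BigDecimal("999.99"), 5, 2)); // true
        System.out.println(fitsInRange(new BigDecimal("1000.00"), 5, 2)); // false
      }
    }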
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index f08196477..76e43ff4d 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -122,6 +122,7 @@ List<List<ChannelData<T>>> getData() {
// blob encoding version
private final Constants.BdecVersion bdecVersion;
+ private volatile int numProcessors = Runtime.getRuntime().availableProcessors();
/**
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage
@@ -360,6 +361,9 @@ void distributeFlushTasks() {
     List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
     List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();
+ // The API states that the number of available processors reported can change and therefore, we
+ // should poll it occasionally.
+ numProcessors = Runtime.getRuntime().availableProcessors();
while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
       List<List<ChannelData<T>>> blobData = new ArrayList<>();
float totalBufferSizeInBytes = 0F;
@@ -704,8 +708,7 @@ String getClientPrefix() {
*/
boolean throttleDueToQueuedFlushTasks() {
ThreadPoolExecutor buildAndUpload = (ThreadPoolExecutor) this.buildUploadWorkers;
- boolean throttleOnQueuedTasks =
- buildAndUpload.getQueue().size() > Runtime.getRuntime().availableProcessors();
+ boolean throttleOnQueuedTasks = buildAndUpload.getQueue().size() > numProcessors;
if (throttleOnQueuedTasks) {
logger.logWarn(
"Throttled due too many queue flush tasks (probably because of slow uploading speed),"
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
index f426e898d..777ae4fdc 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java
@@ -9,9 +9,6 @@ public interface MemoryInfoProvider {
/** @return Max memory the JVM can allocate */
long getMaxMemory();
- /** @return Total allocated JVM memory so far */
- long getTotalMemory();
-
/** @return Free JVM memory */
long getFreeMemory();
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
index 3a957f225..d248ddfd9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java
@@ -4,20 +4,51 @@
package net.snowflake.ingest.streaming.internal;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/** Reads memory information from JVM runtime */
public class MemoryInfoProviderFromRuntime implements MemoryInfoProvider {
- @Override
- public long getMaxMemory() {
- return Runtime.getRuntime().maxMemory();
+ private final long maxMemory;
+ private volatile long totalFreeMemory;
+ private final ScheduledExecutorService executorService;
+ private static final long FREE_MEMORY_UPDATE_INTERVAL_MS = 100;
+ private static final MemoryInfoProviderFromRuntime INSTANCE =
+ new MemoryInfoProviderFromRuntime(FREE_MEMORY_UPDATE_INTERVAL_MS);
+
+ private MemoryInfoProviderFromRuntime(long freeMemoryUpdateIntervalMs) {
+ maxMemory = Runtime.getRuntime().maxMemory();
+ totalFreeMemory =
+ Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
+ executorService =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread th = new Thread(r, "MemoryInfoProviderFromRuntime");
+ th.setDaemon(true);
+ return th;
+ });
+ executorService.scheduleAtFixedRate(
+ this::updateFreeMemory, 0, freeMemoryUpdateIntervalMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void updateFreeMemory() {
+ totalFreeMemory =
+ Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
+ }
+
+ public static MemoryInfoProviderFromRuntime getInstance() {
+ return INSTANCE;
}
@Override
- public long getTotalMemory() {
- return Runtime.getRuntime().totalMemory();
+ public long getMaxMemory() {
+ return maxMemory;
}
@Override
public long getFreeMemory() {
- return Runtime.getRuntime().freeMemory();
+ return totalFreeMemory;
}
}
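
With this change, callers share a singleton whose free-memory figure is refreshed every 100 ms by a daemon thread, so reads on the insert path are volatile loads rather than Runtime calls. A usage sketch; the percentage check is illustrative, loosely mirroring the insertThrottleThresholdInPercentage parameter wired up in the channel below:

    import net.snowflake.ingest.streaming.internal.MemoryInfoProvider;
    import net.snowflake.ingest.streaming.internal.MemoryInfoProviderFromRuntime;

    public class MemoryThrottleExample {
      public static void main(String[] args) {
        MemoryInfoProvider memoryInfo = MemoryInfoProviderFromRuntime.getInstance();

        // maxMemory is fixed at construction; freeMemory is refreshed in the
        // background every 100 ms, so this is just a volatile read.
        long max = memoryInfo.getMaxMemory();
        long free = memoryInfo.getFreeMemory();

        // Illustrative throttle predicate, cf. insertThrottleThresholdInPercentage.
        int thresholdPercent = 10;
        boolean throttle = free * 100 < (long) thresholdPercent * max;
        System.out.println("free=" + free + ", max=" + max + ", throttle=" + throttle);
      }
    }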
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index 39ec66dbb..3ad3db5f4 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
@@ -124,6 +125,12 @@ private SerializationResult serializeFromParquetWriteBuffers(
if (mergedChannelWriter != null) {
mergedChannelWriter.close();
+ this.verifyRowCounts(
+ "serializeFromParquetWriteBuffers",
+ mergedChannelWriter,
+ rowCount,
+ channelsDataPerTable,
+ -1);
}
return new SerializationResult(
channelsMetadataList,
@@ -216,6 +223,9 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();
+ this.verifyRowCounts(
+ "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
+
return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
@@ -224,4 +234,71 @@ private SerializationResult serializeFromJavaObjects(
mergedData,
chunkMinMaxInsertTimeInMs);
}
+
+ /**
+   * Validates that the row count in metadata matches the row count in the Parquet footer and the
+   * row count written by the Parquet writer
+   *
+   * @param serializationType Serialization type, used for logging purposes only
+   * @param writer Parquet writer writing the data
+   * @param totalMetadataRowCount Row count calculated during metadata collection
+   * @param channelsDataPerTable Channel data
+   * @param javaSerializationTotalRowCount Total row count when Java object serialization is used.
+   *     Used only for logging purposes if there is a mismatch.
+ */
+ private void verifyRowCounts(
+ String serializationType,
+ BdecParquetWriter writer,
+ long totalMetadataRowCount,
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+ long javaSerializationTotalRowCount) {
+ long parquetTotalRowsWritten = writer.getRowsWritten();
+
+    List<Long> parquetFooterRowsPerBlock = writer.getRowCountsFromFooter();
+ long parquetTotalRowsInFooter = 0;
+ for (long perBlockCount : parquetFooterRowsPerBlock) {
+ parquetTotalRowsInFooter += perBlockCount;
+ }
+
+ if (parquetTotalRowsInFooter != totalMetadataRowCount
+ || parquetTotalRowsWritten != totalMetadataRowCount) {
+
+ final String perChannelRowCountsInMetadata =
+ channelsDataPerTable.stream()
+ .map(x -> String.valueOf(x.getRowCount()))
+ .collect(Collectors.joining(","));
+
+ final String channelNames =
+ channelsDataPerTable.stream()
+ .map(x -> String.valueOf(x.getChannelContext().getName()))
+ .collect(Collectors.joining(","));
+
+ final String perBlockRowCountsInFooter =
+ parquetFooterRowsPerBlock.stream().map(String::valueOf).collect(Collectors.joining(","));
+
+ final long channelsCountInMetadata = channelsDataPerTable.size();
+
+ throw new SFException(
+ ErrorCode.INTERNAL_ERROR,
+ String.format(
+ "[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+ + "parquetTotalRowsInFooter=%d "
+ + "totalMetadataRowCount=%d "
+ + "parquetTotalRowsWritten=%d "
+ + "perChannelRowCountsInMetadata=%s "
+ + "perBlockRowCountsInFooter=%s "
+ + "channelsCountInMetadata=%d "
+ + "countOfSerializedJavaObjects=%d "
+ + "channelNames=%s",
+ serializationType,
+ parquetTotalRowsInFooter,
+ totalMetadataRowCount,
+ parquetTotalRowsWritten,
+ perChannelRowCountsInMetadata,
+ perBlockRowCountsInFooter,
+ channelsCountInMetadata,
+ javaSerializationTotalRowCount,
+ channelNames));
+ }
+ }
}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
index 58e81d116..8ebc23ca1 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java
@@ -45,6 +45,10 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// Reference to the row buffer
   private final RowBuffer<T> rowBuffer;
+ private final long insertThrottleIntervalInMs;
+ private final int insertThrottleThresholdInBytes;
+ private final int insertThrottleThresholdInPercentage;
+ private final long maxMemoryLimitInBytes;
// Indicates whether the channel is closed
private volatile boolean isClosed;
@@ -61,6 +65,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// The latest cause of channel invalidation
private String invalidationCause;
+ private final MemoryInfoProvider memoryInfoProvider;
+ private volatile long freeMemoryInBytes = 0;
+
/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
@@ -121,6 +128,17 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
this.isClosed = false;
this.owningClient = client;
+
+ this.insertThrottleIntervalInMs =
+ this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
+ this.insertThrottleThresholdInBytes =
+ this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
+ this.insertThrottleThresholdInPercentage =
+ this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
+ this.maxMemoryLimitInBytes =
+ this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();
+
+ this.memoryInfoProvider = MemoryInfoProviderFromRuntime.getInstance();
this.channelFlushContext =
new ChannelFlushContext(
name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
@@ -373,7 +391,7 @@ public InsertValidationResponse insertRows(
       Iterable<Map<String, Object>> rows,