Skip to content

Commit

Permalink
Merge branch 'master' into tzhang-si-refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-xhuang authored Jul 10, 2024
2 parents 8bddd2c + f5669d1 commit e6c4413
Show file tree
Hide file tree
Showing 15 changed files with 511 additions and 58 deletions.
45 changes: 35 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,18 @@
<version>3.7.7</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.34</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.34</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -470,6 +482,13 @@
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-common</artifactId>
<exclusions>
<!-- Dependencies are excluded because CDDL + GPLv2 with classpath exception license is not approved -->
<exclusion>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
Expand Down Expand Up @@ -527,6 +546,16 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
Expand Down Expand Up @@ -723,8 +752,8 @@
<ignoreNonCompile>true</ignoreNonCompile>
<ignoredDependencies>
<!-- We defined these as direct dependencies (as opposed to just declaring it in dependencyManagement)
to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
the dependency is unused, so we ignore it here-->
to workaround https://issues.apache.org/jira/browse/MNG-7982. Now the dependency analyzer complains that
the dependency is unused, so we ignore it here-->
<ignoredDependency>org.apache.commons:commons-compress</ignoredDependency>
<ignoredDependency>org.apache.commons:commons-configuration2</ignoredDependency>
</ignoredDependencies>
Expand Down Expand Up @@ -818,10 +847,8 @@
<version>2.0.1</version>
<configuration>
<errorRemedy>failFast</errorRemedy>
<!--
The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
check your dependencies", verify the conditions of the license and add the reference to it here.
-->
<!--The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please
check your dependencies", verify the conditions of the license and add the reference to it here.-->
<includedLicenses>
<includedLicense>Apache License 2.0</includedLicense>
<includedLicense>BSD 2-Clause License</includedLicense>
Expand Down Expand Up @@ -1133,10 +1160,8 @@
</execution>
</executions>
</plugin>
<!--
Plugin executes license processing Python script, which copies third party license files into the directory
target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR.
-->
<!-- Plugin executes license processing Python script, which copies third party license files into the directory
target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR. -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.snowflake.ingest.connection.TelemetryService;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
Expand Down Expand Up @@ -400,10 +399,10 @@ public float getSize() {
Set<String> verifyInputColumns(
Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
// Map of unquoted column name -> original column name
Map<String, String> inputColNamesMap =
row.keySet().stream()
.collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value));

Set<String> originalKeys = row.keySet();
Map<String, String> inputColNamesMap = new HashMap<>();
originalKeys.forEach(
key -> inputColNamesMap.put(LiteralQuoteUtils.unquoteColumnName(key), key));
// Check for extra columns in the row
List<String> extraCols = new ArrayList<>();
for (String columnName : inputColNamesMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ class DataValidationUtil {
objectMapper.registerModule(module);
}

// Caching the powers of 10 that are used for checking the range of numbers because computing them
// on-demand is expensive.
private static final BigDecimal[] POWER_10 = makePower10Table();

private static BigDecimal[] makePower10Table() {
BigDecimal[] power10 = new BigDecimal[Power10.sb16Size];
for (int i = 0; i < Power10.sb16Size; i++) {
power10[i] = new BigDecimal(Power10.sb16Table[i]);
}
return power10;
}

/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
Expand Down Expand Up @@ -823,7 +835,11 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) {
BigDecimal comparand =
(precision >= scale) && (precision - scale) < POWER_10.length
? POWER_10[precision - scale]
: BigDecimal.TEN.pow(precision - scale);
if (bigDecimalValue.abs().compareTo(comparand) >= 0) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ List<List<ChannelData<T>>> getData() {

// blob encoding version
private final Constants.BdecVersion bdecVersion;
private volatile int numProcessors = Runtime.getRuntime().availableProcessors();

/**
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage
Expand Down Expand Up @@ -360,6 +361,9 @@ void distributeFlushTasks() {
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

// The API states that the number of available processors reported can change and therefore, we
// should poll it occasionally.
numProcessors = Runtime.getRuntime().availableProcessors();
while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
List<List<ChannelData<T>>> blobData = new ArrayList<>();
float totalBufferSizeInBytes = 0F;
Expand Down Expand Up @@ -704,8 +708,7 @@ String getClientPrefix() {
*/
boolean throttleDueToQueuedFlushTasks() {
ThreadPoolExecutor buildAndUpload = (ThreadPoolExecutor) this.buildUploadWorkers;
boolean throttleOnQueuedTasks =
buildAndUpload.getQueue().size() > Runtime.getRuntime().availableProcessors();
boolean throttleOnQueuedTasks = buildAndUpload.getQueue().size() > numProcessors;
if (throttleOnQueuedTasks) {
logger.logWarn(
"Throttled due too many queue flush tasks (probably because of slow uploading speed),"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ public interface MemoryInfoProvider {
/** @return Max memory the JVM can allocate */
long getMaxMemory();

/** @return Total allocated JVM memory so far */
long getTotalMemory();

/** @return Free JVM memory */
long getFreeMemory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,51 @@

package net.snowflake.ingest.streaming.internal;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Reads memory information from JVM runtime */
public class MemoryInfoProviderFromRuntime implements MemoryInfoProvider {
@Override
public long getMaxMemory() {
return Runtime.getRuntime().maxMemory();
private final long maxMemory;
private volatile long totalFreeMemory;
private final ScheduledExecutorService executorService;
private static final long FREE_MEMORY_UPDATE_INTERVAL_MS = 100;
private static final MemoryInfoProviderFromRuntime INSTANCE =
new MemoryInfoProviderFromRuntime(FREE_MEMORY_UPDATE_INTERVAL_MS);

private MemoryInfoProviderFromRuntime(long freeMemoryUpdateIntervalMs) {
maxMemory = Runtime.getRuntime().maxMemory();
totalFreeMemory =
Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
executorService =
new ScheduledThreadPoolExecutor(
1,
r -> {
Thread th = new Thread(r, "MemoryInfoProviderFromRuntime");
th.setDaemon(true);
return th;
});
executorService.scheduleAtFixedRate(
this::updateFreeMemory, 0, freeMemoryUpdateIntervalMs, TimeUnit.MILLISECONDS);
}

private void updateFreeMemory() {
totalFreeMemory =
Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory());
}

public static MemoryInfoProviderFromRuntime getInstance() {
return INSTANCE;
}

@Override
public long getTotalMemory() {
return Runtime.getRuntime().totalMemory();
public long getMaxMemory() {
return maxMemory;
}

@Override
public long getFreeMemory() {
return Runtime.getRuntime().freeMemory();
return totalFreeMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
Expand Down Expand Up @@ -124,6 +125,12 @@ private SerializationResult serializeFromParquetWriteBuffers(

if (mergedChannelWriter != null) {
mergedChannelWriter.close();
this.verifyRowCounts(
"serializeFromParquetWriteBuffers",
mergedChannelWriter,
rowCount,
channelsDataPerTable,
-1);
}
return new SerializationResult(
channelsMetadataList,
Expand Down Expand Up @@ -216,6 +223,9 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

this.verifyRowCounts(
"serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());

return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
Expand All @@ -224,4 +234,71 @@ private SerializationResult serializeFromJavaObjects(
mergedData,
chunkMinMaxInsertTimeInMs);
}

/**
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
*
* @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param totalMetadataRowCount Row count calculated during metadata collection
* @param javaSerializationTotalRowCount Total row count when java object serialization is used.
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
String serializationType,
BdecParquetWriter writer,
long totalMetadataRowCount,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
long javaSerializationTotalRowCount) {
long parquetTotalRowsWritten = writer.getRowsWritten();

List<Long> parquetFooterRowsPerBlock = writer.getRowCountsFromFooter();
long parquetTotalRowsInFooter = 0;
for (long perBlockCount : parquetFooterRowsPerBlock) {
parquetTotalRowsInFooter += perBlockCount;
}

if (parquetTotalRowsInFooter != totalMetadataRowCount
|| parquetTotalRowsWritten != totalMetadataRowCount) {

final String perChannelRowCountsInMetadata =
channelsDataPerTable.stream()
.map(x -> String.valueOf(x.getRowCount()))
.collect(Collectors.joining(","));

final String channelNames =
channelsDataPerTable.stream()
.map(x -> String.valueOf(x.getChannelContext().getName()))
.collect(Collectors.joining(","));

final String perBlockRowCountsInFooter =
parquetFooterRowsPerBlock.stream().map(String::valueOf).collect(Collectors.joining(","));

final long channelsCountInMetadata = channelsDataPerTable.size();

throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+ "parquetTotalRowsInFooter=%d "
+ "totalMetadataRowCount=%d "
+ "parquetTotalRowsWritten=%d "
+ "perChannelRowCountsInMetadata=%s "
+ "perBlockRowCountsInFooter=%s "
+ "channelsCountInMetadata=%d "
+ "countOfSerializedJavaObjects=%d "
+ "channelNames=%s",
serializationType,
parquetTotalRowsInFooter,
totalMetadataRowCount,
parquetTotalRowsWritten,
perChannelRowCountsInMetadata,
perBlockRowCountsInFooter,
channelsCountInMetadata,
javaSerializationTotalRowCount,
channelNames));
}
}
}
Loading

0 comments on commit e6c4413

Please sign in to comment.