Skip to content

Commit

Permalink
Various performance improvements in the insertRows path
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-psaha committed Jun 24, 2024
1 parent 283091f commit f21fbe5
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,10 +400,11 @@ public float getSize() {
Set<String> verifyInputColumns(
Map<String, Object> row, InsertValidationResponse.InsertError error, int rowIndex) {
// Map of unquoted column name -> original column name
Map<String, String> inputColNamesMap =
row.keySet().stream()
.collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value));

Set<String> originalKeys = row.keySet();
Map<String, String> inputColNamesMap = new HashMap<>();
for (String key : originalKeys) {
inputColNamesMap.put(LiteralQuoteUtils.unquoteColumnName(key), key);
}
// Check for extra columns in the row
List<String> extraCols = new ArrayList<>();
for (String columnName : inputColNamesMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ class DataValidationUtil {
objectMapper.registerModule(module);
}

// Powers of ten precomputed as BigDecimal, indexed by exponent. Built once at
// class load so hot validation paths avoid repeated BigDecimal.TEN.pow(n) calls.
private static final BigDecimal[] POWER_10 = makePower10Table();

/**
 * Builds the lookup table of {@code BigDecimal} powers of ten, one entry per
 * exponent in {@code Power10.sb16Table}.
 *
 * @return a freshly allocated table of size {@code Power10.sb16Size}
 */
private static BigDecimal[] makePower10Table() {
  BigDecimal[] table = new BigDecimal[Power10.sb16Size];
  int exponent = 0;
  while (exponent < table.length) {
    table[exponent] = new BigDecimal(Power10.sb16Table[exponent]);
    exponent++;
  }
  return table;
}

/**
* Validates and parses input as JSON. All types in the object tree must be valid variant types,
* see {@link DataValidationUtil#isAllowedSemiStructuredType}.
Expand Down Expand Up @@ -823,8 +833,9 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR

static void checkValueInRange(
BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) {
if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) {
throw new SFException(
// if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) {
if (bigDecimalValue.abs().compareTo(POWER_10[precision - scale]) >= 0) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Number out of representable exclusive range of (-1e%s..1e%s), rowIndex:%d",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ List<List<ChannelData<T>>> getData() {

// blob encoding version
private final Constants.BdecVersion bdecVersion;
private volatile int numProcessors = Runtime.getRuntime().availableProcessors();

/**
* Constructor for TESTING that takes (usually mocked) StreamingIngestStage
Expand Down Expand Up @@ -360,6 +361,8 @@ void distributeFlushTasks() {
List<Pair<BlobData<T>, CompletableFuture<BlobMetadata>>> blobs = new ArrayList<>();
List<ChannelData<T>> leftoverChannelsDataPerTable = new ArrayList<>();

// The API states that the number of available processors reported can change and therefore, we should poll it occasionally.
numProcessors = Runtime.getRuntime().availableProcessors();
while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) {
List<List<ChannelData<T>>> blobData = new ArrayList<>();
float totalBufferSizeInBytes = 0F;
Expand Down Expand Up @@ -705,7 +708,7 @@ String getClientPrefix() {
boolean throttleDueToQueuedFlushTasks() {
ThreadPoolExecutor buildAndUpload = (ThreadPoolExecutor) this.buildUploadWorkers;
boolean throttleOnQueuedTasks =
buildAndUpload.getQueue().size() > Runtime.getRuntime().availableProcessors();
buildAndUpload.getQueue().size() > numProcessors;
if (throttleOnQueuedTasks) {
logger.logWarn(
"Throttled due too many queue flush tasks (probably because of slow uploading speed),"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ public interface MemoryInfoProvider {
/** @return Max memory the JVM can allocate */
long getMaxMemory();

/** @return Total allocated JVM memory so far */
long getTotalMemory();

/** @return Free JVM memory */
long getFreeMemory();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,38 @@

package net.snowflake.ingest.streaming.internal;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Reads memory information from JVM runtime */
public class MemoryInfoProviderFromRuntime implements MemoryInfoProvider {
@Override
public long getMaxMemory() {
return Runtime.getRuntime().maxMemory();
private final long maxMemory;
private volatile long totalFreeMemory;
private final ScheduledExecutorService executorService;

/**
 * Creates a provider that snapshots JVM memory figures and refreshes the
 * cached free-memory estimate on a single-threaded background scheduler.
 *
 * @param freeMemoryUpdateIntervalMs period, in milliseconds, between refreshes
 *     of the cached free-memory value (first refresh runs immediately)
 */
public MemoryInfoProviderFromRuntime(long freeMemoryUpdateIntervalMs) {
  Runtime runtime = Runtime.getRuntime();
  maxMemory = runtime.maxMemory();
  // Free memory = currently free heap + headroom the JVM may still commit.
  totalFreeMemory = runtime.freeMemory() + (maxMemory - runtime.totalMemory());
  executorService =
      new ScheduledThreadPoolExecutor(
          1,
          runnable -> {
            // Daemon thread so this background sampler never blocks JVM shutdown.
            Thread worker = new Thread(runnable, "MemoryInfoProviderFromRuntime");
            worker.setDaemon(true);
            return worker;
          });
  executorService.scheduleAtFixedRate(
      this::updateFreeMemory, 0, freeMemoryUpdateIntervalMs, TimeUnit.MILLISECONDS);
}
}

/**
 * Recomputes the cached free-memory estimate: free heap plus the gap between
 * the JVM's maximum and currently committed memory. Runs on the scheduler
 * thread; the field it writes is volatile, so readers see fresh values.
 */
private void updateFreeMemory() {
  Runtime runtime = Runtime.getRuntime();
  totalFreeMemory = runtime.freeMemory() + (maxMemory - runtime.totalMemory());
}

@Override
public long getTotalMemory() {
return Runtime.getRuntime().totalMemory();
public long getMaxMemory() {
return maxMemory;
}

@Override
public long getFreeMemory() {
return Runtime.getRuntime().freeMemory();
return totalFreeMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn

// Reference to the row buffer
private final RowBuffer<T> rowBuffer;
private final long insertThrottleIntervalInMs;
private final int insertThrottleThresholdInBytes;
private final int insertThrottleThresholdInPercentage;
private final long maxMemoryLimitInBytes;

// Indicates whether the channel is closed
private volatile boolean isClosed;
Expand All @@ -61,6 +65,9 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
// The latest cause of channel invalidation
private String invalidationCause;

private final MemoryInfoProvider memoryInfoProvider;
private volatile long freeMemory = 0;

/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
Expand Down Expand Up @@ -121,6 +128,16 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
this.isClosed = false;
this.owningClient = client;
this.insertThrottleIntervalInMs =
this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
this.insertThrottleThresholdInBytes =
this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
this.insertThrottleThresholdInPercentage =
this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
this.maxMemoryLimitInBytes =
this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();

this.memoryInfoProvider = new MemoryInfoProviderFromRuntime(insertThrottleIntervalInMs);
this.channelFlushContext =
new ChannelFlushContext(
name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId);
Expand Down Expand Up @@ -373,7 +390,7 @@ public InsertValidationResponse insertRows(
Iterable<Map<String, Object>> rows,
@Nullable String startOffsetToken,
@Nullable String endOffsetToken) {
throttleInsertIfNeeded(new MemoryInfoProviderFromRuntime());
throttleInsertIfNeeded(memoryInfoProvider);
checkValidation();

if (isClosed()) {
Expand Down Expand Up @@ -448,8 +465,6 @@ public Map<String, ColumnProperties> getTableSchema() {
/** Check whether we need to throttle the insertRows API */
void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
int retry = 0;
long insertThrottleIntervalInMs =
this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
while ((hasLowRuntimeMemory(memoryInfoProvider)
|| (this.owningClient.getFlushService() != null
&& this.owningClient.getFlushService().throttleDueToQueuedFlushTasks()))
Expand All @@ -473,19 +488,11 @@ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {

/** Check whether we have a low runtime memory condition */
private boolean hasLowRuntimeMemory(MemoryInfoProvider memoryInfoProvider) {
int insertThrottleThresholdInBytes =
this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
int insertThrottleThresholdInPercentage =
this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
long maxMemoryLimitInBytes =
this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();
long maxMemory =
maxMemoryLimitInBytes == MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT
? memoryInfoProvider.getMaxMemory()
: maxMemoryLimitInBytes;
long freeMemory =
memoryInfoProvider.getFreeMemory()
+ (memoryInfoProvider.getMaxMemory() - memoryInfoProvider.getTotalMemory());
freeMemory = memoryInfoProvider.getFreeMemory();
boolean hasLowRuntimeMemory =
freeMemory < insertThrottleThresholdInBytes
&& freeMemory * 100 / maxMemory < insertThrottleThresholdInPercentage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ public long getMaxMemory() {
return maxMemory;
}

@Override
public long getTotalMemory() {
return maxMemory;
}

@Override
public long getFreeMemory() {
return freeMemory;
Expand Down

0 comments on commit f21fbe5

Please sign in to comment.