diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 6d5dce17f..7e04253fd 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -400,10 +400,11 @@ public float getSize() { Set verifyInputColumns( Map row, InsertValidationResponse.InsertError error, int rowIndex) { // Map of unquoted column name -> original column name - Map inputColNamesMap = - row.keySet().stream() - .collect(Collectors.toMap(LiteralQuoteUtils::unquoteColumnName, value -> value)); - + Set originalKeys = row.keySet(); + Map inputColNamesMap = new HashMap<>(); + for (String key : originalKeys) { + inputColNamesMap.put(LiteralQuoteUtils.unquoteColumnName(key), key); + } // Check for extra columns in the row List extraCols = new ArrayList<>(); for (String columnName : inputColNamesMap.keySet()) { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 814423c28..058f19bd7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -86,6 +86,16 @@ class DataValidationUtil { objectMapper.registerModule(module); } + private static final BigDecimal[] POWER_10 = makePower10Table(); + + private static BigDecimal[] makePower10Table() { + BigDecimal[] power10 = new BigDecimal[Power10.sb16Size]; + for (int i = 0; i < Power10.sb16Size; i++) { + power10[i] = new BigDecimal(Power10.sb16Table[i]); + } + return power10; + } + /** * Validates and parses input as JSON. All types in the object tree must be valid variant types, * see {@link DataValidationUtil#isAllowedSemiStructuredType}. 
@@ -823,8 +833,9 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR static void checkValueInRange( BigDecimal bigDecimalValue, int scale, int precision, final long insertRowIndex) { - if (bigDecimalValue.abs().compareTo(BigDecimal.TEN.pow(precision - scale)) >= 0) { - throw new SFException( + // Use the precomputed table when the exponent is in range; fall back to BigDecimal.TEN.pow + // otherwise so out-of-range exponents keep the original behavior instead of throwing AIOOBE. + BigDecimal comparand = (precision - scale) >= 0 && (precision - scale) < POWER_10.length ? POWER_10[precision - scale] : BigDecimal.TEN.pow(precision - scale); + if (bigDecimalValue.abs().compareTo(comparand) >= 0) { + throw new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( "Number out of representable exclusive range of (-1e%s..1e%s), rowIndex:%d", diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index f08196477..340f814ad 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -122,6 +122,7 @@ List>> getData() { // blob encoding version private final Constants.BdecVersion bdecVersion; + private volatile int numProcessors = Runtime.getRuntime().availableProcessors(); /** * Constructor for TESTING that takes (usually mocked) StreamingIngestStage @@ -360,6 +361,8 @@ void distributeFlushTasks() { List, CompletableFuture>> blobs = new ArrayList<>(); List> leftoverChannelsDataPerTable = new ArrayList<>(); + // The API states that the number of available processors reported can change and therefore, we should poll it occasionally.
+ numProcessors = Runtime.getRuntime().availableProcessors(); while (itr.hasNext() || !leftoverChannelsDataPerTable.isEmpty()) { List>> blobData = new ArrayList<>(); float totalBufferSizeInBytes = 0F; @@ -705,7 +708,7 @@ String getClientPrefix() { boolean throttleDueToQueuedFlushTasks() { ThreadPoolExecutor buildAndUpload = (ThreadPoolExecutor) this.buildUploadWorkers; boolean throttleOnQueuedTasks = - buildAndUpload.getQueue().size() > Runtime.getRuntime().availableProcessors(); + buildAndUpload.getQueue().size() > numProcessors; if (throttleOnQueuedTasks) { logger.logWarn( "Throttled due too many queue flush tasks (probably because of slow uploading speed)," diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java index f426e898d..777ae4fdc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProvider.java @@ -9,9 +9,6 @@ public interface MemoryInfoProvider { /** @return Max memory the JVM can allocate */ long getMaxMemory(); - /** @return Total allocated JVM memory so far */ - long getTotalMemory(); - /** @return Free JVM memory */ long getFreeMemory(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java index 3a957f225..2a1adbe2a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/MemoryInfoProviderFromRuntime.java @@ -4,20 +4,38 @@ package net.snowflake.ingest.streaming.internal; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** Reads memory information from JVM runtime */ public class 
MemoryInfoProviderFromRuntime implements MemoryInfoProvider { - @Override - public long getMaxMemory() { - return Runtime.getRuntime().maxMemory(); + private final long maxMemory; + private volatile long totalFreeMemory; + private final ScheduledExecutorService executorService; + + public MemoryInfoProviderFromRuntime(long freeMemoryUpdateIntervalMs) { + maxMemory = Runtime.getRuntime().maxMemory(); + totalFreeMemory = Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory()); + executorService = new ScheduledThreadPoolExecutor(1, r -> { + Thread th = new Thread(r, "MemoryInfoProviderFromRuntime"); + th.setDaemon(true); + return th; + }); + executorService.scheduleAtFixedRate(this::updateFreeMemory, 0, freeMemoryUpdateIntervalMs, TimeUnit.MILLISECONDS); + } + + private void updateFreeMemory() { + totalFreeMemory = Runtime.getRuntime().freeMemory() + (maxMemory - Runtime.getRuntime().totalMemory()); } @Override - public long getTotalMemory() { - return Runtime.getRuntime().totalMemory(); + public long getMaxMemory() { + return maxMemory; } @Override public long getFreeMemory() { - return Runtime.getRuntime().freeMemory(); + return totalFreeMemory; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 58e81d116..785d26f5e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -45,6 +45,10 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // Reference to the row buffer private final RowBuffer rowBuffer; + private final long insertThrottleIntervalInMs; + private final int insertThrottleThresholdInBytes; + private final int insertThrottleThresholdInPercentage; + private final long 
maxMemoryLimitInBytes; // Indicates whether the channel is closed private volatile boolean isClosed; @@ -61,6 +65,11 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn // The latest cause of channel invalidation private String invalidationCause; + // Shared across all channels: MemoryInfoProviderFromRuntime spawns a background polling thread + // that is never shut down, so a per-channel instance would leak one thread per channel. + private static volatile MemoryInfoProvider memoryInfoProvider; + private volatile long freeMemory = 0; + /** * Constructor for TESTING ONLY which allows us to set the test mode * @@ -121,6 +130,24 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn OffsetTokenVerificationFunction offsetTokenVerificationFunction) { this.isClosed = false; this.owningClient = client; + this.insertThrottleIntervalInMs = + this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs(); + this.insertThrottleThresholdInBytes = + this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes(); + this.insertThrottleThresholdInPercentage = + this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage(); + this.maxMemoryLimitInBytes = + this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes(); + + // Lazily create a single shared provider (double-checked locking on the volatile field); + // the first channel's interval wins, which is fine since all channels read the same parameter. + if (memoryInfoProvider == null) { + synchronized (SnowflakeStreamingIngestChannelInternal.class) { + if (memoryInfoProvider == null) { + memoryInfoProvider = new MemoryInfoProviderFromRuntime(insertThrottleIntervalInMs); + } + } + } this.channelFlushContext = new ChannelFlushContext( name, dbName, schemaName, tableName, channelSequencer, encryptionKey, encryptionKeyId); @@ -373,7 +400,7 @@ public InsertValidationResponse insertRows( Iterable> rows, @Nullable String startOffsetToken, @Nullable String endOffsetToken) { - throttleInsertIfNeeded(new MemoryInfoProviderFromRuntime()); + throttleInsertIfNeeded(memoryInfoProvider); checkValidation(); if (isClosed()) { @@ -448,8 +475,6 @@ public Map getTableSchema() { /** Check whether we need to throttle the insertRows API */ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) { int retry = 0; - long insertThrottleIntervalInMs = - this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs(); while
((hasLowRuntimeMemory(memoryInfoProvider) || (this.owningClient.getFlushService() != null && this.owningClient.getFlushService().throttleDueToQueuedFlushTasks())) @@ -473,19 +488,11 @@ void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) { /** Check whether we have a low runtime memory condition */ private boolean hasLowRuntimeMemory(MemoryInfoProvider memoryInfoProvider) { - int insertThrottleThresholdInBytes = - this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes(); - int insertThrottleThresholdInPercentage = - this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage(); - long maxMemoryLimitInBytes = - this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes(); long maxMemory = maxMemoryLimitInBytes == MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT ? memoryInfoProvider.getMaxMemory() : maxMemoryLimitInBytes; - long freeMemory = - memoryInfoProvider.getFreeMemory() - + (memoryInfoProvider.getMaxMemory() - memoryInfoProvider.getTotalMemory()); + freeMemory = memoryInfoProvider.getFreeMemory(); boolean hasLowRuntimeMemory = freeMemory < insertThrottleThresholdInBytes && freeMemory * 100 / maxMemory < insertThrottleThresholdInPercentage; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 4ddc61ece..b4fa769a1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -61,11 +61,6 @@ public long getMaxMemory() { return maxMemory; } - @Override - public long getTotalMemory() { - return maxMemory; - } - @Override public long getFreeMemory() { return freeMemory;