From 7d2d9754afaaad3458d239205508cb4342c693a3 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Wed, 3 Jan 2024 21:33:57 +0000 Subject: [PATCH] fix logic --- .../ingest/utils/ParameterProvider.java | 74 +++++++++++++------ 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 82dd1164b..c77039886 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -192,7 +192,9 @@ private void setParameterMap(Map parameterOverrides, Properties props); } - /** @return Longest interval in milliseconds between buffer flushes */ + /** + * @return Longest interval in milliseconds between buffer flushes + */ public long getBufferFlushIntervalInMs() { if (getMaxClientLagEnabled()) { if (cachedBufferFlushIntervalMs != -1L) { @@ -203,18 +205,18 @@ public long getBufferFlushIntervalInMs() { cachedBufferFlushIntervalMs = lag; } return cachedBufferFlushIntervalMs; - } else { - Object val = - this.parameterMap.getOrDefault( - BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT); - return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + + Object val = + this.parameterMap.getOrDefault( + BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT); + return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } private long getMaxClientLagMs() { Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT); if (!(val instanceof String)) { - return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT; + return getBufferFlushIntervalInMs(); } String maxLag = (String) val; String[] lagParts = maxLag.split(" "); @@ -249,9 +251,9 @@ private long getMaxClientLagMs() { if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) { throw new IllegalArgumentException( String.format( - "Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum" - + " (milliseconds) = %s", - MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX)); + "Lag falls outside of allowed time range. Minimum (seconds) = %s, Maximum" + + " (seconds) = %s", + MAX_CLIENT_LAG_MS_MIN / 1000, MAX_CLIENT_LAG_MS_MAX / 1000)); } return computedLag; } @@ -262,7 +264,9 @@ private boolean getMaxClientLagEnabled() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** @return Time in milliseconds between checks to see if the buffer should be flushed */ + /** + * @return Time in milliseconds between checks to see if the buffer should be flushed + */ public long getBufferFlushCheckIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -273,7 +277,9 @@ public long getBufferFlushCheckIntervalInMs() { return (long) val; } - /** @return Duration in milliseconds to delay data insertion to the buffer when throttled */ + /** + * @return Duration in milliseconds to delay data insertion to the buffer when throttled + */ public long getInsertThrottleIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -284,7 +290,9 @@ public long getInsertThrottleIntervalInMs() { return (long) val; } - /** @return Percent of free total memory at which we throttle row inserts */ + /** + * @return Percent of free total memory at which we throttle row inserts + */ public int getInsertThrottleThresholdInPercentage() { Object val = this.parameterMap.getOrDefault( @@ -296,7 +304,9 @@ public int getInsertThrottleThresholdInPercentage() { return (int) val; } - /** @return Absolute size in bytes of free total memory at which we throttle row inserts */ + /** + * @return Absolute size in bytes of free total memory at which we throttle row inserts + */ public int getInsertThrottleThresholdInBytes() { Object val = this.parameterMap.getOrDefault( @@ -307,7 +317,9 @@ public int getInsertThrottleThresholdInBytes() { return (int) val; } - /** @return true if jmx metrics are enabled for a client */ + /** + * @return true if jmx metrics are enabled for a client + */ public boolean hasEnabledSnowpipeStreamingMetrics() { Object val = this.parameterMap.getOrDefault( @@ -318,7 +330,9 @@ public boolean hasEnabledSnowpipeStreamingMetrics() { return (boolean) val; } - /** @return Blob format version */ + /** + * @return Blob format version + */ public Constants.BdecVersion getBlobFormatVersion() { Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT); if (val instanceof Constants.BdecVersion) { @@ -347,7 +361,9 @@ public int getIOTimeCpuRatio() { return (int) val; } - /** @return the max retry count when waiting for a blob upload task to finish */ + /** + * @return the max retry count when waiting for a blob upload task to finish + */ public int getBlobUploadMaxRetryCount() { Object val = this.parameterMap.getOrDefault( @@ -358,7 +374,9 @@ public int getBlobUploadMaxRetryCount() { return (int) val; } - /** @return The max memory limit in bytes */ + /** + * @return The max memory limit in bytes + */ public long getMaxMemoryLimitInBytes() { Object val = this.parameterMap.getOrDefault( @@ -366,7 +384,9 @@ public long getMaxMemoryLimitInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** @return Return whether memory optimization for Parquet is enabled. */ + /** + * @return Return whether memory optimization for Parquet is enabled. + */ public boolean getEnableParquetInternalBuffering() { Object val = this.parameterMap.getOrDefault( @@ -374,7 +394,9 @@ public boolean getEnableParquetInternalBuffering() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** @return The max channel size in bytes */ + /** + * @return The max channel size in bytes + */ public long getMaxChannelSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -382,7 +404,9 @@ public long getMaxChannelSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** @return The max chunk size in bytes that could avoid OOM at server side */ + /** + * @return The max chunk size in bytes that could avoid OOM at server side + */ public long getMaxChunkSizeInBytes() { Object val = this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT); @@ -408,11 +432,13 @@ public int getMaxChunksInBlobAndRegistrationRequest() { return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; } - /** @return BDEC compression algorithm */ + /** + * @return BDEC compression algorithm + */ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { Object val = - this.parameterMap.getOrDefault( - BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); + this.parameterMap.getOrDefault( + BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); if (val instanceof Constants.BdecParquetCompression) { return (Constants.BdecParquetCompression) val; }