Skip to content

Commit

Permalink
fix logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tzhang committed Jan 3, 2024
1 parent efbd0e5 commit 7d2d975
Showing 1 changed file with 50 additions and 24 deletions.
74 changes: 50 additions & 24 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
props);
}

/** @return Longest interval in milliseconds between buffer flushes */
/**
* @return Longest interval in milliseconds between buffer flushes
*/
public long getBufferFlushIntervalInMs() {
if (getMaxClientLagEnabled()) {
if (cachedBufferFlushIntervalMs != -1L) {
Expand All @@ -203,18 +205,18 @@ public long getBufferFlushIntervalInMs() {
cachedBufferFlushIntervalMs = lag;
}
return cachedBufferFlushIntervalMs;
} else {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

private long getMaxClientLagMs() {
Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
if (!(val instanceof String)) {
return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
return getBufferFlushIntervalInMs();
}
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
Expand Down Expand Up @@ -249,9 +251,9 @@ private long getMaxClientLagMs() {
if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
throw new IllegalArgumentException(
String.format(
"Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum"
+ " (milliseconds) = %s",
MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
"Lag falls outside of allowed time range. Minimum (seconds) = %s, Maximum"
+ " (seconds) = %s",
MAX_CLIENT_LAG_MS_MIN / 1000, MAX_CLIENT_LAG_MS_MAX / 1000));
}
return computedLag;
}
Expand All @@ -262,7 +264,9 @@ private boolean getMaxClientLagEnabled() {
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/** @return Time in milliseconds between checks to see if the buffer should be flushed */
/**
* @return Time in milliseconds between checks to see if the buffer should be flushed
*/
public long getBufferFlushCheckIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -273,7 +277,9 @@ public long getBufferFlushCheckIntervalInMs() {
return (long) val;
}

/** @return Duration in milliseconds to delay data insertion to the buffer when throttled */
/**
* @return Duration in milliseconds to delay data insertion to the buffer when throttled
*/
public long getInsertThrottleIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -284,7 +290,9 @@ public long getInsertThrottleIntervalInMs() {
return (long) val;
}

/** @return Percent of free total memory at which we throttle row inserts */
/**
* @return Percent of free total memory at which we throttle row inserts
*/
public int getInsertThrottleThresholdInPercentage() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -296,7 +304,9 @@ public int getInsertThrottleThresholdInPercentage() {
return (int) val;
}

/** @return Absolute size in bytes of free total memory at which we throttle row inserts */
/**
* @return Absolute size in bytes of free total memory at which we throttle row inserts
*/
public int getInsertThrottleThresholdInBytes() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -307,7 +317,9 @@ public int getInsertThrottleThresholdInBytes() {
return (int) val;
}

/** @return true if jmx metrics are enabled for a client */
/**
* @return true if jmx metrics are enabled for a client
*/
public boolean hasEnabledSnowpipeStreamingMetrics() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -318,7 +330,9 @@ public boolean hasEnabledSnowpipeStreamingMetrics() {
return (boolean) val;
}

/** @return Blob format version */
/**
* @return Blob format version
*/
public Constants.BdecVersion getBlobFormatVersion() {
Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT);
if (val instanceof Constants.BdecVersion) {
Expand Down Expand Up @@ -347,7 +361,9 @@ public int getIOTimeCpuRatio() {
return (int) val;
}

/** @return the max retry count when waiting for a blob upload task to finish */
/**
* @return the max retry count when waiting for a blob upload task to finish
*/
public int getBlobUploadMaxRetryCount() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -358,31 +374,39 @@ public int getBlobUploadMaxRetryCount() {
return (int) val;
}

/** @return The max memory limit in bytes */
/**
* @return The max memory limit in bytes
*/
public long getMaxMemoryLimitInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return Return whether memory optimization for Parquet is enabled. */
/**
* @return Return whether memory optimization for Parquet is enabled.
*/
public boolean getEnableParquetInternalBuffering() {
Object val =
this.parameterMap.getOrDefault(
ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/** @return The max channel size in bytes */
/**
* @return The max channel size in bytes
*/
public long getMaxChannelSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return The max chunk size in bytes that could avoid OOM at server side */
/**
* @return The max chunk size in bytes that could avoid OOM at server side
*/
public long getMaxChunkSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT);
Expand All @@ -408,11 +432,13 @@ public int getMaxChunksInBlobAndRegistrationRequest() {
return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
}

/** @return BDEC compression algorithm */
/**
* @return BDEC compression algorithm
*/
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
if (val instanceof Constants.BdecParquetCompression) {
return (Constants.BdecParquetCompression) val;
}
Expand Down

0 comments on commit 7d2d975

Please sign in to comment.