Skip to content

Commit

Permalink
update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tzhang committed Jan 4, 2024
1 parent 7d2d975 commit c120ef0
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}

public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
return this;
}

public OpenChannelRequest build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ CompletableFuture<Void> flush(boolean isForce) {
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getBufferFlushIntervalInMs()))) {
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {

return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
payload.put("schema", request.getSchemaName());
payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
payload.put("role", this.role);
if (request.isOffsetTokenProvided()){
if (request.isOffsetTokenProvided()) {
payload.put("offset_token", request.getOffsetToken());
}

Expand Down
172 changes: 66 additions & 106 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ public class ParameterProvider {

public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final int INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT = 10;
Expand All @@ -57,12 +54,10 @@ public class ParameterProvider {
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;

// Lag related parameters
public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;

public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second
static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1);

static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);

public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100;

Expand Down Expand Up @@ -114,12 +109,6 @@ private void updateValue(
* @param props Properties file provided to client constructor
*/
private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
this.updateValue(
BUFFER_FLUSH_INTERVAL_IN_MILLIS,
BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT,
parameterOverrides,
props);

this.updateValue(
BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS,
BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT,
Expand Down Expand Up @@ -177,8 +166,7 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props);

this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
this.updateValue(
MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);

this.updateValue(
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
Expand All @@ -192,81 +180,75 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
props);
}

/**
* @return Longest interval in milliseconds between buffer flushes
*/
public long getBufferFlushIntervalInMs() {
if (getMaxClientLagEnabled()) {
if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
}
long lag = getMaxClientLagMs();
if (cachedBufferFlushIntervalMs == -1L) {
cachedBufferFlushIntervalMs = lag;
}
/** @return Longest interval in milliseconds between buffer flushes */
public long getCachedMaxClientLagInMs() {
// BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed
if (this.parameterMap.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) {
throw new IllegalArgumentException(
String.format(
"%s is deprecated, please use %s instead",
BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG));
}

if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
}

Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
cachedBufferFlushIntervalMs = getMaxClientLagInMs();
return cachedBufferFlushIntervalMs;
}

private long getMaxClientLagMs() {
private long getMaxClientLagInMs() {
Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
if (!(val instanceof String)) {
return getBufferFlushIntervalInMs();
}
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length != 2
|| (lagParts[0] == null || "".equals(lagParts[0]))
|| (lagParts[1] == null || "".equals(lagParts[1]))) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
}
long lag;
try {
lag = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}
long computedLag;
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
computedLag = lag * TimeUnit.SECONDS.toMillis(1);
break;
case "minute":
case "minutes":
computedLag = lag * TimeUnit.SECONDS.toMillis(60);
break;
default:
if (val instanceof String) {
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length > 2) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
}

// Compute the actual value
try {
computedLag = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}

// Compute the time unit if needed
if (lagParts.length == 2) {
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
computedLag = computedLag * TimeUnit.SECONDS.toMillis(1);
break;
case "minute":
case "minutes":
computedLag = computedLag * TimeUnit.SECONDS.toMillis(60);
break;
default:
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
}
}
} else {
computedLag = (long) val;
}

if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
throw new IllegalArgumentException(
String.format(
"Lag falls outside of allowed time range. Minimum (seconds) = %s, Maximum"
+ " (seconds) = %s",
MAX_CLIENT_LAG_MS_MIN / 1000, MAX_CLIENT_LAG_MS_MAX / 1000));
"Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum"
+ " (milliseconds) = %s",
MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
}
return computedLag;
}

private boolean getMaxClientLagEnabled() {
Object val =
this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
return computedLag;
}

/**
* @return Time in milliseconds between checks to see if the buffer should be flushed
*/
/** @return Time in milliseconds between checks to see if the buffer should be flushed */
public long getBufferFlushCheckIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -277,9 +259,7 @@ public long getBufferFlushCheckIntervalInMs() {
return (long) val;
}

/**
* @return Duration in milliseconds to delay data insertion to the buffer when throttled
*/
/** @return Duration in milliseconds to delay data insertion to the buffer when throttled */
public long getInsertThrottleIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -290,9 +270,7 @@ public long getInsertThrottleIntervalInMs() {
return (long) val;
}

/**
* @return Percent of free total memory at which we throttle row inserts
*/
/** @return Percent of free total memory at which we throttle row inserts */
public int getInsertThrottleThresholdInPercentage() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -304,9 +282,7 @@ public int getInsertThrottleThresholdInPercentage() {
return (int) val;
}

/**
* @return Absolute size in bytes of free total memory at which we throttle row inserts
*/
/** @return Absolute size in bytes of free total memory at which we throttle row inserts */
public int getInsertThrottleThresholdInBytes() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -317,9 +293,7 @@ public int getInsertThrottleThresholdInBytes() {
return (int) val;
}

/**
* @return true if jmx metrics are enabled for a client
*/
/** @return true if jmx metrics are enabled for a client */
public boolean hasEnabledSnowpipeStreamingMetrics() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -330,9 +304,7 @@ public boolean hasEnabledSnowpipeStreamingMetrics() {
return (boolean) val;
}

/**
* @return Blob format version
*/
/** @return Blob format version */
public Constants.BdecVersion getBlobFormatVersion() {
Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT);
if (val instanceof Constants.BdecVersion) {
Expand Down Expand Up @@ -361,9 +333,7 @@ public int getIOTimeCpuRatio() {
return (int) val;
}

/**
* @return the max retry count when waiting for a blob upload task to finish
*/
/** @return the max retry count when waiting for a blob upload task to finish */
public int getBlobUploadMaxRetryCount() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -374,39 +344,31 @@ public int getBlobUploadMaxRetryCount() {
return (int) val;
}

/**
* @return The max memory limit in bytes
*/
/** @return The max memory limit in bytes */
public long getMaxMemoryLimitInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/**
* @return Return whether memory optimization for Parquet is enabled.
*/
/** @return Return whether memory optimization for Parquet is enabled. */
public boolean getEnableParquetInternalBuffering() {
Object val =
this.parameterMap.getOrDefault(
ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/**
* @return The max channel size in bytes
*/
/** @return The max channel size in bytes */
public long getMaxChannelSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/**
* @return The max chunk size in bytes that could avoid OOM at server side
*/
/** @return The max chunk size in bytes that could avoid OOM at server side */
public long getMaxChunkSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT);
Expand All @@ -432,9 +394,7 @@ public int getMaxChunksInBlobAndRegistrationRequest() {
return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
}

/**
* @return BDEC compression algorithm
*/
/** @return BDEC compression algorithm */
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/net/snowflake/ingest/SimpleIngestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ public void testSimpleIngestWithPattern() throws Exception {
}

private void getHistoryAndAssertLoad(SimpleIngestManager manager, String test_file_name_2)
throws InterruptedException,
java.util.concurrent.ExecutionException,
throws InterruptedException, java.util.concurrent.ExecutionException,
java.util.concurrent.TimeoutException {
// keeps track of whether we've loaded the file
boolean loaded = false;
Expand Down
Loading

0 comments on commit c120ef0

Please sign in to comment.