Skip to content

Commit

Permalink
SNOW-914666 Adds MAX_CLIENT_LAG configuration option
Browse files Browse the repository at this point in the history
We want to expose a knob that gives users the ability to control when
data is ingested. This has a material difference on the size of blobs
generated and can result in fewer smaller-sized blobs which in turn
affects query performance. The trade-off is higher ingest latencies. We
have decided to expose this in the form of an optional `MAX_CLIENT_LAG`
option that accepts inputs as the following:

- `number second` (ex: `1 second`)
- `number seconds` (ex: `2 seconds`)
- `number minute` (ex: `1 minute`)
- `number minutes` (ex: `2 minutes`)

By default we use 1 second as the maximum client lag which is the
current behavior of the SDK.

Note that this dictates when a flush is triggered to cloud storage.
Depending on your connection to cloud storage and cloud storage tail
latencies a blob persist may take longer than expected. Therefore, it is
helpful to think of this parameter as a target, rather than an absolute
number.

@test Adds tests to `ParameterProviderTest`
  • Loading branch information
sfc-gh-tjones committed Sep 13, 2023
1 parent 3a3cbc8 commit c348607
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 21 deletions.
121 changes: 103 additions & 18 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class ParameterProvider {
public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
"MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();

public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
Expand All @@ -45,6 +49,10 @@ public class ParameterProvider {
public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L;
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;

// Lag related parameters
public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;
public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
Expand All @@ -54,6 +62,9 @@ public class ParameterProvider {
/** Map of parameter name to parameter value. This will be set by client/configure API Call. */
private final Map<String, Object> parameterMap = new HashMap<>();

// Cached buffer flush interval - avoid parsing each time for quick lookup
private Long cachedBufferFlushIntervalMs = -1L;

/**
* Constructor. Takes properties from profile file and properties from client constructor and
* resolves final parameter value
Expand Down Expand Up @@ -149,20 +160,74 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties

this.updateValue(
MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props);

this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
this.updateValue(
MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
}

/** @return Longest interval in milliseconds between buffer flushes */
/**
* @return Longest interval in milliseconds between buffer flushes
*/
public long getBufferFlushIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
if (val instanceof String) {
return Long.parseLong(val.toString());
if (getMaxClientLagEnabled()) {
if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
}
long lag = getMaxClientLagMs();
if (cachedBufferFlushIntervalMs == -1L) {
cachedBufferFlushIntervalMs = lag;
}
return cachedBufferFlushIntervalMs;
} else {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}
}

private long getMaxClientLagMs() {
Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
if (!(val instanceof String)) {
return 1000;
}
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length != 2
|| (lagParts[0] == null || "".equals(lagParts[0]))
|| (lagParts[1] == null || "".equals(lagParts[1]))) {
return 1000;
}
long unit;
try {
unit = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
return unit * 1000;
case "minute":
case "minutes":
return unit * 60000;
default:
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
}
return (long) val;
}

/** @return Time in milliseconds between checks to see if the buffer should be flushed */
private boolean getMaxClientLagEnabled() {
Object val =
this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT);
return (val instanceof Boolean) ? (Boolean) val : false;
}

/**
* @return Time in milliseconds between checks to see if the buffer should be flushed
*/
public long getBufferFlushCheckIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -173,7 +238,9 @@ public long getBufferFlushCheckIntervalInMs() {
return (long) val;
}

/** @return Duration in milliseconds to delay data insertion to the buffer when throttled */
/**
* @return Duration in milliseconds to delay data insertion to the buffer when throttled
*/
public long getInsertThrottleIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -184,7 +251,9 @@ public long getInsertThrottleIntervalInMs() {
return (long) val;
}

/** @return Percent of free total memory at which we throttle row inserts */
/**
* @return Percent of free total memory at which we throttle row inserts
*/
public int getInsertThrottleThresholdInPercentage() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -196,7 +265,9 @@ public int getInsertThrottleThresholdInPercentage() {
return (int) val;
}

/** @return Absolute size in bytes of free total memory at which we throttle row inserts */
/**
* @return Absolute size in bytes of free total memory at which we throttle row inserts
*/
public int getInsertThrottleThresholdInBytes() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -207,7 +278,9 @@ public int getInsertThrottleThresholdInBytes() {
return (int) val;
}

/** @return true if jmx metrics are enabled for a client */
/**
* @return true if jmx metrics are enabled for a client
*/
public boolean hasEnabledSnowpipeStreamingMetrics() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -218,7 +291,9 @@ public boolean hasEnabledSnowpipeStreamingMetrics() {
return (boolean) val;
}

/** @return Blob format version */
/**
* @return Blob format version
*/
public Constants.BdecVersion getBlobFormatVersion() {
Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT);
if (val instanceof Constants.BdecVersion) {
Expand Down Expand Up @@ -247,7 +322,9 @@ public int getIOTimeCpuRatio() {
return (int) val;
}

/** @return the max retry count when waiting for a blob upload task to finish */
/**
* @return the max retry count when waiting for a blob upload task to finish
*/
public int getBlobUploadMaxRetryCount() {
Object val =
this.parameterMap.getOrDefault(
Expand All @@ -258,31 +335,39 @@ public int getBlobUploadMaxRetryCount() {
return (int) val;
}

/** @return The max memory limit in bytes */
/**
* @return The max memory limit in bytes
*/
public long getMaxMemoryLimitInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return Return whether memory optimization for Parquet is enabled. */
/**
* @return Return whether memory optimization for Parquet is enabled.
*/
public boolean getEnableParquetInternalBuffering() {
Object val =
this.parameterMap.getOrDefault(
ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/** @return The max channel size in bytes */
/**
* @return The max channel size in bytes
*/
public long getMaxChannelSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(
MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return The max chunk size in bytes that could avoid OOM at server side */
/**
* @return The max chunk size in bytes that could avoid OOM at server side
*/
public long getMaxChunkSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@
import java.util.Properties;
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ParameterProviderTest {

@Test
public void withValuesSet() {
Properties prop = new Properties();
private Map<String, Object> getStartingParameterMap() {
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L);
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
Expand All @@ -22,6 +21,14 @@ public void withValuesSet() {
parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
return parameterMap;
}

@Test
public void withValuesSet() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);

Assert.assertEquals(3L, parameterProvider.getBufferFlushIntervalInMs());
Expand All @@ -42,6 +49,7 @@ public void withNullProps() {
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null);

Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
Expand All @@ -60,6 +68,7 @@ public void withNullParameterMap() {
props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
props.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(null, props);

Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
Expand Down Expand Up @@ -123,4 +132,104 @@ public void withDefaultValues() {
ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
parameterProvider.getMaxChannelSizeInBytes());
}

@Test
public void testMaxClientLagEnabled() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
// call again to trigger caching logic
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledPluralTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
}
@Test
public void testMaxClientLagEnabledMinuteTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(60000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(120000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledDefaultValue() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMissingUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledInvalidTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Invalid time unit"));
}
}

@Test
public void testMaxClientLagEnabledInvalidUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
}
}
}

0 comments on commit c348607

Please sign in to comment.