Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-914666 Adds MAX_CLIENT_LAG configuration option #586

Merged
merged 5 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 86 additions & 6 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public class ParameterProvider {
public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
"MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();

public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
Expand All @@ -45,6 +49,15 @@ public class ParameterProvider {
public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L;
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;

// Lag related parameters
public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved

static final long MAX_CLIENT_LAG_MS_MIN = 1000;

// 10 minutes
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
static final long MAX_CLIENT_LAG_MS_MAX = 600000;
public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
Expand All @@ -54,6 +67,9 @@ public class ParameterProvider {
/** Map of parameter name to parameter value. This will be set by client/configure API Call. */
private final Map<String, Object> parameterMap = new HashMap<>();

// Cached buffer flush interval - avoid parsing each time for quick lookup
private Long cachedBufferFlushIntervalMs = -1L;

/**
* Constructor. Takes properties from profile file and properties from client constructor and
* resolves final parameter value
Expand Down Expand Up @@ -81,6 +97,7 @@ private void updateValue(
this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
}
}

/**
* Sets parameter values by first checking 1. parameterOverrides 2. props 3. default value
*
Expand Down Expand Up @@ -149,17 +166,80 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties

this.updateValue(
MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props);

this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
this.updateValue(
MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
}

/** @return Longest interval in milliseconds between buffer flushes */
public long getBufferFlushIntervalInMs() {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
if (val instanceof String) {
return Long.parseLong(val.toString());
if (getMaxClientLagEnabled()) {
if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
}
long lag = getMaxClientLagMs();
if (cachedBufferFlushIntervalMs == -1L) {
cachedBufferFlushIntervalMs = lag;
}
return cachedBufferFlushIntervalMs;
} else {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}
return (long) val;
}

private long getMaxClientLagMs() {
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
if (!(val instanceof String)) {
return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
}
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length != 2
|| (lagParts[0] == null || "".equals(lagParts[0]))
|| (lagParts[1] == null || "".equals(lagParts[1]))) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
}
long lag;
try {
lag = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}
long computedLag = BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
computedLag = lag * 1000;
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
break;
case "minute":
case "minutes":
computedLag = lag * 60000;
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
break;
default:
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
}

if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
throw new IllegalArgumentException(
String.format(
"Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum"
+ " (milliseconds) = %s",
MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
}
return computedLag;
}

private boolean getMaxClientLagEnabled() {
Object val =
this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/** @return Time in milliseconds between checks to see if the buffer should be flushed */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

public class ParameterProviderTest {

@Test
public void withValuesSet() {
Properties prop = new Properties();
private Map<String, Object> getStartingParameterMap() {
Map<String, Object> parameterMap = new HashMap<>();
parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L);
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
Expand All @@ -22,6 +20,14 @@ public void withValuesSet() {
parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
return parameterMap;
}

@Test
public void withValuesSet() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);

Assert.assertEquals(3L, parameterProvider.getBufferFlushIntervalInMs());
Expand All @@ -42,6 +48,7 @@ public void withNullProps() {
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null);

Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
Expand All @@ -60,6 +67,7 @@ public void withNullParameterMap() {
props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
props.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
ParameterProvider parameterProvider = new ParameterProvider(null, props);

Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
Expand Down Expand Up @@ -123,4 +131,145 @@ public void withDefaultValues() {
ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
parameterProvider.getMaxChannelSizeInBytes());
}

@Test
public void testMaxClientLagEnabled() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
// call again to trigger caching logic
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledPluralTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMinuteTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(60000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(120000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledDefaultValue() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs());
}

@Test
public void testMaxClientLagEnabledMissingUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
}
}

@Test
public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
}
}

@Test
public void testMaxClientLagEnabledInvalidTimeUnit() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Invalid time unit"));
}
}

@Test
public void testMaxClientLagEnabledInvalidUnit() {
sfc-gh-tjones marked this conversation as resolved.
Show resolved Hide resolved
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
}
}

@Test
public void testMaxClientLagEnabledThresholdBelow() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
}
}

@Test
public void testMaxClientLagEnabledThresholdAbove() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
}
}
}