diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 3b020d84e..8fa43f161 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -53,6 +53,11 @@ public class ParameterProvider { // Lag related parameters public static final String MAX_CLIENT_LAG_DEFAULT = "1 second"; public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true; + + static final long MAX_CLIENT_LAG_MS_MIN = 1000; + + // 10 minutes + static final long MAX_CLIENT_LAG_MS_MAX = 600000; public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. @@ -92,6 +97,7 @@ private void updateValue( this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); } } + /** * Sets parameter values by first checking 1. parameterOverrides 2. props 3. default value * @@ -205,17 +211,29 @@ private long getMaxClientLagMs() { throw new IllegalArgumentException( String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); } + long computedLag = BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT; switch (lagParts[1].toLowerCase()) { case "second": case "seconds": - return lag * 1000; + computedLag = lag * 1000; + break; case "minute": case "minutes": - return lag * 60000; + computedLag = lag * 60000; + break; default: throw new IllegalArgumentException( String.format("Invalid time unit supplied = '%s", lagParts[1])); } + + if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) { + throw new IllegalArgumentException( + String.format( + "Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum" + + " (milliseconds) = %s", + MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX)); + } + return computedLag; } private boolean getMaxClientLagEnabled() { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 40ef1fa2d..def5f7ecf 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -242,4 +242,34 @@ public void testMaxClientLagEnabledInvalidUnit() { Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); } } + + @Test + public void testMaxClientLagEnabledThresholdBelow() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); + } + } + + @Test + public void testMaxClientLagEnabledThresholdAbove() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); + } + } }