diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 7e4c3d042..5c6f81f66 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -3,6 +3,8 @@
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 /** Utility class to provide configurable constants */
 public class ParameterProvider {
@@ -31,6 +33,10 @@ public class ParameterProvider {
   public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
       "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();
 
+  public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();
+
+  public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();
+
   // Default values
   public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
   public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
@@ -45,6 +51,14 @@ public class ParameterProvider {
   public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
   public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L;
   public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;
+
+  // Lag related parameters
+  public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
+  public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;
+
+  static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1);
+
+  static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);
   public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
 
   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
@@ -54,6 +68,9 @@ public class ParameterProvider {
   /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
   private final Map<String, Object> parameterMap = new HashMap<>();
 
+  // Cached buffer flush interval - avoid parsing each time for quick lookup
+  private Long cachedBufferFlushIntervalMs = -1L;
+
   /**
    * Constructor. Takes properties from profile file and properties from client constructor and
    * resolves final parameter value
@@ -81,6 +98,7 @@ private void updateValue(
       this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
     }
   }
+
   /**
    * Sets parameter values by first checking 1. parameterOverrides 2. props 3. default value
    *
@@ -149,17 +167,92 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
 
     this.updateValue(
         MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props);
+
+    this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
+    this.updateValue(
+        MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
   }
 
   /** @return Longest interval in milliseconds between buffer flushes */
   public long getBufferFlushIntervalInMs() {
-    Object val =
-        this.parameterMap.getOrDefault(
-            BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
-    if (val instanceof String) {
-      return Long.parseLong(val.toString());
+    if (getMaxClientLagEnabled()) {
+      if (cachedBufferFlushIntervalMs != -1L) {
+        return cachedBufferFlushIntervalMs;
+      }
+      long lag = getMaxClientLagMs();
+      if (cachedBufferFlushIntervalMs == -1L) {
+        cachedBufferFlushIntervalMs = lag;
+      }
+      return cachedBufferFlushIntervalMs;
+    } else {
+      Object val =
+          this.parameterMap.getOrDefault(
+              BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
+      return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
     }
-    return (long) val;
+  }
+
+  /**
+   * Parses MAX_CLIENT_LAG, which must be a number and a time unit separated by a single space,
+   * e.g. "2 seconds" or "1 minute". Supported units: second(s) and minute(s), case-insensitive.
+   * A value that is not a String yields BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT.
+   *
+   * @return the configured lag in milliseconds
+   * @throws IllegalArgumentException if the value is malformed, the unit is unsupported, or the
+   *     lag is outside [MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX]
+   */
+  private long getMaxClientLagMs() {
+    Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
+    if (!(val instanceof String)) {
+      return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
+    }
+    String maxLag = (String) val;
+    String[] lagParts = maxLag.split(" ");
+    if (lagParts.length != 2
+        || (lagParts[0] == null || "".equals(lagParts[0]))
+        || (lagParts[1] == null || "".equals(lagParts[1]))) {
+      throw new IllegalArgumentException(
+          String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
+    }
+    long lag;
+    try {
+      lag = Long.parseLong(lagParts[0]);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), e);
+    }
+    long computedLag;
+    // TimeUnit conversions saturate on overflow rather than wrapping, so an oversized value
+    // cannot slip past the range check below.
+    switch (lagParts[1].toLowerCase(Locale.ROOT)) {
+      case "second":
+      case "seconds":
+        computedLag = TimeUnit.SECONDS.toMillis(lag);
+        break;
+      case "minute":
+      case "minutes":
+        computedLag = TimeUnit.MINUTES.toMillis(lag);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Invalid time unit supplied = '%s'", lagParts[1]));
+    }
+
+    if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum"
+                  + " (milliseconds) = %s",
+              MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
+    }
+    return computedLag;
+  }
+
+  /** @return Whether MAX_CLIENT_LAG overrides the buffer flush interval */
+  private boolean getMaxClientLagEnabled() {
+    Object val =
+        this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT);
+    return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
   }
 
   /** @return Time in milliseconds between checks to see if the buffer should be flushed */
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index dcf4037c6..def5f7ecf 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -9,9 +9,7 @@
 
 public class ParameterProviderTest {
 
-  @Test
-  public void withValuesSet() {
-    Properties prop = new Properties();
+  private Map<String, Object> getStartingParameterMap() {
     Map<String, Object> parameterMap = new HashMap<>();
     parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L);
     parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
@@ -22,6 +20,14 @@ public void withValuesSet() {
     parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
     parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
     parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
+    return parameterMap;
+  }
+
+  @Test
+  public void withValuesSet() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
     ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
 
     Assert.assertEquals(3L, parameterProvider.getBufferFlushIntervalInMs());
@@ -42,6 +48,7 @@ public void withNullProps() {
     parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
     parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
     parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
     ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null);
 
     Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
@@ -60,6 +67,7 @@ public void withNullParameterMap() {
     props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
     props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
     props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
+    props.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false);
     ParameterProvider parameterProvider = new ParameterProvider(null, props);
 
     Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs());
@@ -123,4 +131,161 @@ public void withDefaultValues() {
         ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
         parameterProvider.getMaxChannelSizeInBytes());
   }
+
+  @Test
+  public void testMaxClientLagEnabled() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
+    // call again to trigger caching logic
+    Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
+  }
+
+  @Test
+  public void testMaxClientLagEnabledPluralTimeUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs());
+  }
+
+  @Test
+  public void testMaxClientLagEnabledMinuteTimeUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    Assert.assertEquals(60000, parameterProvider.getBufferFlushIntervalInMs());
+  }
+
+  @Test
+  public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    Assert.assertEquals(120000, parameterProvider.getBufferFlushIntervalInMs());
+  }
+
+  @Test
+  public void testMaxClientLagEnabledDefaultValue() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs());
+  }
+
+  @Test
+  public void testMaxClientLagEnabledMissingUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledInvalidTimeUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Invalid time unit"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledInvalidUnit() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledThresholdBelow() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledThresholdAbove() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
+    }
+  }
+
+  @Test
+  public void testMaxClientLagEnabledOverflow() {
+    Properties prop = new Properties();
+    Map<String, Object> parameterMap = getStartingParameterMap();
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
+    // 18446744073709553 * 1000 wraps to 1384 ms under plain long multiplication
+    parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "18446744073709553 seconds");
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
+    try {
+      parameterProvider.getBufferFlushIntervalInMs();
+      Assert.fail("Should not have succeeded");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
+    }
+  }
 }