From c3486072937fb15f089f6a3c9dd9ee8873a0807d Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 13 Sep 2023 13:37:55 -0700 Subject: [PATCH 1/5] SNOW-914666 Adds MAX_CLIENT_LAG configuration option We want to expose a knob that gives users the ability to control when data is ingested. This has a material effect on the size of the blobs generated and can result in fewer small blobs, which in turn affects query performance. The trade-off is higher ingest latency. We have decided to expose this as an optional `MAX_CLIENT_LAG` option that accepts values in the following forms: - `number second` (ex: `1 second`) - `number seconds` (ex: `2 seconds`) - `number minute` (ex: `1 minute`) - `number minutes` (ex: `2 minutes`) By default we use 1 second as the maximum client lag, which is the current behavior of the SDK. Note that this setting dictates when a flush to cloud storage is triggered. Depending on your connection to cloud storage and on cloud storage tail latencies, persisting a blob may take longer than expected. Therefore, it is helpful to think of this parameter as a target rather than an absolute number. 
@test Adds tests to `ParameterProviderTest` --- .../ingest/utils/ParameterProvider.java | 121 +++++++++++++++--- .../internal/ParameterProviderTest.java | 115 ++++++++++++++++- 2 files changed, 215 insertions(+), 21 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 7e4c3d042..c076d6d6d 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -31,6 +31,10 @@ public class ParameterProvider { public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); + public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); + + public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase(); + // Default values public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000; public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; @@ -45,6 +49,10 @@ public class ParameterProvider { public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L; public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L; public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L; + + // Lag related parameters + public static final String MAX_CLIENT_LAG_DEFAULT = "1 second"; + public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true; public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. @@ -54,6 +62,9 @@ public class ParameterProvider { /** Map of parameter name to parameter value. This will be set by client/configure API Call. 
*/ private final Map parameterMap = new HashMap<>(); + // Cached buffer flush interval - avoid parsing each time for quick lookup + private Long cachedBufferFlushIntervalMs = -1L; + /** * Constructor. Takes properties from profile file and properties from client constructor and * resolves final parameter value @@ -149,20 +160,74 @@ private void setParameterMap(Map parameterOverrides, Properties this.updateValue( MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); + + this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); + this.updateValue( + MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props); } - /** @return Longest interval in milliseconds between buffer flushes */ + /** + * @return Longest interval in milliseconds between buffer flushes + */ public long getBufferFlushIntervalInMs() { - Object val = - this.parameterMap.getOrDefault( - BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT); - if (val instanceof String) { - return Long.parseLong(val.toString()); + if (getMaxClientLagEnabled()) { + if (cachedBufferFlushIntervalMs != -1L) { + return cachedBufferFlushIntervalMs; + } + long lag = getMaxClientLagMs(); + if (cachedBufferFlushIntervalMs == -1L) { + cachedBufferFlushIntervalMs = lag; + } + return cachedBufferFlushIntervalMs; + } else { + Object val = + this.parameterMap.getOrDefault( + BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT); + return (val instanceof String) ? 
Long.parseLong(val.toString()) : (long) val; + } + } + + private long getMaxClientLagMs() { + Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT); + if (!(val instanceof String)) { + return 1000; + } + String maxLag = (String) val; + String[] lagParts = maxLag.split(" "); + if (lagParts.length != 2 + || (lagParts[0] == null || "".equals(lagParts[0])) + || (lagParts[1] == null || "".equals(lagParts[1]))) { + return 1000; + } + long unit; + try { + unit = Long.parseLong(lagParts[0]); + } catch (Throwable t) { + throw new IllegalArgumentException( + String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); + } + switch (lagParts[1].toLowerCase()) { + case "second": + case "seconds": + return unit * 1000; + case "minute": + case "minutes": + return unit * 60000; + default: + throw new IllegalArgumentException( + String.format("Invalid time unit supplied = '%s", lagParts[1])); } - return (long) val; } - /** @return Time in milliseconds between checks to see if the buffer should be flushed */ + private boolean getMaxClientLagEnabled() { + Object val = + this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT); + return (val instanceof Boolean) ? 
(Boolean) val : false; + } + + /** + * @return Time in milliseconds between checks to see if the buffer should be flushed + */ public long getBufferFlushCheckIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -173,7 +238,9 @@ public long getBufferFlushCheckIntervalInMs() { return (long) val; } - /** @return Duration in milliseconds to delay data insertion to the buffer when throttled */ + /** + * @return Duration in milliseconds to delay data insertion to the buffer when throttled + */ public long getInsertThrottleIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -184,7 +251,9 @@ public long getInsertThrottleIntervalInMs() { return (long) val; } - /** @return Percent of free total memory at which we throttle row inserts */ + /** + * @return Percent of free total memory at which we throttle row inserts + */ public int getInsertThrottleThresholdInPercentage() { Object val = this.parameterMap.getOrDefault( @@ -196,7 +265,9 @@ public int getInsertThrottleThresholdInPercentage() { return (int) val; } - /** @return Absolute size in bytes of free total memory at which we throttle row inserts */ + /** + * @return Absolute size in bytes of free total memory at which we throttle row inserts + */ public int getInsertThrottleThresholdInBytes() { Object val = this.parameterMap.getOrDefault( @@ -207,7 +278,9 @@ public int getInsertThrottleThresholdInBytes() { return (int) val; } - /** @return true if jmx metrics are enabled for a client */ + /** + * @return true if jmx metrics are enabled for a client + */ public boolean hasEnabledSnowpipeStreamingMetrics() { Object val = this.parameterMap.getOrDefault( @@ -218,7 +291,9 @@ public boolean hasEnabledSnowpipeStreamingMetrics() { return (boolean) val; } - /** @return Blob format version */ + /** + * @return Blob format version + */ public Constants.BdecVersion getBlobFormatVersion() { Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT); if (val instanceof 
Constants.BdecVersion) { @@ -247,7 +322,9 @@ public int getIOTimeCpuRatio() { return (int) val; } - /** @return the max retry count when waiting for a blob upload task to finish */ + /** + * @return the max retry count when waiting for a blob upload task to finish + */ public int getBlobUploadMaxRetryCount() { Object val = this.parameterMap.getOrDefault( @@ -258,7 +335,9 @@ public int getBlobUploadMaxRetryCount() { return (int) val; } - /** @return The max memory limit in bytes */ + /** + * @return The max memory limit in bytes + */ public long getMaxMemoryLimitInBytes() { Object val = this.parameterMap.getOrDefault( @@ -266,7 +345,9 @@ public long getMaxMemoryLimitInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** @return Return whether memory optimization for Parquet is enabled. */ + /** + * @return Return whether memory optimization for Parquet is enabled. + */ public boolean getEnableParquetInternalBuffering() { Object val = this.parameterMap.getOrDefault( @@ -274,7 +355,9 @@ public boolean getEnableParquetInternalBuffering() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** @return The max channel size in bytes */ + /** + * @return The max channel size in bytes + */ public long getMaxChannelSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -282,7 +365,9 @@ public long getMaxChannelSizeInBytes() { return (val instanceof String) ? 
Long.parseLong(val.toString()) : (long) val; } - /** @return The max chunk size in bytes that could avoid OOM at server side */ + /** + * @return The max chunk size in bytes that could avoid OOM at server side + */ public long getMaxChunkSizeInBytes() { Object val = this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index dcf4037c6..5062275ad 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -5,13 +5,12 @@ import java.util.Properties; import net.snowflake.ingest.utils.ParameterProvider; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; public class ParameterProviderTest { - @Test - public void withValuesSet() { - Properties prop = new Properties(); + private Map getStartingParameterMap() { Map parameterMap = new HashMap<>(); parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); @@ -22,6 +21,14 @@ public void withValuesSet() { parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100); parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L); parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L); + return parameterMap; + } + + @Test + public void withValuesSet() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(3L, parameterProvider.getBufferFlushIntervalInMs()); @@ -42,6 +49,7 @@ public void withNullProps() { 
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs()); @@ -60,6 +68,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); + props.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(null, props); Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs()); @@ -123,4 +132,104 @@ public void withDefaultValues() { ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterProvider.getMaxChannelSizeInBytes()); } + + @Test + public void testMaxClientLagEnabled() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + // call again to trigger caching logic + Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledPluralTimeUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); + ParameterProvider parameterProvider = new 
ParameterProvider(parameterMap, prop); + Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + } + @Test + public void testMaxClientLagEnabledMinuteTimeUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(60000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(120000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledDefaultValue() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledMissingUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { + Properties prop = new Properties(); + Map parameterMap = 
getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + } + + @Test + public void testMaxClientLagEnabledInvalidTimeUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Invalid time unit")); + } + } + + @Test + public void testMaxClientLagEnabledInvalidUnit() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); + } + } } From 6b21fabb6a6272424a07c590c7845c61b1e45541 Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 13 Sep 2023 14:55:15 -0700 Subject: [PATCH 2/5] Iteration 2: Google format --- .../ingest/utils/ParameterProvider.java | 48 +++++-------------- .../internal/ParameterProviderTest.java | 2 +- 2 files changed, 13 insertions(+), 37 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 
c076d6d6d..0c33a84fc 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -166,9 +166,7 @@ private void setParameterMap(Map parameterOverrides, Properties MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props); } - /** - * @return Longest interval in milliseconds between buffer flushes - */ + /** @return Longest interval in milliseconds between buffer flushes */ public long getBufferFlushIntervalInMs() { if (getMaxClientLagEnabled()) { if (cachedBufferFlushIntervalMs != -1L) { @@ -225,9 +223,7 @@ private boolean getMaxClientLagEnabled() { return (val instanceof Boolean) ? (Boolean) val : false; } - /** - * @return Time in milliseconds between checks to see if the buffer should be flushed - */ + /** @return Time in milliseconds between checks to see if the buffer should be flushed */ public long getBufferFlushCheckIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -238,9 +234,7 @@ public long getBufferFlushCheckIntervalInMs() { return (long) val; } - /** - * @return Duration in milliseconds to delay data insertion to the buffer when throttled - */ + /** @return Duration in milliseconds to delay data insertion to the buffer when throttled */ public long getInsertThrottleIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -251,9 +245,7 @@ public long getInsertThrottleIntervalInMs() { return (long) val; } - /** - * @return Percent of free total memory at which we throttle row inserts - */ + /** @return Percent of free total memory at which we throttle row inserts */ public int getInsertThrottleThresholdInPercentage() { Object val = this.parameterMap.getOrDefault( @@ -265,9 +257,7 @@ public int getInsertThrottleThresholdInPercentage() { return (int) val; } - /** - * @return Absolute size in bytes of free total memory at which we throttle row inserts - */ + /** @return Absolute size in bytes of free total memory at which 
we throttle row inserts */ public int getInsertThrottleThresholdInBytes() { Object val = this.parameterMap.getOrDefault( @@ -278,9 +268,7 @@ public int getInsertThrottleThresholdInBytes() { return (int) val; } - /** - * @return true if jmx metrics are enabled for a client - */ + /** @return true if jmx metrics are enabled for a client */ public boolean hasEnabledSnowpipeStreamingMetrics() { Object val = this.parameterMap.getOrDefault( @@ -291,9 +279,7 @@ public boolean hasEnabledSnowpipeStreamingMetrics() { return (boolean) val; } - /** - * @return Blob format version - */ + /** @return Blob format version */ public Constants.BdecVersion getBlobFormatVersion() { Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT); if (val instanceof Constants.BdecVersion) { @@ -322,9 +308,7 @@ public int getIOTimeCpuRatio() { return (int) val; } - /** - * @return the max retry count when waiting for a blob upload task to finish - */ + /** @return the max retry count when waiting for a blob upload task to finish */ public int getBlobUploadMaxRetryCount() { Object val = this.parameterMap.getOrDefault( @@ -335,9 +319,7 @@ public int getBlobUploadMaxRetryCount() { return (int) val; } - /** - * @return The max memory limit in bytes - */ + /** @return The max memory limit in bytes */ public long getMaxMemoryLimitInBytes() { Object val = this.parameterMap.getOrDefault( @@ -345,9 +327,7 @@ public long getMaxMemoryLimitInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return Return whether memory optimization for Parquet is enabled. - */ + /** @return Return whether memory optimization for Parquet is enabled. */ public boolean getEnableParquetInternalBuffering() { Object val = this.parameterMap.getOrDefault( @@ -355,9 +335,7 @@ public boolean getEnableParquetInternalBuffering() { return (val instanceof String) ? 
Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** - * @return The max channel size in bytes - */ + /** @return The max channel size in bytes */ public long getMaxChannelSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -365,9 +343,7 @@ public long getMaxChannelSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return The max chunk size in bytes that could avoid OOM at server side - */ + /** @return The max chunk size in bytes that could avoid OOM at server side */ public long getMaxChunkSizeInBytes() { Object val = this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 5062275ad..4e5ed12f2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -5,7 +5,6 @@ import java.util.Properties; import net.snowflake.ingest.utils.ParameterProvider; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; public class ParameterProviderTest { @@ -154,6 +153,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); } + @Test public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); From bd716ab63c28bb7343aa2cc9a2ebffe62e5df56e Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 13 Sep 2023 15:35:44 -0700 Subject: [PATCH 3/5] Iteration 3: Addresses sfc-gh-asen's comments --- .../snowflake/ingest/utils/ParameterProvider.java | 15 ++++++++------- .../streaming/internal/ParameterProviderTest.java | 14 ++++++++++++-- 2 files changed, 20 insertions(+), 9 
deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 0c33a84fc..3b020d84e 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -188,18 +188,19 @@ public long getBufferFlushIntervalInMs() { private long getMaxClientLagMs() { Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT); if (!(val instanceof String)) { - return 1000; + return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT; } String maxLag = (String) val; String[] lagParts = maxLag.split(" "); if (lagParts.length != 2 || (lagParts[0] == null || "".equals(lagParts[0])) || (lagParts[1] == null || "".equals(lagParts[1]))) { - return 1000; + throw new IllegalArgumentException( + String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag)); } - long unit; + long lag; try { - unit = Long.parseLong(lagParts[0]); + lag = Long.parseLong(lagParts[0]); } catch (Throwable t) { throw new IllegalArgumentException( String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); @@ -207,10 +208,10 @@ private long getMaxClientLagMs() { switch (lagParts[1].toLowerCase()) { case "second": case "seconds": - return unit * 1000; + return lag * 1000; case "minute": case "minutes": - return unit * 60000; + return lag * 60000; default: throw new IllegalArgumentException( String.format("Invalid time unit supplied = '%s", lagParts[1])); @@ -220,7 +221,7 @@ private long getMaxClientLagMs() { private boolean getMaxClientLagEnabled() { Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT); - return (val instanceof Boolean) ? (Boolean) val : false; + return (val instanceof String) ? 
Boolean.parseBoolean(val.toString()) : (boolean) val; } /** @return Time in milliseconds between checks to see if the buffer should be flushed */ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 4e5ed12f2..40ef1fa2d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -190,7 +190,12 @@ public void testMaxClientLagEnabledMissingUnit() { parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); + } } @Test @@ -200,7 +205,12 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); + } } @Test From c65d9de4d4a7ecc91ead8b84513472474532f7da Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 13 Sep 2023 16:34:45 -0700 Subject: [PATCH 4/5] Iteration 4: Sets min/max --- .../ingest/utils/ParameterProvider.java | 22 ++++++++++++-- .../internal/ParameterProviderTest.java | 30 
+++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 3b020d84e..8fa43f161 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -53,6 +53,11 @@ public class ParameterProvider { // Lag related parameters public static final String MAX_CLIENT_LAG_DEFAULT = "1 second"; public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true; + + static final long MAX_CLIENT_LAG_MS_MIN = 1000; + + // 10 minutes + static final long MAX_CLIENT_LAG_MS_MAX = 600000; public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. @@ -92,6 +97,7 @@ private void updateValue( this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); } } + /** * Sets parameter values by first checking 1. parameterOverrides 2. props 3. default value * @@ -205,17 +211,29 @@ private long getMaxClientLagMs() { throw new IllegalArgumentException( String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); } + long computedLag = BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT; switch (lagParts[1].toLowerCase()) { case "second": case "seconds": - return lag * 1000; + computedLag = lag * 1000; + break; case "minute": case "minutes": - return lag * 60000; + computedLag = lag * 60000; + break; default: throw new IllegalArgumentException( String.format("Invalid time unit supplied = '%s", lagParts[1])); } + + if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) { + throw new IllegalArgumentException( + String.format( + "Lag falls outside of allowed time range. 
Minimum (milliseconds) = %s, Maximum" + + " (milliseconds) = %s", + MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX)); + } + return computedLag; } private boolean getMaxClientLagEnabled() { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 40ef1fa2d..def5f7ecf 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -242,4 +242,34 @@ public void testMaxClientLagEnabledInvalidUnit() { Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); } } + + @Test + public void testMaxClientLagEnabledThresholdBelow() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); + } + } + + @Test + public void testMaxClientLagEnabledThresholdAbove() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBufferFlushIntervalInMs(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); + } + } } From 8f38047cb87aeeb0c7b56ea7cec3049ad9cb6261 Mon Sep 17 00:00:00 2001 From: Tyler Jones Date: Wed, 13 
Sep 2023 18:54:51 -0700 Subject: [PATCH 5/5] Iteration 5: Jays comments --- .../snowflake/ingest/utils/ParameterProvider.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 8fa43f161..5c6f81f66 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** Utility class to provide configurable constants */ public class ParameterProvider { @@ -54,10 +55,9 @@ public class ParameterProvider { public static final String MAX_CLIENT_LAG_DEFAULT = "1 second"; public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true; - static final long MAX_CLIENT_LAG_MS_MIN = 1000; + static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1); - // 10 minutes - static final long MAX_CLIENT_LAG_MS_MAX = 600000; + static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. @@ -211,15 +211,15 @@ private long getMaxClientLagMs() { throw new IllegalArgumentException( String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); } - long computedLag = BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT; + long computedLag; switch (lagParts[1].toLowerCase()) { case "second": case "seconds": - computedLag = lag * 1000; + computedLag = lag * TimeUnit.SECONDS.toMillis(1); break; case "minute": case "minutes": - computedLag = lag * 60000; + computedLag = lag * TimeUnit.SECONDS.toMillis(60); break; default: throw new IllegalArgumentException(