From c120ef02175fbd6a5f4191bf042b25d99084ed94 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Thu, 4 Jan 2024 00:27:24 +0000 Subject: [PATCH] update logic --- .../ingest/streaming/OpenChannelRequest.java | 4 +- .../streaming/internal/FlushService.java | 2 +- ...nowflakeStreamingIngestClientInternal.java | 2 +- .../ingest/utils/ParameterProvider.java | 172 +++++++----------- .../net/snowflake/ingest/SimpleIngestIT.java | 3 +- .../internal/ParameterProviderTest.java | 75 ++++---- .../SnowflakeStreamingIngestChannelTest.java | 17 +- .../SnowflakeStreamingIngestClientTest.java | 14 +- .../streaming/internal/StreamingIngestIT.java | 25 ++- 9 files changed, 127 insertions(+), 187 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index d30472b63..0cf16af2e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -89,10 +89,10 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { return this; } - public OpenChannelRequestBuilder setOffsetToken(String offsetToken){ + public OpenChannelRequestBuilder setOffsetToken(String offsetToken) { this.offsetToken = offsetToken; this.isOffsetTokenProvided = true; - return this; + return this; } public OpenChannelRequest build() { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index cb1ef7810..acac3c3ee 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -261,7 +261,7 @@ CompletableFuture flush(boolean isForce) { && !isTestMode() && (this.isNeedFlush || timeDiffMillis - >= this.owningClient.getParameterProvider().getBufferFlushIntervalInMs()))) { + >= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) { return this.statsFuture() .thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis)) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index a52149331..4d15642cf 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -304,7 +304,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest payload.put("schema", request.getSchemaName()); payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); payload.put("role", this.role); - if (request.isOffsetTokenProvided()){ + if (request.isOffsetTokenProvided()) { payload.put("offset_token", request.getOffsetToken()); } diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index c77039886..854387647 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -36,13 +36,10 @@ public class ParameterProvider { public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); - public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase(); - public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM = "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase(); // Default values - public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000; public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000; public static final int INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT = 10; @@ -57,12 +54,10 @@ public class ParameterProvider { public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L; // Lag related parameters - public static final String MAX_CLIENT_LAG_DEFAULT = "1 second"; - public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true; - + public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1); - static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); + public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100; @@ -114,12 +109,6 @@ private void updateValue( * @param props Properties file provided to client constructor */ private void setParameterMap(Map parameterOverrides, Properties props) { - this.updateValue( - BUFFER_FLUSH_INTERVAL_IN_MILLIS, - BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT, - parameterOverrides, - props); - this.updateValue( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, @@ -177,8 +166,7 @@ private void setParameterMap(Map parameterOverrides, Properties MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); - this.updateValue( - MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props); + this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, @@ -192,81 +180,75 @@ private void setParameterMap(Map parameterOverrides, Properties props); } - /** - * @return Longest interval in milliseconds between buffer flushes - */ - public long getBufferFlushIntervalInMs() { - if (getMaxClientLagEnabled()) { - if (cachedBufferFlushIntervalMs != -1L) { - return cachedBufferFlushIntervalMs; - } - long lag = getMaxClientLagMs(); - if (cachedBufferFlushIntervalMs == -1L) { - cachedBufferFlushIntervalMs = lag; - } + /** @return Longest interval in milliseconds between buffer flushes */ + public long getCachedMaxClientLagInMs() { + // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed + if (this.parameterMap.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) { + throw new IllegalArgumentException( + String.format( + "%s is deprecated, please use %s instead", + BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); + } + + if (cachedBufferFlushIntervalMs != -1L) { return cachedBufferFlushIntervalMs; } - Object val = - this.parameterMap.getOrDefault( - BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT); - return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; + cachedBufferFlushIntervalMs = getMaxClientLagInMs(); + return cachedBufferFlushIntervalMs; } - private long getMaxClientLagMs() { + private long getMaxClientLagInMs() { Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT); - if (!(val instanceof String)) { - return getBufferFlushIntervalInMs(); - } - String maxLag = (String) val; - String[] lagParts = maxLag.split(" "); - if (lagParts.length != 2 - || (lagParts[0] == null || "".equals(lagParts[0])) - || (lagParts[1] == null || "".equals(lagParts[1]))) { - throw new IllegalArgumentException( - String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag)); - } - long lag; - try { - lag = Long.parseLong(lagParts[0]); - } catch (Throwable t) { - throw new IllegalArgumentException( - String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); - } long computedLag; - switch (lagParts[1].toLowerCase()) { - case "second": - case "seconds": - computedLag = lag * TimeUnit.SECONDS.toMillis(1); - break; - case "minute": - case "minutes": - computedLag = lag * TimeUnit.SECONDS.toMillis(60); - break; - default: + if (val instanceof String) { + String maxLag = (String) val; + String[] lagParts = maxLag.split(" "); + if (lagParts.length > 2) { + throw new IllegalArgumentException( + String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag)); + } + + // Compute the actual value + try { + computedLag = Long.parseLong(lagParts[0]); + } catch (Throwable t) { throw new IllegalArgumentException( - String.format("Invalid time unit supplied = '%s", lagParts[1])); + String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t); + } + + // Compute the time unit if needed + if (lagParts.length == 2) { + switch (lagParts[1].toLowerCase()) { + case "second": + case "seconds": + computedLag = computedLag * TimeUnit.SECONDS.toMillis(1); + break; + case "minute": + case "minutes": + computedLag = computedLag * TimeUnit.SECONDS.toMillis(60); + break; + default: + throw new IllegalArgumentException( + String.format("Invalid time unit supplied = '%s", lagParts[1])); + } + } + } else { + computedLag = (long) val; } if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) { throw new IllegalArgumentException( String.format( - "Lag falls outside of allowed time range. Minimum (seconds) = %s, Maximum" - + " (seconds) = %s", - MAX_CLIENT_LAG_MS_MIN / 1000, MAX_CLIENT_LAG_MS_MAX / 1000)); + "Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum" + + " (milliseconds) = %s", + MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX)); } - return computedLag; - } - private boolean getMaxClientLagEnabled() { - Object val = - this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT); - return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; + return computedLag; } - /** - * @return Time in milliseconds between checks to see if the buffer should be flushed - */ + /** @return Time in milliseconds between checks to see if the buffer should be flushed */ public long getBufferFlushCheckIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -277,9 +259,7 @@ public long getBufferFlushCheckIntervalInMs() { return (long) val; } - /** - * @return Duration in milliseconds to delay data insertion to the buffer when throttled - */ + /** @return Duration in milliseconds to delay data insertion to the buffer when throttled */ public long getInsertThrottleIntervalInMs() { Object val = this.parameterMap.getOrDefault( @@ -290,9 +270,7 @@ public long getInsertThrottleIntervalInMs() { return (long) val; } - /** - * @return Percent of free total memory at which we throttle row inserts - */ + /** @return Percent of free total memory at which we throttle row inserts */ public int getInsertThrottleThresholdInPercentage() { Object val = this.parameterMap.getOrDefault( @@ -304,9 +282,7 @@ public int getInsertThrottleThresholdInPercentage() { return (int) val; } - /** - * @return Absolute size in bytes of free total memory at which we throttle row inserts - */ + /** @return Absolute size in bytes of free total memory at which we throttle row inserts */ public int getInsertThrottleThresholdInBytes() { Object val = this.parameterMap.getOrDefault( @@ -317,9 +293,7 @@ public int getInsertThrottleThresholdInBytes() { return (int) val; } - /** - * @return true if jmx metrics are enabled for a client - */ + /** @return true if jmx metrics are enabled for a client */ public boolean hasEnabledSnowpipeStreamingMetrics() { Object val = this.parameterMap.getOrDefault( @@ -330,9 +304,7 @@ public boolean hasEnabledSnowpipeStreamingMetrics() { return (boolean) val; } - /** - * @return Blob format version - */ + /** @return Blob format version */ public Constants.BdecVersion getBlobFormatVersion() { Object val = this.parameterMap.getOrDefault(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT); if (val instanceof Constants.BdecVersion) { @@ -361,9 +333,7 @@ public int getIOTimeCpuRatio() { return (int) val; } - /** - * @return the max retry count when waiting for a blob upload task to finish - */ + /** @return the max retry count when waiting for a blob upload task to finish */ public int getBlobUploadMaxRetryCount() { Object val = this.parameterMap.getOrDefault( @@ -374,9 +344,7 @@ public int getBlobUploadMaxRetryCount() { return (int) val; } - /** - * @return The max memory limit in bytes - */ + /** @return The max memory limit in bytes */ public long getMaxMemoryLimitInBytes() { Object val = this.parameterMap.getOrDefault( @@ -384,9 +352,7 @@ public long getMaxMemoryLimitInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return Return whether memory optimization for Parquet is enabled. - */ + /** @return Return whether memory optimization for Parquet is enabled. */ public boolean getEnableParquetInternalBuffering() { Object val = this.parameterMap.getOrDefault( @@ -394,9 +360,7 @@ public boolean getEnableParquetInternalBuffering() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } - /** - * @return The max channel size in bytes - */ + /** @return The max channel size in bytes */ public long getMaxChannelSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -404,9 +368,7 @@ public long getMaxChannelSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return The max chunk size in bytes that could avoid OOM at server side - */ + /** @return The max chunk size in bytes that could avoid OOM at server side */ public long getMaxChunkSizeInBytes() { Object val = this.parameterMap.getOrDefault(MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT); @@ -432,9 +394,7 @@ public int getMaxChunksInBlobAndRegistrationRequest() { return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; } - /** - * @return BDEC compression algorithm - */ + /** @return BDEC compression algorithm */ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { Object val = this.parameterMap.getOrDefault( diff --git a/src/test/java/net/snowflake/ingest/SimpleIngestIT.java b/src/test/java/net/snowflake/ingest/SimpleIngestIT.java index ee73a2046..20a27eafb 100644 --- a/src/test/java/net/snowflake/ingest/SimpleIngestIT.java +++ b/src/test/java/net/snowflake/ingest/SimpleIngestIT.java @@ -178,8 +178,7 @@ public void testSimpleIngestWithPattern() throws Exception { } private void getHistoryAndAssertLoad(SimpleIngestManager manager, String test_file_name_2) - throws InterruptedException, - java.util.concurrent.ExecutionException, + throws InterruptedException, java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException { // keeps track of whether we've loaded the file boolean loaded = false; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 8499fc985..e455741c8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -14,7 +14,7 @@ public class ParameterProviderTest { private Map getStartingParameterMap() { Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); @@ -31,10 +31,9 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(3L, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); Assert.assertEquals(6, parameterProvider.getInsertThrottleThresholdInPercentage()); Assert.assertEquals(1024, parameterProvider.getInsertThrottleThresholdInBytes()); @@ -51,14 +50,13 @@ public void withValuesSet() { @Test public void withNullProps() { Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); - Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); Assert.assertEquals(6, parameterProvider.getInsertThrottleThresholdInPercentage()); Assert.assertEquals(1024, parameterProvider.getInsertThrottleThresholdInBytes()); @@ -70,14 +68,13 @@ public void withNullProps() { @Test public void withNullParameterMap() { Properties props = new Properties(); - props.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 3L); + props.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - props.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, false); ParameterProvider parameterProvider = new ParameterProvider(null, props); - Assert.assertEquals(3, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); Assert.assertEquals(6, parameterProvider.getInsertThrottleThresholdInPercentage()); Assert.assertEquals(1024, parameterProvider.getInsertThrottleThresholdInBytes()); @@ -91,8 +88,7 @@ public void withNullInputs() { ParameterProvider parameterProvider = new ParameterProvider(null, null); Assert.assertEquals( - ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getBufferFlushIntervalInMs()); + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -112,8 +108,7 @@ public void withDefaultValues() { ParameterProvider parameterProvider = new ParameterProvider(); Assert.assertEquals( - ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getBufferFlushIntervalInMs()); + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -146,77 +141,75 @@ public void withDefaultValues() { public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic - Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @Test public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(2000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @Test public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(60000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @Test public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(120000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @Test public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(1000, parameterProvider.getBufferFlushIntervalInMs()); + Assert.assertEquals( + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @Test - public void testMaxClientLagEnabledMissingUnit() { + public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1"); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - try { - parameterProvider.getBufferFlushIntervalInMs(); - Assert.fail("Should not have succeeded"); - } catch (IllegalArgumentException e) { - Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); - } + Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); + } + + @Test + public void testMaxClientLagEnabledLongInput() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @Test public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { - parameterProvider.getBufferFlushIntervalInMs(); + parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); @@ -227,11 +220,10 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { - parameterProvider.getBufferFlushIntervalInMs(); + parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().startsWith("Invalid time unit")); @@ -242,11 +234,10 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { - parameterProvider.getBufferFlushIntervalInMs(); + parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().startsWith("Failed to parse")); @@ -257,11 +248,10 @@ public void testMaxClientLagEnabledInvalidUnit() { public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { - parameterProvider.getBufferFlushIntervalInMs(); + parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); @@ -272,11 +262,10 @@ public void testMaxClientLagEnabledThresholdBelow() { public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { - parameterProvider.getBufferFlushIntervalInMs(); + parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 037028086..528f9e46d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -266,20 +266,19 @@ public void testOpenChannelRequestCreationSuccess() { Assert.assertFalse(request.isOffsetTokenProvided()); } - @Test public void testOpenChannelRequesCreationtWithOffsetToken() { OpenChannelRequest request = - OpenChannelRequest.builder("CHANNEL") - .setDBName("STREAMINGINGEST_TEST") - .setSchemaName("PUBLIC") - .setTableName("T_STREAMINGINGEST") - .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) - .setOffsetToken("TEST_TOKEN") - .build(); + OpenChannelRequest.builder("CHANNEL") + .setDBName("STREAMINGINGEST_TEST") + .setSchemaName("PUBLIC") + .setTableName("T_STREAMINGINGEST") + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_TOKEN") + .build(); Assert.assertEquals( - "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); Assert.assertEquals("TEST_TOKEN", request.getOffsetToken()); Assert.assertTrue(request.isOffsetTokenProvided()); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 11fc0b93b..cb75b3a52 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -142,17 +142,16 @@ public void setup() { } @Test - @Ignore // Until able to test in PROD public void testConstructorParameters() throws Exception { Properties prop = new Properties(); prop.put(USER, TestUtils.getUser()); prop.put(ACCOUNT_URL, TestUtils.getHost()); prop.put(PRIVATE_KEY, TestUtils.getPrivateKey()); - prop.put(ROLE, "role"); - prop.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 123); + prop.put(ROLE, TestUtils.getRole()); + prop.put(ParameterProvider.MAX_CLIENT_LAG, 1234); Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 321); + parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 321L); SnowflakeStreamingIngestClientInternal client = (SnowflakeStreamingIngestClientInternal) @@ -162,7 +161,7 @@ public void testConstructorParameters() throws Exception { .build(); Assert.assertEquals("client", client.getName()); - Assert.assertEquals(123, client.getParameterProvider().getBufferFlushIntervalInMs()); + Assert.assertEquals(1234, client.getParameterProvider().getCachedMaxClientLagInMs()); Assert.assertEquals(321, client.getParameterProvider().getBufferFlushCheckIntervalInMs()); Assert.assertEquals( ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, @@ -256,15 +255,12 @@ public void testClientFactoryInvalidPrivateKey() throws Exception { } @Test - @Ignore // Wait for the client/configure endpoint to be available in PROD, can't mock the - // HttpUtil.executeGeneralRequest call because it's also used when setting up the - // connection public void testClientFactorySuccess() throws Exception { Properties prop = new Properties(); prop.put(USER, TestUtils.getUser()); prop.put(ACCOUNT_URL, TestUtils.getHost()); prop.put(PRIVATE_KEY, TestUtils.getPrivateKey()); - prop.put(ROLE, "role"); + prop.put(ROLE, TestUtils.getRole()); SnowflakeStreamingIngestClient client = SnowflakeStreamingIngestClientFactory.builder("client").setProperties(prop).build(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index eadbe81ee..3609b8e5e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -192,7 +192,7 @@ public void testSimpleIngest() throws Exception { @Test public void testParameterOverrides() throws Exception { Map parameterMap = new HashMap<>(); - parameterMap.put(ParameterProvider.BUFFER_FLUSH_INTERVAL_IN_MILLIS, 30L); + parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3 sec"); parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 50L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 1); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); @@ -536,19 +536,16 @@ public void testMultiColumnIngest() throws Exception { public void testOpenChannelOffsetToken() throws Exception { String tableName = "offsetTokenTest"; jdbcConnection - .createStatement() - .execute( - String.format( - "create or replace table %s (s text);", - tableName)); + .createStatement() + .execute(String.format("create or replace table %s (s text);", tableName)); OpenChannelRequest request1 = - OpenChannelRequest.builder("TEST_CHANNEL") - .setDBName(testDb) - .setSchemaName(TEST_SCHEMA) - .setTableName(tableName) - .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) - .setOffsetToken("TEST_OFFSET") - .build(); + OpenChannelRequest.builder("TEST_CHANNEL") + .setDBName(testDb) + .setSchemaName(TEST_SCHEMA) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_OFFSET") + .build(); // Open a streaming ingest channel from the given client SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); @@ -558,7 +555,7 @@ public void testOpenChannelOffsetToken() throws Exception { for (int i = 1; i < 15; i++) { if (channel1.getLatestCommittedOffsetToken() != null - && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { + && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { return; } else { Thread.sleep(2000);