Skip to content

Commit

Permalink
Iteration 4: Sets min/max
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tjones committed Sep 13, 2023
1 parent bd716ab commit c65d9de
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
22 changes: 20 additions & 2 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public class ParameterProvider {
// Lag related parameters
public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;

static final long MAX_CLIENT_LAG_MS_MIN = 1000;

// 10 minutes
static final long MAX_CLIENT_LAG_MS_MAX = 600000;
public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
Expand Down Expand Up @@ -92,6 +97,7 @@ private void updateValue(
this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
}
}

/**
* Sets parameter values by first checking 1. parameterOverrides 2. props 3. default value
*
Expand Down Expand Up @@ -205,17 +211,29 @@ private long getMaxClientLagMs() {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}
long computedLag = BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
return lag * 1000;
computedLag = lag * 1000;
break;
case "minute":
case "minutes":
return lag * 60000;
computedLag = lag * 60000;
break;
default:
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
}

if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
throw new IllegalArgumentException(
String.format(
"Lag falls outside of allowed time range. Minimum (milliseconds) = %s, Maximum"
+ " (milliseconds) = %s",
MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
}
return computedLag;
}

private boolean getMaxClientLagEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,34 @@ public void testMaxClientLagEnabledInvalidUnit() {
Assert.assertTrue(e.getMessage().startsWith("Failed to parse"));
}
}

@Test
public void testMaxClientLagEnabledThresholdBelow() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
}
}

@Test
public void testMaxClientLagEnabledThresholdAbove() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG_ENABLED, true);
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBufferFlushIntervalInMs();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().startsWith("Lag falls outside"));
}
}
}

0 comments on commit c65d9de

Please sign in to comment.