Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NO-SNOW: Deprecate BUFFER_FLUSH_INTERVAL_IN_MILLIS parameter #659

Merged
merged 4 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ public RequestBuilder(
SecurityManager securityManager,
CloseableHttpClient httpClient,
String clientName) {
this(accountName,
this(
accountName,
userName,
credential,
schemeName,
Expand Down Expand Up @@ -648,7 +649,7 @@ private static void addUserAgent(HttpUriRequest request, String userAgentSuffix)
public void addToken(HttpUriRequest request) {
request.setHeader(HttpHeaders.AUTHORIZATION, BEARER_PARAMETER + securityManager.getToken());
request.setHeader(SF_HEADER_AUTHORIZATION_TOKEN_TYPE, this.securityManager.getTokenType());
if(addAccountNameInRequest) {
if (addAccountNameInRequest) {
request.setHeader(SF_HEADER_ACCOUNT_NAME, accountName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {
return this;
}

public OpenChannelRequestBuilder setOffsetToken(String offsetToken){
public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
return this;
}

public OpenChannelRequest build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.util.Map;
import java.util.Properties;

import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestClientInternal;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SnowflakeURL;
Expand Down Expand Up @@ -71,10 +70,10 @@ public SnowflakeStreamingIngestClient build() {

if (addAccountNameInRequest) {
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
this.name, accountURL, prop, this.parameterOverrides, addAccountNameInRequest);
}
return new SnowflakeStreamingIngestClientInternal<>(
this.name, accountURL, prop, this.parameterOverrides);
this.name, accountURL, prop, this.parameterOverrides);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ CompletableFuture<Void> flush(boolean isForce) {
&& !isTestMode()
&& (this.isNeedFlush
|| timeDiffMillis
>= this.owningClient.getParameterProvider().getBufferFlushIntervalInMs()))) {
>= this.owningClient.getParameterProvider().getCachedMaxClientLagInMs()))) {

return this.statsFuture()
.thenCompose((v) -> this.distributeFlush(isForce, timeDiffMillis))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
prop.getProperty(Constants.OAUTH_CLIENT_SECRET),
prop.getProperty(Constants.OAUTH_REFRESH_TOKEN));
}
this.requestBuilder = new RequestBuilder(
this.requestBuilder =
new RequestBuilder(
accountURL,
prop.get(USER).toString(),
credential,
Expand Down Expand Up @@ -275,14 +276,7 @@ public SnowflakeStreamingIngestClientInternal(
Properties prop,
Map<String, Object> parameterOverrides,
boolean addAccountNameInRequest) {
this(name,
accountURL,
prop,
null,
false,
null,
parameterOverrides,
addAccountNameInRequest);
this(name, accountURL, prop, null, false, null, parameterOverrides, addAccountNameInRequest);
}

/**
Expand Down Expand Up @@ -353,7 +347,7 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
payload.put("schema", request.getSchemaName());
payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name());
payload.put("role", this.role);
if (request.isOffsetTokenProvided()){
if (request.isOffsetTokenProvided()) {
payload.put("offset_token", request.getOffsetToken());
}

Expand Down
120 changes: 54 additions & 66 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ public class ParameterProvider {

public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final int INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT = 10;
Expand All @@ -57,12 +54,10 @@ public class ParameterProvider {
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;

// Lag related parameters
public static final String MAX_CLIENT_LAG_DEFAULT = "1 second";
public static final boolean MAX_CLIENT_LAG_ENABLED_DEFAULT = true;

public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second
static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1);

static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);

public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100;

Expand Down Expand Up @@ -114,11 +109,15 @@ private void updateValue(
* @param props Properties file provided to client constructor
*/
private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
this.updateValue(
BUFFER_FLUSH_INTERVAL_IN_MILLIS,
BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT,
parameterOverrides,
props);
// BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed
if ((parameterOverrides != null
&& parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS))
|| (props != null && props.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS))) {
throw new IllegalArgumentException(
String.format(
"%s is deprecated, please use %s instead",
BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG));
}

this.updateValue(
BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS,
Expand Down Expand Up @@ -177,8 +176,7 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props);

this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
this.updateValue(
MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);

this.updateValue(
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
Expand All @@ -193,57 +191,52 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
}

/** @return Longest interval in milliseconds between buffer flushes */
public long getBufferFlushIntervalInMs() {
if (getMaxClientLagEnabled()) {
if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
}
long lag = getMaxClientLagMs();
if (cachedBufferFlushIntervalMs == -1L) {
cachedBufferFlushIntervalMs = lag;
}
public long getCachedMaxClientLagInMs() {
if (cachedBufferFlushIntervalMs != -1L) {
return cachedBufferFlushIntervalMs;
} else {
Object val =
this.parameterMap.getOrDefault(
BUFFER_FLUSH_INTERVAL_IN_MILLIS, BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT);
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

cachedBufferFlushIntervalMs = getMaxClientLagInMs();
return cachedBufferFlushIntervalMs;
}

private long getMaxClientLagMs() {
private long getMaxClientLagInMs() {
Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
if (!(val instanceof String)) {
return BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT;
}
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length != 2
|| (lagParts[0] == null || "".equals(lagParts[0]))
|| (lagParts[1] == null || "".equals(lagParts[1]))) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
}
long lag;
try {
lag = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}
long computedLag;
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
computedLag = lag * TimeUnit.SECONDS.toMillis(1);
break;
case "minute":
case "minutes":
computedLag = lag * TimeUnit.SECONDS.toMillis(60);
break;
default:
if (val instanceof String) {
String maxLag = (String) val;
String[] lagParts = maxLag.split(" ");
if (lagParts.length > 2) {
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", maxLag));
}

// Compute the actual value
try {
computedLag = Long.parseLong(lagParts[0]);
} catch (Throwable t) {
throw new IllegalArgumentException(
String.format("Failed to parse MAX_CLIENT_LAG = '%s'", lagParts[0]), t);
}

// Compute the time unit if needed
if (lagParts.length == 2) {
switch (lagParts[1].toLowerCase()) {
case "second":
case "seconds":
computedLag = computedLag * TimeUnit.SECONDS.toMillis(1);
break;
case "minute":
case "minutes":
computedLag = computedLag * TimeUnit.SECONDS.toMillis(60);
break;
default:
throw new IllegalArgumentException(
String.format("Invalid time unit supplied = '%s", lagParts[1]));
}
}
} else {
computedLag = (long) val;
}

if (!(computedLag >= MAX_CLIENT_LAG_MS_MIN && computedLag <= MAX_CLIENT_LAG_MS_MAX)) {
Expand All @@ -253,13 +246,8 @@ private long getMaxClientLagMs() {
+ " (milliseconds) = %s",
MAX_CLIENT_LAG_MS_MIN, MAX_CLIENT_LAG_MS_MAX));
}
return computedLag;
}

private boolean getMaxClientLagEnabled() {
Object val =
this.parameterMap.getOrDefault(MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
return computedLag;
}

/** @return Time in milliseconds between checks to see if the buffer should be flushed */
Expand Down Expand Up @@ -411,8 +399,8 @@ public int getMaxChunksInBlobAndRegistrationRequest() {
/** @return BDEC compression algorithm */
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
if (val instanceof Constants.BdecParquetCompression) {
return (Constants.BdecParquetCompression) val;
}
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/net/snowflake/ingest/SimpleIngestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ public void testSimpleIngestWithPattern() throws Exception {
}

private void getHistoryAndAssertLoad(SimpleIngestManager manager, String test_file_name_2)
throws InterruptedException,
java.util.concurrent.ExecutionException,
throws InterruptedException, java.util.concurrent.ExecutionException,
java.util.concurrent.TimeoutException {
// keeps track of whether we've loaded the file
boolean loaded = false;
Expand Down
Loading
Loading