Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1821246 Add parameter for blob upload timeout #916

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.BLOB_UPLOAD_TIMEOUT_IN_SEC;
import static net.snowflake.ingest.utils.Utils.getStackTrace;

import com.codahale.metrics.Timer;
Expand Down Expand Up @@ -120,15 +119,20 @@ List<FlushService.BlobData<T>> registerBlobs(Map<String, Timer.Context> latencyT
"Start loop inner for uploading blobs, size={}, idx={}", oldList.size(), idx);
while (idx < oldList.size()
&& System.currentTimeMillis() - startTime
<= TimeUnit.SECONDS.toMillis(BLOB_UPLOAD_TIMEOUT_IN_SEC * 2)) {
<= TimeUnit.SECONDS.toMillis(
owningClient.getParameterProvider().getBlobUploadTimeOutInSec() * 2L)) {
Pair<FlushService.BlobData<T>, CompletableFuture<BlobMetadata>> futureBlob =
oldList.get(idx);
try {
logger.logDebug(
"Start waiting on uploading blob={}, idx={}", futureBlob.getKey().getPath(), idx);
// Wait for uploading to finish
BlobMetadata blob =
futureBlob.getValue().get(BLOB_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
futureBlob
.getValue()
.get(
owningClient.getParameterProvider().getBlobUploadTimeOutInSec(),
TimeUnit.SECONDS);
logger.logDebug(
"Finish waiting on uploading blob={}, idx={}",
futureBlob.getKey().getPath(),
Expand Down
1 change: 0 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class Constants {
10L; // Don't change, should match server side
public static final long RESPONSE_ERR_ENQUEUE_TABLE_CHUNK_QUEUE_FULL =
7L; // Don't change, should match server side
public static final int BLOB_UPLOAD_TIMEOUT_IN_SEC = 5;
public static final int INSERT_THROTTLE_MAX_RETRY_COUNT = 60;
public static final long MAX_BLOB_SIZE_IN_BYTES = 1024 * 1024 * 1024;
public static final int BLOB_TAG_SIZE_IN_BYTES = 4;
Expand Down
32 changes: 32 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class ParameterProvider {
public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase();
public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST =
"MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase();
public static final String BLOB_UPLOAD_TIMEOUT_IN_SEC =
"BLOB_UPLOAD_TIMEOUT_IN_SEC".toLowerCase();
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

Expand All @@ -62,6 +64,8 @@ public class ParameterProvider {
public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024;
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 256 * 1024 * 1024;
public static final int BLOB_UPLOAD_TIMEOUT_IN_SEC_DEFAULT = 5;
public static final int BLOB_UPLOAD_TIMEOUT_IN_SEC_ICEBERG_MODE_DEFAULT = 20;
sfc-gh-alhuang marked this conversation as resolved.
Show resolved Hide resolved

// Lag related parameters
public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second
Expand Down Expand Up @@ -95,6 +99,9 @@ public class ParameterProvider {
// Cached enableIcebergStreaming - avoid parsing each time for quick lookup
private Boolean cachedEnableIcebergStreaming = null;

// Cached blob upload timeout - avoid parsing each time for quick lookup
private int cachedBlobUploadTimeoutInSec = -1;

/**
* Constructor. Takes properties from profile file and properties from client constructor and
* resolves final parameter value
Expand Down Expand Up @@ -270,6 +277,15 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
props,
false /* enforceDefault */);

this.checkAndUpdate(
BLOB_UPLOAD_TIMEOUT_IN_SEC,
isEnableIcebergStreaming()
? BLOB_UPLOAD_TIMEOUT_IN_SEC_ICEBERG_MODE_DEFAULT
: BLOB_UPLOAD_TIMEOUT_IN_SEC_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);

if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) {
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -529,6 +545,22 @@ public boolean isEnableIcebergStreaming() {
return cachedEnableIcebergStreaming;
}

/** @return The timeout in seconds for waiting for a blob upload task to finish. */
public int getBlobUploadTimeOutInSec() {
if (cachedBlobUploadTimeoutInSec != -1) {
return cachedBlobUploadTimeoutInSec;
}
Object val =
this.parameterMap.getOrDefault(
BLOB_UPLOAD_TIMEOUT_IN_SEC,
isEnableIcebergStreaming()
? BLOB_UPLOAD_TIMEOUT_IN_SEC_ICEBERG_MODE_DEFAULT
: BLOB_UPLOAD_TIMEOUT_IN_SEC_DEFAULT);
cachedBlobUploadTimeoutInSec =
(val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
return cachedBlobUploadTimeoutInSec;
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public void withDefaultValues() {
Assert.assertEquals(
ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT,
parameterProvider.getMaxChunksInRegistrationRequest());
Assert.assertEquals(
enableIcebergStreaming
? ParameterProvider.BLOB_UPLOAD_TIMEOUT_IN_SEC_ICEBERG_MODE_DEFAULT
: ParameterProvider.BLOB_UPLOAD_TIMEOUT_IN_SEC_DEFAULT,
parameterProvider.getBlobUploadTimeOutInSec());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.BLOB_UPLOAD_TIMEOUT_IN_SEC;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -27,6 +25,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
public class RegisterServiceTest {
Expand Down Expand Up @@ -59,7 +58,17 @@ public void teardown() throws Exception {

@Test
public void testRegisterService() throws ExecutionException, InterruptedException {
RegisterService<StubChunkData> rs = new RegisterService<>(null, true);
SnowflakeStreamingIngestClientInternal<StubChunkData> client =
Mockito.mock(SnowflakeStreamingIngestClientInternal.class);
ParameterProvider parameterProvider = Mockito.mock(ParameterProvider.class);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why do we need to mock the parameter provider, can we not do away with this unnecessary mocking (not for this PR) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this as RegisterServiceTest is reading blob upload timeout from parameter provider.

Mockito.when(parameterProvider.getBlobUploadTimeOutInSec())
.thenReturn(
enableIcebergStreaming
? ParameterProvider.BLOB_UPLOAD_TIMEOUT_IN_SEC_ICEBERG_MODE_DEFAULT
: ParameterProvider.BLOB_UPLOAD_TIMEOUT_IN_SEC_DEFAULT);
Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider);

RegisterService<StubChunkData> rs = new RegisterService<>(client, true);

Pair<FlushService.BlobData<StubChunkData>, CompletableFuture<BlobMetadata>> blobFuture =
new Pair<>(
Expand Down Expand Up @@ -120,7 +129,9 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception {
future.thenRunAsync(
() -> {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(BLOB_UPLOAD_TIMEOUT_IN_SEC) + 5);
Thread.sleep(
TimeUnit.SECONDS.toMillis(client.getParameterProvider().getBlobUploadTimeOutInSec())
+ 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down
Loading