Blob number limit for reg req to avoid oversized registrations #569
The first file in the diff, the service that registers blobs:

```diff
@@ -8,6 +8,7 @@
 import static net.snowflake.ingest.utils.Utils.getStackTrace;

 import com.codahale.metrics.Timer;
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
```
```diff
@@ -195,8 +196,9 @@ List<FlushService.BlobData<T>> registerBlobs(Map<String, Timer.Context> latencyT
       Timer.Context registerContext =
           Utils.createTimerContext(this.owningClient.registerLatency);

+      int blobsPerReq = getBlobNumToRegisterInOneReq(blobs.size());
       // Register the blobs, and invalidate any channels that return a failure status code
-      this.owningClient.registerBlobs(blobs);
+      Lists.partition(blobs, blobsPerReq).forEach(owningClient::registerBlobs);

       if (registerContext != null) {
         registerContext.stop();
```

Review comment on the `Lists.partition(...)` line:

> Do we have a test where the blobs request is split?
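For context on the new call: Guava's `Lists.partition` splits a list into consecutive sublists of the given size, with the final sublist possibly smaller, and it preserves element order. A minimal, self-contained sketch with illustrative values (not from the PR):

```java
import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.List;

public class PartitionDemo {
  public static void main(String[] args) {
    // Seven "blobs" with a batch size of 3 yield three requests of sizes 3, 3 and 1.
    List<Integer> blobs = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    Lists.partition(blobs, 3).forEach(batch -> System.out.println("register " + batch));
    // Prints: register [1, 2, 3] / register [4, 5, 6] / register [7]
  }
}
```

Because `forEach` walks the partitions in order, splitting the registration call does not change the order in which blobs reach the server; this matters for the ordering discussion at the end of this review.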
```diff
@@ -216,6 +218,21 @@ List<FlushService.BlobData<T>> registerBlobs(Map<String, Timer.Context> latencyT
     return errorBlobs;
   }

+  /**
+   * We split blobs into batches to avoid oversized payloads in registration requests when many
+   * blobs have accumulated, e.g. after connection hiccups.
+   */
+  private int getBlobNumToRegisterInOneReq(int blobNum) {
+    int blobsPerReq = owningClient.getParameterProvider().getMaxBlobsToRegisterInOneRequest();
+    if (blobNum > blobsPerReq) {
+      logger.logWarn(
+          "Many blobs to register: {}, possibly bad connection to Snowflake or ingestion is"
+              + " too fast and needs more resources",
+          blobNum);
+    }
+    return blobsPerReq;
+  }
+
   /**
    * Get the blobsList, this is for TEST ONLY, no lock protection
    *
```

Review comment on the `logger.logWarn` call:

> Could we add the client name in the log as well?
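To make the cap's effect concrete: when `blobNum` exceeds `blobsPerReq`, the backlog is drained in `ceil(blobNum / blobsPerReq)` registration requests. A quick sketch with hypothetical numbers (the default cap of 10 is introduced in the `ParameterProvider` diff below):

```java
public class RequestCountSketch {
  public static void main(String[] args) {
    int blobsPerReq = 10; // assumed: MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST_DEFAULT
    int blobNum = 25;     // hypothetical backlog large enough to trigger the warning

    // Integer ceiling division: ceil(25 / 10) = 3 requests of sizes 10, 10 and 5.
    int numRequests = (blobNum + blobsPerReq - 1) / blobsPerReq;
    System.out.println(numRequests); // 3
  }
}
```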
The second file in the diff, the `ParameterProvider` class:

```diff
@@ -31,6 +31,9 @@ public class ParameterProvider {
   public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
       "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();

+  public static final String MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST =
+      "MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST".toLowerCase();
+
   // Default values
   public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
   public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
```
```diff
@@ -51,6 +54,8 @@ public class ParameterProvider {
   It reduces memory consumption compared to using Java Objects for buffering.*/
   public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;

+  public static final int MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST_DEFAULT = 10;
+
   /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
   private final Map<String, Object> parameterMap = new HashMap<>();

```

Review comment on the new default:

> 10 seems too low, should we start with something higher like 30? If we have the log in Snowhouse for tracking the number of blobs in one request, we could pick the P99 as well.
```diff
@@ -296,6 +301,13 @@ public long getMaxAllowedRowSizeInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }

+  public int getMaxBlobsToRegisterInOneRequest() {
+    Object val =
+        this.parameterMap.getOrDefault(
+            MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST, MAX_BLOBS_TO_REGISTER_IN_ONE_REQUEST_DEFAULT);
+    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
+  }
+
   @Override
   public String toString() {
     return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
```

Review comment on the new getter:

> Could we add a unit test for this, like all other parameters we added?
Discussion on the `Lists.partition(...).forEach(...)` change:

> Since this is an I/O networking request, would it make sense to parallelize?

> I would generally be in favour if it were a common case. Here it should be more like an exception indicating that the system is already overloaded (added a log), hence I tend not to complicate the system with even more concurrency for this. Happy to discuss more.

> Makes sense :)

> Actually, I take this back, we cannot do this because of ordering guarantees per channel.
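To illustrate why parallelizing was ruled out: the sequential `forEach` over the partitions preserves submission order, while a parallel variant gives no such guarantee, so a later blob could be registered before an earlier one from the same channel. A minimal sketch with integers standing in for blobs:

```java
import com.google.common.collect.Lists;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class OrderingSketch {
  public static void main(String[] args) {
    List<Integer> blobs = IntStream.range(0, 9).boxed().collect(Collectors.toList());
    List<List<Integer>> batches = Lists.partition(blobs, 3);

    // Sequential: batches are always "registered" in order [0,1,2], [3,4,5], [6,7,8].
    batches.forEach(b -> System.out.println("register " + b));

    // Parallel: completion order is nondeterministic, so a later batch could land
    // before an earlier one, violating per-channel ordering guarantees.
    batches.parallelStream().forEach(b -> System.out.println("register " + b));
  }
}
```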