Add rate limiter for bulk request #567
Conversation
Signed-off-by: Tomoyuki Morita <[email protected]>
flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
Signed-off-by: Tomoyuki Morita <[email protected]>
a4aa985 to 8576224
During index/MV refresh, can the user adjust this value if the customer's cluster is under pressure?
My concern is that this configuration may be helpful during the testing stage but not in the production environment. We should investigate methods to dynamically control the ingestion rate.
docs/index.md
@@ -527,6 +527,7 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual data written to OpenSearch may be more than this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: default value is false. Valid values are [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: default value is 0, which disables the rate limit.
Does 0.1 mean 1 request per 10 seconds? Could you add more detail to the doc?
It won't accept decimal values. We could support them by modifying the rate limit period, but I think it would become complicated, considering someone might specify a value like 1.23. If we want to reduce the traffic to less than 1 request/sec, we can reduce the batch size instead, which would reduce the actual number of requests.
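To make that concrete, here is a minimal sketch of how these write options could be combined when building a Spark session. The option names come from docs/index.md above; the session setup and the numeric values are illustrative assumptions, not recommendations from this PR.

```java
import org.apache.spark.sql.SparkSession;

public class FlintWriteRateExample {
  public static void main(String[] args) {
    // Hypothetical local session; in practice these options are passed to the Flint data source.
    SparkSession spark = SparkSession.builder()
        .appName("flint-write-rate-example")
        .master("local[*]")
        // Cap each bulk request at roughly 500 documents or 1mb of payload.
        .config("spark.datasource.flint.write.batch_size", "500")
        .config("spark.datasource.flint.write.batch_bytes", "1mb")
        // Allow at most 2 bulk requests per second per node; 0 disables the limit.
        .config("spark.datasource.flint.write.bulkRequestRateLimitPerNode", "2")
        .getOrCreate();

    // ... run the batch or streaming write to Flint here ...

    spark.stop();
  }
}
```

Shrinking batch_size lowers the number of documents per bulk request, so combined with an integer requests-per-second limit it gives finer throughput control than a fractional rate value would.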
private BulkRequestRateLimiterHolder() {}

public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter(
Is the Holder necessary? Could this move into BulkRequestRateLimiter?
As written in the comment, it is needed so we can share a single instance while keeping BulkRequestRateLimiter testable. If we made BulkRequestRateLimiter itself a singleton, we could not test it with different parameters.
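For readers following along, here is a minimal sketch of the holder pattern being described. It is not the actual Flint implementation: the constructor parameter, the use of Guava's RateLimiter, and the disable-on-zero behavior are assumptions based on the documented default of 0.

```java
import com.google.common.util.concurrent.RateLimiter;

// Sketch only: parameter names and types are assumptions, not the real Flint code.
class BulkRequestRateLimiter {
  private final RateLimiter rateLimiter; // null means rate limiting is disabled

  BulkRequestRateLimiter(long bulkRequestRateLimitPerNode) {
    // 0 (the documented default) disables the limiter entirely.
    this.rateLimiter =
        bulkRequestRateLimitPerNode > 0 ? RateLimiter.create(bulkRequestRateLimitPerNode) : null;
  }

  void acquirePermit() {
    if (rateLimiter != null) {
      rateLimiter.acquire(); // blocks until a permit is available
    }
  }
}

class BulkRequestRateLimiterHolder {
  private static BulkRequestRateLimiter instance;

  private BulkRequestRateLimiterHolder() {}

  // Lazily creates one shared limiter per JVM. Tests can still instantiate
  // BulkRequestRateLimiter directly with different parameters.
  public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter(
      long bulkRequestRateLimitPerNode) {
    if (instance == null) {
      instance = new BulkRequestRateLimiter(bulkRequestRateLimitPerNode);
    }
    return instance;
  }
}
```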
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options));
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> {
  try {
    rateLimiter.acquirePermit();
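The quoted diff is cut off mid-block. As a self-contained illustration of the same pattern (acquire a permit before every bulk call), here is a hedged sketch that uses a stand-in client instead of the real OpenSearch client and execute() wrapper:

```java
import java.util.List;
import java.util.concurrent.Callable;
import com.google.common.util.concurrent.RateLimiter;

// Stand-in sketch: RateLimitedBulkSketch and executeWithLimit are hypothetical
// names, not the Flint code; only the acquire-then-call pattern mirrors the diff.
public class RateLimitedBulkSketch {
  private final RateLimiter rateLimiter = RateLimiter.create(2.0); // 2 bulk requests/sec

  public <T> T executeWithLimit(Callable<T> bulkCall) throws Exception {
    rateLimiter.acquire();   // blocks until a permit is available
    return bulkCall.call();  // e.g. () -> client.bulk(bulkRequest, options)
  }

  public static void main(String[] args) throws Exception {
    RateLimitedBulkSketch sketch = new RateLimitedBulkSketch();
    for (int i = 0; i < 5; i++) {
      int batch = i; // effectively final copy for the lambda
      sketch.executeWithLimit(() -> {
        System.out.println("sent bulk batch " + batch + " at " + System.currentTimeMillis());
        return List.of(); // fake bulk response
      });
    }
  }
}
```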
- Each bulk request contains multiple index requests. If the throttle is on each index request, the bulk request limit may not help.
- How does an OpenSearch customer configure this parameter?
- A rate limit on bulk requests would limit the overall number of index requests.
- They can add it as an extra Spark parameter for now. We might want to add an attribute for Datasource so we can specify it per datasource.
Let me take this as an item for a long-term solution. If we use a token bucket solution with an external state store (DynamoDB, etc.), we can adjust the limit dynamically.
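As a rough sketch of that long-term idea (entirely hypothetical, not part of this PR): keep the desired rate in an external store and periodically push it into the local limiter. The DoubleSupplier below stands in for a DynamoDB or similar lookup, and Guava's RateLimiter stands in for the token bucket.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleSupplier;
import com.google.common.util.concurrent.RateLimiter;

// Hypothetical dynamic limiter: the external state store is abstracted as a DoubleSupplier.
public class DynamicRateLimiterSketch {
  private final RateLimiter rateLimiter;
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public DynamicRateLimiterSketch(double initialRate, DoubleSupplier externalRateSource) {
    this.rateLimiter = RateLimiter.create(initialRate);
    // Poll the external store every 30 seconds and apply the new rate to the token bucket.
    scheduler.scheduleAtFixedRate(
        () -> rateLimiter.setRate(externalRateSource.getAsDouble()),
        30, 30, TimeUnit.SECONDS);
  }

  public void acquirePermit() {
    rateLimiter.acquire(); // blocks until a permit is available at the current rate
  }
}
```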
Signed-off-by: Tomoyuki Morita <[email protected]>
Added
Signed-off-by: Tomoyuki Morita <[email protected]> (cherry picked from commit 15ee355) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
(cherry picked from commit 15ee355) Signed-off-by: Tomoyuki Morita <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
Added a rate limiter for bulk requests (applied when the NONE refresh policy is used). Added `spark.datasource.flint.write.bulkRequestRateLimitPerNode` to specify the rate limit per node.

Issues Resolved
List any issues this PR will resolve, e.g. Closes [...].
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.