forked from opensearch-project/opensearch-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adaptive rate limiting for OpenSearch bulk requests (opensearch-proje…
…ct#1011) * adaptive rate limiting - replace fail safe rate limiter for google guava's - move rate limiter from RestHighLevelClientWrapper to OpenSearchBulkRetryWrapper - add metrics for rate limit (now convert rate from double to int) - add spark conf for rate limit parameters - adjust rate limit based on retryable result percentage Signed-off-by: Sean Kao <[email protected]> * metrics and test cases WIP Signed-off-by: Sean Kao <[email protected]> * test case Signed-off-by: Sean Kao <[email protected]> * shaded jar and configurable failure threshold Signed-off-by: Sean Kao <[email protected]> * update default values; add doc Signed-off-by: Sean Kao <[email protected]> * rename OpenSearchBulkRetryWrapper (remove Retry) Signed-off-by: Sean Kao <[email protected]> * remove failure threshold Signed-off-by: Sean Kao <[email protected]> * update metric name suffix to comply with setting Signed-off-by: Sean Kao <[email protected]> * remove bulk failure percentage metric Signed-off-by: Sean Kao <[email protected]> * change rate from double to long Signed-off-by: Sean Kao <[email protected]> * fix spark conf name Signed-off-by: Sean Kao <[email protected]> * change default value Signed-off-by: Sean Kao <[email protected]> * address comments - swap parameter for test case asserts - remove excessive null check (create noop impl for rate limiter) Signed-off-by: Sean Kao <[email protected]> --------- Signed-off-by: Sean Kao <[email protected]>
- Loading branch information
1 parent
974d7d4
commit f054022
Showing
15 changed files
with
412 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
83 changes: 83 additions & 0 deletions
83
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterImpl.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import com.google.common.util.concurrent.RateLimiter; | ||
import java.util.logging.Logger; | ||
import org.opensearch.flint.core.FlintOptions; | ||
import org.opensearch.flint.core.metrics.MetricConstants; | ||
import org.opensearch.flint.core.metrics.MetricsUtil; | ||
|
||
public class BulkRequestRateLimiterImpl implements BulkRequestRateLimiter { | ||
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterImpl.class.getName()); | ||
private RateLimiter rateLimiter; | ||
|
||
private final long minRate; | ||
private final long maxRate; | ||
private final long increaseStep; | ||
private final double decreaseRatio; | ||
|
||
public BulkRequestRateLimiterImpl(FlintOptions flintOptions) { | ||
minRate = flintOptions.getBulkRequestMinRateLimitPerNode(); | ||
maxRate = flintOptions.getBulkRequestMaxRateLimitPerNode(); | ||
increaseStep = flintOptions.getBulkRequestRateLimitPerNodeIncreaseStep(); | ||
decreaseRatio = flintOptions.getBulkRequestRateLimitPerNodeDecreaseRatio(); | ||
|
||
LOG.info("Setting rate limit for bulk request to " + minRate + " documents/sec"); | ||
this.rateLimiter = RateLimiter.create(minRate); | ||
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, minRate); | ||
} | ||
|
||
// Wait so it won't exceed rate limit. Does nothing if rate limit is not set. | ||
@Override | ||
public void acquirePermit() { | ||
this.rateLimiter.acquire(); | ||
LOG.info("Acquired 1 permit"); | ||
} | ||
|
||
@Override | ||
public void acquirePermit(int permits) { | ||
this.rateLimiter.acquire(permits); | ||
LOG.info("Acquired " + permits + " permits"); | ||
} | ||
|
||
/** | ||
* Increase rate limit additively. | ||
*/ | ||
@Override | ||
public void increaseRate() { | ||
setRate(getRate() + increaseStep); | ||
} | ||
|
||
/** | ||
* Decrease rate limit multiplicatively. | ||
*/ | ||
@Override | ||
public void decreaseRate() { | ||
setRate((long) (getRate() * decreaseRatio)); | ||
} | ||
|
||
@Override | ||
public long getRate() { | ||
return (long) this.rateLimiter.getRate(); | ||
} | ||
|
||
/** | ||
* Set rate limit to the given value, clamped by minRate and maxRate. Non-positive maxRate means | ||
* there's no maximum rate restriction, and the rate can be set to any value greater than | ||
* minRate. | ||
*/ | ||
@Override | ||
public void setRate(long permitsPerSecond) { | ||
if (maxRate > 0) { | ||
permitsPerSecond = Math.min(permitsPerSecond, maxRate); | ||
} | ||
permitsPerSecond = Math.max(minRate, permitsPerSecond); | ||
LOG.info("Setting rate limit for bulk request to " + permitsPerSecond + " documents/sec"); | ||
this.rateLimiter.setRate(permitsPerSecond); | ||
MetricsUtil.addHistoricGauge(MetricConstants.OS_BULK_RATE_LIMIT_METRIC, permitsPerSecond); | ||
} | ||
} |
37 changes: 37 additions & 0 deletions
37
flint-core/src/main/scala/org/opensearch/flint/core/storage/BulkRequestRateLimiterNoop.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.storage; | ||
|
||
import java.util.logging.Logger; | ||
|
||
public class BulkRequestRateLimiterNoop implements BulkRequestRateLimiter { | ||
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiterNoop.class.getName()); | ||
|
||
public BulkRequestRateLimiterNoop() { | ||
LOG.info("Rate limit for bulk request was not set."); | ||
} | ||
|
||
@Override | ||
public void acquirePermit() {} | ||
|
||
@Override | ||
public void acquirePermit(int permits) {} | ||
|
||
@Override | ||
public void increaseRate() {} | ||
|
||
@Override | ||
public void decreaseRate() {} | ||
|
||
@Override | ||
public long getRate() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public void setRate(long permitsPerSecond) {} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.