
Add rate limiter for bulk request #567

Merged: 9 commits, Aug 16, 2024
Changes from 3 commits
docs/index.md: 1 change (1 addition, 0 deletions)
@@ -527,6 +527,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: default value is 0, which disables the rate limit. Only integer rates (requests per second per node) are accepted.
Collaborator:

0.1 means 1 request per 10 seconds? Could you add more detail in the doc?

Collaborator Author @ykmr1224 (Aug 16, 2024):

It won't accept decimal values. We could support them by modifying the rate limit period, but I think that would become complicated, considering someone might specify a value like 1.23. If we want to reduce traffic below 1 request/sec, we can reduce the batch size instead, which reduces the actual number of requests. See the sketch after this list.

- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. Scroll context keep-alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. Default value is 3. Use 0 to disable retry.
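To make the configuration path from the exchange above concrete, here is a minimal, hypothetical sketch (not part of this PR) of setting these options when building a Spark session. The rate limit key follows the FlintSparkConf definition later in this diff; the app name and values are illustrative only.

```java
import org.apache.spark.sql.SparkSession;

public class FlintRateLimitConfigSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("flint-rate-limit-sketch") // illustrative name
            .master("local[*]")
            // At most 2 bulk requests per second per worker node; integers only.
            .config("spark.datasource.flint.bulkRequestRateLimitPerNode", "2")
            // For effective traffic below 1 request/sec, shrink the batch
            // instead of asking for a fractional rate.
            .config("spark.datasource.flint.write.batch_size", "500")
            .getOrCreate();
        spark.stop();
    }
}
```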
@@ -37,6 +37,7 @@
import org.opensearch.client.transport.rest_client.RestClientTransport;

import java.io.IOException;
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;

import static org.opensearch.flint.core.metrics.MetricConstants.OS_READ_OP_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.OS_WRITE_OP_METRIC_PREFIX;
@@ -47,6 +48,7 @@
*/
public class RestHighLevelClientWrapper implements IRestHighLevelClient {
private final RestHighLevelClient client;
private final BulkRequestRateLimiter rateLimiter;

private final static JacksonJsonpMapper JACKSON_MAPPER = new JacksonJsonpMapper();

@@ -55,13 +57,21 @@ public class RestHighLevelClientWrapper implements IRestHighLevelClient {
*
* @param client the RestHighLevelClient instance to wrap
*/
public RestHighLevelClientWrapper(RestHighLevelClient client) {
public RestHighLevelClientWrapper(RestHighLevelClient client, BulkRequestRateLimiter rateLimiter) {
this.client = client;
this.rateLimiter = rateLimiter;
}

@Override
public BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException {
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> client.bulk(bulkRequest, options));
return execute(OS_WRITE_OP_METRIC_PREFIX, () -> {
try {
rateLimiter.acquirePermit();
Collaborator:

1. Each bulk request contains multiple index requests. If the throttle is on each index request, the bulk request limit may not help.
2. How does an OpenSearch customer configure this parameter?

Collaborator Author:

1. A rate limit on bulk requests limits the overall number of index requests.
2. They can add it as an extra Spark parameter for now. We might want to add an attribute to Datasource so we can specify it per datasource.

return client.bulk(bulkRequest, options);
} catch (InterruptedException e) {
throw new RuntimeException("rateLimiter.acquirePermit was interrupted.", e);
}
});
}

@Override
@@ -95,12 +95,15 @@ public class FlintOptions implements Serializable {

public static final String DEFAULT_BATCH_BYTES = "1mb";

public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "spark.datasource.flint.customFlintMetadataLogServiceClass";
public static final String CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS = "customFlintMetadataLogServiceClass";

public static final String SUPPORT_SHARD = "read.support_shard";

public static final String DEFAULT_SUPPORT_SHARD = "true";

public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
@@ -197,4 +200,10 @@ public boolean supportShard() {
return options.getOrDefault(SUPPORT_SHARD, DEFAULT_SUPPORT_SHARD).equalsIgnoreCase(
DEFAULT_SUPPORT_SHARD);
}

public long getBulkRequestRateLimitPerNode() {
System.out.println("####### BULK_REQUEST_RATE_LIMIT_PER_NODE" + options.get(BULK_REQUEST_RATE_LIMIT_PER_NODE));
System.out.println("############### options: " + options);
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE));
}
}
@@ -0,0 +1,30 @@
package org.opensearch.flint.core.storage;

import dev.failsafe.RateLimiter;
import java.time.Duration;
import java.util.logging.Logger;
import org.opensearch.flint.core.FlintOptions;

public class BulkRequestRateLimiter {
private static final Logger LOG = Logger.getLogger(BulkRequestRateLimiter.class.getName());
private RateLimiter<Void> rateLimiter;

public BulkRequestRateLimiter(FlintOptions flintOptions) {
long bulkRequestRateLimitPerNode = flintOptions.getBulkRequestRateLimitPerNode();
if (bulkRequestRateLimitPerNode > 0) {
LOG.info("Setting rate limit for bulk request to " + bulkRequestRateLimitPerNode + "/sec");
this.rateLimiter = RateLimiter.<Void>smoothBuilder(
flintOptions.getBulkRequestRateLimitPerNode(),
Duration.ofSeconds(1)).build();
} else {
LOG.info("Rate limit for bulk request was not set.");
}
}

// Wait so it won't exceed rate limit. Does nothing if rate limit is not set.
public void acquirePermit() throws InterruptedException {
if (rateLimiter != null) {
this.rateLimiter.acquirePermit();
}
}
}
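Since the class above delegates to Failsafe's smooth rate limiter, a small standalone sketch (assuming Failsafe 3.x on the classpath; not part of this PR) may help show the semantics: permits are spaced evenly across the period, so acquirePermit() blocks until the next slot instead of allowing bursts.

```java
import dev.failsafe.RateLimiter;
import java.time.Duration;

public class SmoothRateLimiterDemo {
    public static void main(String[] args) throws InterruptedException {
        // 2 permits per second: one permit becomes free roughly every 500 ms.
        RateLimiter<Void> limiter =
            RateLimiter.<Void>smoothBuilder(2, Duration.ofSeconds(1)).build();

        long start = System.currentTimeMillis();
        for (int i = 0; i < 4; i++) {
            limiter.acquirePermit(); // blocks until the next permit is available
            System.out.println("permit " + i + " after "
                + (System.currentTimeMillis() - start) + " ms");
        }
    }
}
```

On a typical run the four permits print at roughly 0, 500, 1000, and 1500 ms; the timing assertions in the unit tests further down rely on the same even spacing.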
@@ -0,0 +1,22 @@
package org.opensearch.flint.core.storage;

import org.opensearch.flint.core.FlintOptions;

/**
* Hold shared instance of BulkRequestRateLimiter. This class is introduced to make
* BulkRequestRateLimiter testable and share single instance.
*/
public class BulkRequestRateLimiterHolder {

private static BulkRequestRateLimiter instance;

private BulkRequestRateLimiterHolder() {}

public synchronized static BulkRequestRateLimiter getBulkRequestRateLimiter(
Collaborator:

Is the Holder necessary? Could this move into BulkRequestRateLimiter?

Collaborator Author:

As written in the class comment, it is needed to share a single instance while keeping BulkRequestRateLimiter testable. If we made BulkRequestRateLimiter itself a singleton, we could not test it with different parameters.

FlintOptions flintOptions) {
if (instance == null) {
instance = new BulkRequestRateLimiter(flintOptions);
}
return instance;
}
}
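To make the sharing contract concrete, here is a short hypothetical sketch (the class and call sites are illustrative, not from the PR): production code obtains the one shared instance through the holder, while tests construct independent instances with arbitrary options.

```java
import com.google.common.collect.ImmutableMap;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.storage.BulkRequestRateLimiter;
import org.opensearch.flint.core.storage.BulkRequestRateLimiterHolder;

public class RateLimiterWiringSketch {
    public static void main(String[] args) {
        FlintOptions options = new FlintOptions(
            ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "2"));

        // Production path: the holder caches the first instance for the JVM.
        BulkRequestRateLimiter shared =
            BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options);

        // Test path: a fresh, independent instance per configuration.
        BulkRequestRateLimiter fresh = new BulkRequestRateLimiter(options);

        System.out.println(shared != fresh); // true: only the holder is shared
    }
}
```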
@@ -33,7 +33,7 @@ public class OpenSearchClientUtils {
/**
* Metadata log index name prefix
*/
public final static String META_LOG_NAME_PREFIX = ".query_execution_request";
public final static String META_LOG_NAME_PREFIX = "query_execution_request";

/**
* Used in IT.
@@ -58,7 +58,8 @@ public static RestHighLevelClient createRestHighLevelClient(FlintOptions options
}

public static IRestHighLevelClient createClient(FlintOptions options) {
return new RestHighLevelClientWrapper(createRestHighLevelClient(options));
return new RestHighLevelClientWrapper(createRestHighLevelClient(options),
BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(options));
}

private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClientBuilder, FlintOptions options) {
@@ -0,0 +1,20 @@
package org.opensearch.flint.core.storage;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.opensearch.flint.core.FlintOptions;

class BulkRequestRateLimiterHolderTest {
FlintOptions flintOptions = new FlintOptions(ImmutableMap.of());
@Test
public void getBulkRequestRateLimiter() {
BulkRequestRateLimiter instance0 = BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(flintOptions);
BulkRequestRateLimiter instance1 = BulkRequestRateLimiterHolder.getBulkRequestRateLimiter(flintOptions);

assertNotNull(instance0);
assertEquals(instance0, instance1);
}
}
@@ -0,0 +1,46 @@
package org.opensearch.flint.core.storage;


import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.opensearch.flint.core.FlintOptions;

class BulkRequestRateLimiterTest {
FlintOptions flintOptionsWithRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "1"));
FlintOptions flintOptionsWithoutRateLimit = new FlintOptions(ImmutableMap.of(FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE, "0"));

@Test
void acquirePermitWithRateConfig() throws Exception {
BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(flintOptionsWithRateLimit);

assertTrue(timer(() -> {
limiter.acquirePermit();
limiter.acquirePermit();
}) >= 1000);
}

@Test
void acquirePermitWithoutRateConfig() throws Exception {
BulkRequestRateLimiter limiter = new BulkRequestRateLimiter(flintOptionsWithoutRateLimit);

assertTrue(timer(() -> {
limiter.acquirePermit();
limiter.acquirePermit();
}) < 100);
}

private interface Procedure {
void run() throws Exception;
}

private long timer(Procedure procedure) throws Exception {
long start = System.currentTimeMillis();
procedure.run();
long end = System.currentTimeMillis();
return end - start;
}
}
@@ -136,6 +136,12 @@ object FlintSparkConf {
.doc("max retries on failed HTTP request, 0 means retry is disabled, default is 3")
.createWithDefault(String.valueOf(FlintRetryOptions.DEFAULT_MAX_RETRIES))

val BULK_REQUEST_RATE_LIMIT_PER_NODE =
FlintConfig(s"spark.datasource.flint.${FlintOptions.BULK_REQUEST_RATE_LIMIT_PER_NODE}")
.datasourceOption()
.doc("Rate limit (requests/sec) for bulk request per worker node. Rate won't be limited by default")
.createWithDefault(FlintOptions.DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE)

val RETRYABLE_HTTP_STATUS_CODES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_STATUS_CODES}")
.datasourceOption()
@@ -187,7 +193,7 @@ object FlintSparkConf {
.doc("data source name")
.createOptional()
val CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS =
FlintConfig(FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS)
FlintConfig(s"spark.datasource.flint.${FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS}")
.datasourceOption()
.doc("custom Flint metadata log service class")
.createOptional()
@@ -275,6 +281,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
AUTH,
MAX_RETRIES,
RETRYABLE_HTTP_STATUS_CODES,
BULK_REQUEST_RATE_LIMIT_PER_NODE,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
SERVICE_NAME,
@@ -9,6 +9,7 @@ import java.util.Optional

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

@@ -63,6 +64,16 @@ class FlintSparkConfSuite extends FlintSuite {
retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
}

test("test bulkRequestRateLimitPerNode default value") {
val options = FlintSparkConf().flintOptions()
options.getBulkRequestRateLimitPerNode shouldBe 0
}

test("test specified bulkRequestRateLimitPerNode") {
val options = FlintSparkConf(Map("bulkRequestRateLimitPerNode" -> "5").asJava).flintOptions()
options.getBulkRequestRateLimitPerNode shouldBe 5
}

test("test metadata access AWS credentials provider option") {
withSparkConf("spark.metadata.accessAWSCredentialsProvider") {
spark.conf.set(
@@ -46,15 +46,16 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers {

test("should build metadata log service") {
val customOptions =
openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "org.opensearch.flint.core.TestMetadataLogService")
openSearchOptions + (FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS -> "org.opensearch.flint.core.TestMetadataLogService")
val customFlintOptions = new FlintOptions(customOptions.asJava)
val customFlintMetadataLogService =
FlintMetadataLogServiceBuilder.build(customFlintOptions, sparkConf)
customFlintMetadataLogService shouldBe a[TestMetadataLogService]
}

test("should fail to build metadata log service if class name doesn't exist") {
val options = openSearchOptions + (CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS.key -> "dummy")
val options =
openSearchOptions + (FlintOptions.CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS -> "dummy")
val flintOptions = new FlintOptions(options.asJava)
the[RuntimeException] thrownBy {
FlintMetadataLogServiceBuilder.build(flintOptions, sparkConf)