From 368b17de0ff53b85ed91739cad160dacc4010868 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 26 Nov 2024 15:24:34 -0800 Subject: [PATCH 1/2] wait after create index complete Signed-off-by: Sean Kao --- docs/index.md | 1 + .../opensearch/flint/core/FlintOptions.java | 9 +++++- .../core/storage/FlintOpenSearchClient.java | 9 ++++++ .../sql/flint/config/FlintSparkConf.scala | 8 +++++- .../flint/config/FlintSparkConfSuite.scala | 10 +++++++ .../core/FlintOpenSearchClientSuite.scala | 28 +++++++++++++++++++ 6 files changed, 63 insertions(+), 2 deletions(-) diff --git a/docs/index.md b/docs/index.md index 82c147de2..7ca058476 100644 --- a/docs/index.md +++ b/docs/index.md @@ -546,6 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '//' to isolate checkpoint data. - `spark.flint.index.checkpoint.mandatory`: default is true. - `spark.datasource.flint.socket_timeout_millis`: default value is 60000. +- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 0. - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15. - `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60. - `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 6ddc6ae9c..964ee1ac3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -88,7 +88,10 @@ public class FlintOptions implements Serializable { public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000; public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000; - + + public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis"; + public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0; + public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name"; public static final String BATCH_BYTES = "write.batch_bytes"; @@ -178,6 +181,10 @@ public int getSocketTimeoutMillis() { return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS))); } + public int getRequestCompletionDelayMillis() { + return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS))); + } + public String getDataSourceName() { return options.getOrDefault(DATA_SOURCE_NAME, ""); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 2bc097bba..5861ccf22 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -44,6 +44,7 @@ public void createIndex(String indexName, FlintMetadata metadata) { LOG.info("Creating Flint index " + indexName + " with metadata " + metadata); try { createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings()); + waitRequestComplete(); // Delay to ensure create is complete before making other requests for the index emitIndexCreationSuccessMetric(metadata.kind()); } catch (IllegalStateException ex) { emitIndexCreationFailureMetric(metadata.kind()); @@ -131,6 +132,14 @@ private String sanitizeIndexName(String indexName) { return OpenSearchClientUtils.sanitizeIndexName(indexName); } + private void waitRequestComplete() { + try { + Thread.sleep(options.getRequestCompletionDelayMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + private void emitIndexCreationSuccessMetric(String indexKind) { emitIndexCreationMetric(indexKind, "success"); } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index bdcc120c0..d5786e003 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -201,6 +201,11 @@ object FlintSparkConf { .datasourceOption() .doc("socket duration in milliseconds") .createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS)) + val REQUEST_COMPLETION_DELAY_MILLIS = + FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}") + .datasourceOption() + .doc("delay in milliseconds after index creation is completed") + .createWithDefault(String.valueOf(FlintOptions.DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS)) val DATA_SOURCE_NAME = FlintConfig(s"spark.flint.datasource.name") .doc("data source name") @@ -342,7 +347,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable SOCKET_TIMEOUT_MILLIS, JOB_TYPE, REPL_INACTIVITY_TIMEOUT_MILLIS, - BATCH_BYTES) + BATCH_BYTES, + REQUEST_COMPLETION_DELAY_MILLIS) .map(conf => (conf.optionKey, conf.readFrom(reader))) .toMap diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 0cde6ab0f..e0285cc18 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -114,6 +114,16 @@ class FlintSparkConfSuite extends FlintSuite { } } + test("test request completionDelayMillis default value") { + FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0 + } + + test("test specified request completionDelayMillis") { + val options = + FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions() + options.getRequestCompletionDelayMillis shouldBe 1000 + } + test("externalSchedulerIntervalThreshold should return default value when empty") { val options = FlintSparkConf(Map("spark.flint.job.externalScheduler.interval" -> "").asJava) assert(options diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala index a2c2d26f6..fe3cefef8 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala @@ -65,6 +65,27 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M (settings \ "index.number_of_replicas").extract[String] shouldBe "2" } + it should "create index with request completion delay config" in { + val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}") + // Create a dummy index to avoid timing the initial overhead + flintClient.createIndex("dummy", metadata) + + val indexName = "flint_test_without_request_completion_delay" + val elapsedTimeWithoutDelay = timer { + flintClient.createIndex(indexName, metadata) + } + + val delayIndexName = "flint_test_with_request_completion_delay" + val delayOptions = + openSearchOptions + (FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS -> "2000") + val delayFlintOptions = new FlintOptions(delayOptions.asJava) + val delayFlintClient = new FlintOpenSearchClient(delayFlintOptions) + val elapsedTimeWithDelay = timer { + delayFlintClient.createIndex(delayIndexName, metadata) + } + elapsedTimeWithDelay - elapsedTimeWithoutDelay should be >= 1800L // allowing 200ms of wiggle room + } + it should "get all index names with the given index name pattern" in { val metadata = FlintOpenSearchIndexMetadataService.deserialize( """{"properties": {"test": { "type": "integer" } } }""") @@ -220,4 +241,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M def createTable(indexName: String, options: FlintOptions): Table = { OpenSearchCluster.apply(indexName, options).asScala.head } + + def timer(block: => Unit): Long = { + val start = System.currentTimeMillis() + block + val end = System.currentTimeMillis() + end - start + } } From df706c2f789bcb02a4ffd0bcab8307fc472b132f Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 26 Nov 2024 17:39:58 -0800 Subject: [PATCH 2/2] add default wait time for aoss Signed-off-by: Sean Kao --- docs/index.md | 2 +- .../scala/org/opensearch/flint/core/FlintOptions.java | 6 +++++- .../apache/spark/sql/flint/config/FlintSparkConf.scala | 8 ++++---- .../spark/sql/flint/config/FlintSparkConfSuite.scala | 5 +++++ 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7ca058476..8e97001d5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -546,7 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '//' to isolate checkpoint data. - `spark.flint.index.checkpoint.mandatory`: default is true. - `spark.datasource.flint.socket_timeout_millis`: default value is 60000. -- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 0. +- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after request is complete. Applied after index creation. Default value is 2000 if using aoss service, otherwise 0. - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15. - `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60. - `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 964ee1ac3..f9d181b70 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -91,6 +91,7 @@ public class FlintOptions implements Serializable { public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis"; public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0; + public static final int DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS = 2000; public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name"; @@ -182,7 +183,10 @@ public int getSocketTimeoutMillis() { } public int getRequestCompletionDelayMillis() { - return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS))); + int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName()) + ? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS + : DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS; + return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(defaultValue))); } public String getDataSourceName() { diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index d5786e003..364a8a1de 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -205,7 +205,7 @@ object FlintSparkConf { FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}") .datasourceOption() .doc("delay in milliseconds after index creation is completed") - .createWithDefault(String.valueOf(FlintOptions.DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS)) + .createOptional() val DATA_SOURCE_NAME = FlintConfig(s"spark.flint.datasource.name") .doc("data source name") @@ -347,8 +347,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable SOCKET_TIMEOUT_MILLIS, JOB_TYPE, REPL_INACTIVITY_TIMEOUT_MILLIS, - BATCH_BYTES, - REQUEST_COMPLETION_DELAY_MILLIS) + BATCH_BYTES) .map(conf => (conf.optionKey, conf.readFrom(reader))) .toMap @@ -362,7 +361,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable REQUEST_INDEX, METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER, EXCLUDE_JOB_IDS, - SCROLL_SIZE) + SCROLL_SIZE, + REQUEST_COMPLETION_DELAY_MILLIS) .map(conf => (conf.optionKey, conf.readFrom(reader))) .flatMap { case (_, None) => None diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index e0285cc18..594322bae 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -118,6 +118,11 @@ class FlintSparkConfSuite extends FlintSuite { FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0 } + test("test request completionDelayMillis default value for aoss") { + val options = FlintSparkConf(Map("auth.servicename" -> "aoss").asJava).flintOptions() + options.getRequestCompletionDelayMillis shouldBe 2000 + } + test("test specified request completionDelayMillis") { val options = FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions()