From cbbaee3908f1d36d1d9506afa16997473d2f4df3 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Thu, 2 May 2024 16:21:42 -0700
Subject: [PATCH] address comments

Signed-off-by: Peng Huo
---
 docs/index.md                                          |  2 +-
 .../org/opensearch/flint/core/FlintOptions.java        | 12 ++++++++++++
 .../flint/core/storage/FlintOpenSearchClient.java      |  6 ++++--
 .../flint/core/storage/OpenSearchWriter.java           |  5 +++--
 .../spark/sql/flint/config/FlintSparkConf.scala        |  9 +++++----
 .../spark/sql/flint/config/FlintSparkConfSuite.scala   | 10 ++++++----
 6 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 5225f509e..ace390f7a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -507,7 +507,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.write.id_name`: no default value.
 - `spark.datasource.flint.ignore.id_column` : default value is true.
 - `spark.datasource.flint.write.batch_size`: "The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
-- `spark.datasource.flint.write.batch_bytes`: The approximately amount of data in bytes written to Flint in a single batch request. The actual data write to OpenSearch may more than it. Default value is 4mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
+- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount of data written to OpenSearch may exceed it. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
 - `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
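For reviewers who want to try the new default, a minimal sketch (not part of this patch) of setting the documented write options; it assumes a running SparkSession named `spark`, and the values are arbitrary examples:

```scala
// Exercising the documented Flint write options through the Spark session.
spark.conf.set("spark.datasource.flint.write.batch_size", "1000")
spark.conf.set("spark.datasource.flint.write.batch_bytes", "1mb") // new default
spark.conf.set("spark.datasource.flint.write.refresh_policy", "wait_for")
```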
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
index a0fde6212..0cf643791 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -7,6 +7,8 @@
 
 import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.spark.network.util.ByteUnit;
 import org.opensearch.flint.core.http.FlintRetryOptions;
 
 /**
@@ -84,6 +86,10 @@ public class FlintOptions implements Serializable {
 
   public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";
 
+  public static final String BATCH_BYTES = "write.batch_bytes";
+
+  public static final String DEFAULT_BATCH_BYTES = "1mb";
+
   public FlintOptions(Map<String, String> options) {
     this.options = options;
     this.retryOptions = new FlintRetryOptions(options);
@@ -150,4 +156,10 @@ public String getDataSourceName() {
   public String getSystemIndexName() {
     return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
   }
+
+  public int getBatchBytes() {
+    // we do not expect this value to be larger than 10mb = 10 * 1024 * 1024
+    return (int) org.apache.spark.network.util.JavaUtils
+        .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
+  }
 }

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index e71e3ded5..c1b884241 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
   }
 
   public FlintWriter createWriter(String indexName) {
-    LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
+    LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
+        "batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
+        options.getRefreshPolicy(), options.getBatchBytes());
   }
 
   @Override

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
index 4693ebc63..d0875d492 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
@@ -33,11 +33,12 @@ public class OpenSearchWriter extends FlintWriter {
 
   private IRestHighLevelClient client;
 
-  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
+  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
+      int bufferSizeInBytes) {
     this.client = client;
     this.indexName = indexName;
     this.refreshPolicy = refreshPolicy;
-    this.baos = new ByteArrayOutputStream();
+    this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
   }
 
   @Override public void write(char[] cbuf, int off, int len) {
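For context, `getBatchBytes` delegates the size-string parsing to Spark's `JavaUtils.byteStringAs`, so values like "1mb" or plain byte counts are both accepted. A small sketch of the conversion it performs:

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

// "1mb" parses to 1048576 bytes, matching FlintOptions.DEFAULT_BATCH_BYTES.
val defaultBatchBytes: Long = JavaUtils.byteStringAs("1mb", ByteUnit.BYTE)
assert(defaultBatchBytes == 1024L * 1024L)
```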
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index fafac9240..7a370dd8d 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -98,12 +98,12 @@ object FlintSparkConf {
         "documents will vary depending on the individual size of each document.")
     .createWithDefault(Integer.MAX_VALUE.toString)
 
-  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.write.batch_bytes")
+  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
    .datasourceOption()
     .doc(
       "The approximately amount of data in bytes written to Flint in a single batch request. " +
-        "The actual data write to OpenSearch may more than it. Default value is 4mb")
-    .createWithDefault("4mb")
+        s"The actual amount of data written to OpenSearch may exceed it. Default value is 1mb")
+    .createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)
 
   val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
     .datasourceOption()
@@ -248,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
       PASSWORD,
       SOCKET_TIMEOUT_MILLIS,
       JOB_TYPE,
-      REPL_INACTIVITY_TIMEOUT_MILLIS)
+      REPL_INACTIVITY_TIMEOUT_MILLIS,
+      BATCH_BYTES)
       .map(conf => (conf.optionKey, conf.readFrom(reader)))
       .toMap
 

diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
index 62e50f7df..d4b1d89e7 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
@@ -75,11 +75,13 @@ class FlintSparkConfSuite extends FlintSuite {
   }
 
   test("test batch bytes options") {
-    val defaultConf = FlintSparkConf(Map("write.batch_size" -> "10").asJava)
-    defaultConf.batchBytes() shouldBe 4 * 1024 * 1024
+    val defaultConf = FlintSparkConf(Map[String, String]().asJava)
+    defaultConf.batchBytes() shouldBe 1024 * 1024
+    defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024
 
-    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "1mb").asJava)
-    overrideConf.batchBytes() shouldBe 1024 * 1024
+    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
+    overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
+    overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
   }
 
   /**
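For reference on how batch_bytes interacts with batch_size at write time, a hedged sketch of the flush rule described in docs/index.md; `shouldFlush` is a hypothetical helper for illustration, not code from this patch:

```scala
// Hypothetical helper illustrating the documented flush rule: after each
// document, flush when the document count reaches batch_size or the buffered
// bytes surpass batch_bytes; the caller then resets the document count.
def shouldFlush(docCount: Int, bufferedBytes: Int, batchSize: Int, batchBytes: Int): Boolean =
  docCount >= batchSize || bufferedBytes >= batchBytes
```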