diff --git a/docs/index.md b/docs/index.md
index b1bf5478d..ace390f7a 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -506,9 +506,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
 - `spark.datasource.flint.write.id_name`: no default value.
 - `spark.datasource.flint.ignore.id_column` : default value is true.
-- `spark.datasource.flint.write.batch_size`: default value is 1000.
-- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
-  (false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
+- `spark.datasource.flint.write.batch_size`: the number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
+- `spark.datasource.flint.write.batch_bytes`: the approximate amount of data in bytes written to Flint in a single batch request. The actual data written to OpenSearch may exceed this value. Default value is 1mb. After each document, the writer checks whether the document count (docCount) has reached batch_size or the buffer size has surpassed batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero.
+- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
 - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
index 9858ffd1e..0cf643791 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -7,6 +7,8 @@
 
 import java.io.Serializable;
 import java.util.Map;
+
+import org.apache.spark.network.util.ByteUnit;
 import org.opensearch.flint.core.http.FlintRetryOptions;
 
 /**
@@ -74,7 +76,7 @@ public class FlintOptions implements Serializable {
    *
    * WAIT_UNTIL("wait_for")
    */
-  public static final String DEFAULT_REFRESH_POLICY = "wait_for";
+  public static final String DEFAULT_REFRESH_POLICY = "false";
 
   public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";
 
@@ -84,6 +86,10 @@ public class FlintOptions implements Serializable {
 
   public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";
 
+  public static final String BATCH_BYTES = "write.batch_bytes";
+
+  public static final String DEFAULT_BATCH_BYTES = "1mb";
+
   public FlintOptions(Map<String, String> options) {
     this.options = options;
     this.retryOptions = new FlintRetryOptions(options);
@@ -150,4 +156,10 @@ public String getDataSourceName() {
   public String getSystemIndexName() {
     return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
   }
+
+  public int getBatchBytes() {
+    // we do not expect this value to be larger than 10mb = 10 * 1024 * 1024
+    return (int) org.apache.spark.network.util.JavaUtils
+        .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
+  }
 }
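Note (illustrative, not part of the patch): `batch_bytes` accepts Spark-style size strings, parsed by `org.apache.spark.network.util.JavaUtils.byteStringAs` exactly as `getBatchBytes` does above. A minimal sketch, with `resolveBatchBytes` as a hypothetical helper:

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

object BatchBytesParsing {
  // Hypothetical helper mirroring FlintOptions.getBatchBytes: parse a Spark-style
  // size string into a byte count, falling back to the 1mb default.
  def resolveBatchBytes(configured: Option[String]): Long =
    JavaUtils.byteStringAs(configured.getOrElse("1mb"), ByteUnit.BYTE)

  def main(args: Array[String]): Unit = {
    println(resolveBatchBytes(None))         // 1048576 (the 1mb default)
    println(resolveBatchBytes(Some("4mb")))  // 4194304
    println(resolveBatchBytes(Some("512")))  // plain numbers are read as bytes: 512
  }
}
```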
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index e71e3ded5..c1b884241 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
   }
 
   public FlintWriter createWriter(String indexName) {
-    LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
+    LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
+        "batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
+        options.getRefreshPolicy(), options.getBatchBytes());
   }
 
   @Override
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintWriter.java
index 84edd5f60..4bddcbad9 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintWriter.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintWriter.java
@@ -19,4 +19,9 @@ public abstract class FlintWriter extends Writer {
    * { "title": "Prisoners", "year": 2013 }
    */
   public static final String ACTION_CREATE = "create";
+
+  /**
+   * @return current data written into buffer in bytes.
+   */
+  public abstract long getBufferSize();
 }
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
index c1a4d9dc7..d0875d492 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchWriter.java
@@ -14,7 +14,9 @@
 import org.opensearch.flint.core.IRestHighLevelClient;
 import org.opensearch.rest.RestStatus;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 /**
@@ -27,19 +29,21 @@ public class OpenSearchWriter extends FlintWriter {
   private final String refreshPolicy;
 
-  private StringBuilder sb;
+  private final ByteArrayOutputStream baos;
 
   private IRestHighLevelClient client;
 
-  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
+  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
+      int bufferSizeInBytes) {
     this.client = client;
     this.indexName = indexName;
-    this.sb = new StringBuilder();
     this.refreshPolicy = refreshPolicy;
+    this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
   }
 
   @Override
   public void write(char[] cbuf, int off, int len) {
-    sb.append(cbuf, off, len);
+    byte[] bytes = new String(cbuf, off, len).getBytes(StandardCharsets.UTF_8);
+    baos.write(bytes, 0, bytes.length);
   }
 
   /**
@@ -48,8 +52,8 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
    */
   @Override
   public void flush() {
     try {
-      if (sb.length() > 0) {
-        byte[] bytes = sb.toString().getBytes();
+      if (baos.size() > 0) {
+        byte[] bytes = baos.toByteArray();
         BulkResponse response =
             client.bulk(
@@ -63,7 +67,7 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
     } catch (IOException e) {
       throw new RuntimeException(String.format("Failed to execute bulk request on index: %s", indexName), e);
     } finally {
-      sb.setLength(0);
+      baos.reset();
     }
   }
 
@@ -78,6 +82,10 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
     }
   }
 
+  public long getBufferSize() {
+    return baos.size();
+  }
+
   private boolean isCreateConflict(BulkItemResponse itemResp) {
     return itemResp.getOpType() == DocWriteRequest.OpType.CREATE && (itemResp.getFailure() == null || itemResp.getFailure()
         .getStatus() == RestStatus.CONFLICT);
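One note on the `OpenSearchWriter` change above: buffering through a `ByteArrayOutputStream` means the flush threshold is compared against UTF-8 bytes rather than characters, which matters once documents contain multi-byte characters. A minimal, self-contained sketch of that distinction (the sample document is illustrative only):

```scala
import java.nio.charset.StandardCharsets

// Illustrative only: buffer size is measured in UTF-8 bytes, not characters,
// because multi-byte characters make the bulk payload larger than its char count.
object BufferSizeInBytes {
  def main(args: Array[String]): Unit = {
    val doc = """{"title": "Amélie", "year": 2001}"""
    val utf8Bytes = doc.getBytes(StandardCharsets.UTF_8).length
    println(s"chars=${doc.length}, utf8Bytes=$utf8Bytes") // chars=33, utf8Bytes=34
  }
}
```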
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionWriter.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionWriter.scala
index 57db4d1e4..c3bd4c745 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionWriter.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/FlintPartitionWriter.scala
@@ -60,7 +60,7 @@ case class FlintPartitionWriter(
     gen.writeLineEnding()
 
     docCount += 1
-    if (docCount >= options.batchSize()) {
+    if (docCount >= options.batchSize() || gen.getBufferSize >= options.batchBytes()) {
       gen.flush()
       docCount = 0
     }
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 9a8623e35..7a370dd8d 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -14,6 +14,7 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.http.FlintRetryOptions
 
 import org.apache.spark.internal.config.ConfigReader
+import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.internal.SQLConf
 
@@ -95,7 +96,14 @@ object FlintSparkConf {
       "The number of documents written to Flint in a single batch request is determined by the " +
         "overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
         "documents will vary depending on the individual size of each document.")
-    .createWithDefault("1000")
+    .createWithDefault(Integer.MAX_VALUE.toString)
+
+  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
+    .datasourceOption()
+    .doc(
+      "The approximate amount of data in bytes written to Flint in a single batch request. " +
+        s"The actual data written to OpenSearch may exceed this value. Default value is 1mb.")
+    .createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)
 
   val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
     .datasourceOption()
@@ -194,6 +202,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def batchSize(): Int = BATCH_SIZE.readFrom(reader).toInt
 
+  def batchBytes(): Long = org.apache.spark.network.util.JavaUtils
+    .byteStringAs(BATCH_BYTES.readFrom(reader), ByteUnit.BYTE)
+
   def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)
 
   def ignoreIdColumn(): Boolean = IGNORE_DOC_ID_COLUMN.readFrom(reader).toBoolean
@@ -237,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
       PASSWORD,
       SOCKET_TIMEOUT_MILLIS,
       JOB_TYPE,
-      REPL_INACTIVITY_TIMEOUT_MILLIS)
+      REPL_INACTIVITY_TIMEOUT_MILLIS,
+      BATCH_BYTES)
       .map(conf => (conf.optionKey, conf.readFrom(reader)))
       .toMap
 
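For context, a hedged usage sketch of the new write knobs using the Spark configuration keys listed in docs/index.md above (connection options such as host/port/auth are omitted; the index name and values are illustrative, not defaults):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: batches flush when either batch_size documents or roughly
// batch_bytes of serialized data have accumulated, whichever comes first.
val spark = SparkSession.builder()
  .appName("flint-write-tuning")
  .config("spark.datasource.flint.write.batch_size", "1000")          // cap documents per bulk request
  .config("spark.datasource.flint.write.batch_bytes", "4mb")          // cap approximate bulk payload size
  .config("spark.datasource.flint.write.refresh_policy", "wait_for")  // or "false" (default) / "true"
  .getOrCreate()

spark.range(100).toDF("aInt")
  .write
  .format("flint")
  .mode("overwrite")
  .save("sample_flint_index")
```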
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
index f9e0dc2c9..f99015735 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/json/FlintJacksonGenerator.scala
@@ -5,10 +5,9 @@
 
 package org.apache.spark.sql.flint.json
 
-import java.io.Writer
-
 import com.fasterxml.jackson.core.JsonFactory
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
+import org.opensearch.flint.core.storage.FlintWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -24,7 +23,7 @@ import org.apache.spark.sql.types._
  */
 case class FlintJacksonGenerator(
     dataType: DataType,
-    writer: Writer,
+    writer: FlintWriter,
     options: JSONOptions,
     ignoredFieldName: Option[String] = None) {
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
@@ -314,4 +313,8 @@ case class FlintJacksonGenerator(
       })
     })
   }
+
+  def getBufferSize: Long = {
+    writer.getBufferSize;
+  }
 }
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
index 3d643dde3..d4b1d89e7 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
@@ -9,7 +9,6 @@ import java.util.Optional
 
 import scala.collection.JavaConverters._
 
-import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.http.FlintRetryOptions._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
@@ -27,7 +26,7 @@ class FlintSparkConfSuite extends FlintSuite {
 
       // default value
       assert(flintOptions.getPort == 9200)
-      assert(flintOptions.getRefreshPolicy == "wait_for")
+      assert(flintOptions.getRefreshPolicy == "false")
     }
   }
 
@@ -75,6 +74,16 @@ class FlintSparkConfSuite extends FlintSuite {
     }
   }
 
+  test("test batch bytes options") {
+    val defaultConf = FlintSparkConf(Map[String, String]().asJava)
+    defaultConf.batchBytes() shouldBe 1024 * 1024
+    defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024
+
+    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
+    overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
+    overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
+  }
+
   /**
    * Delete index `indexNames` after calling `f`.
    */
diff --git a/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala b/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
index 3fbe96d5c..0bccf787b 100644
--- a/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/FlintDataSourceV2ITSuite.scala
@@ -216,7 +216,7 @@ class FlintDataSourceV2ITSuite
         df.coalesce(1)
           .write
           .format("flint")
-          .options(options + ("spark.flint.write.batch.size" -> s"$batchSize"))
+          .options(options + ("spark.flint.write.batch_size" -> s"$batchSize"))
          .mode("overwrite")
          .save(indexName)
 
@@ -499,6 +499,39 @@ class FlintDataSourceV2ITSuite
     }
   }
 
+  test("write dataframe to flint with batch bytes configuration") {
+    val indexName = "tbatchbytes"
+    val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
+    Seq("0b", "10b", "1mb").foreach(batchBytes => {
+      withIndexName(indexName) {
+        val mappings =
+          """{
+            |  "properties": {
+            |    "aInt": {
+            |      "type": "integer"
+            |    }
+            |  }
+            |}""".stripMargin
+        index(indexName, oneNodeSetting, mappings, Seq.empty)
+
+        val df = spark.range(15).toDF("aInt")
+        df.coalesce(1)
+          .write
+          .format("flint")
+          .options(options + ("spark.flint.write.batch_bytes" -> s"$batchBytes"))
+          .mode("overwrite")
+          .save(indexName)
+
+        checkAnswer(
+          spark.sqlContext.read
+            .format("flint")
+            .options(openSearchOptions)
+            .load(indexName),
+          df)
+      }
+    })
+  }
+
   /**
    * Copy from SPARK JDBCV2Suite.
    */
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index 6ee7cc68e..f582f9f45 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -60,6 +60,8 @@ object FlintJob extends Logging with FlintJobExecutor {
      * Without this setup, Spark would not recognize names in the format `my_glue1.default`.
      */
     conf.set("spark.sql.defaultCatalog", dataSource)
+    configDYNMaxExecutors(conf, jobType)
+
     val streamingRunningCount = new AtomicInteger(0)
     val jobOperator =
       JobOperator(
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index c1d2bf79b..1e5df21e1 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -17,6 +17,7 @@ import play.api.libs.json._
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util._
 
@@ -85,6 +86,19 @@ trait FlintJobExecutor {
       "org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions")
   }
 
+  /*
+   * Override dynamicAllocation.maxExecutors with the streaming maxExecutors. More detail at
+   * https://github.com/opensearch-project/opensearch-spark/issues/324
+   */
+  def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
+    if (jobType.equalsIgnoreCase("streaming")) {
+      conf.set(
+        "spark.dynamicAllocation.maxExecutors",
+        conf
+          .get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10"))
+    }
+  }
+
   def createSparkSession(conf: SparkConf): SparkSession = {
     val builder = SparkSession.builder().config(conf)
     if (enableHiveSupport) {
@@ -97,6 +111,7 @@ trait FlintJobExecutor {
     try {
       resultData.write
         .format("flint")
+        .option(REFRESH_POLICY.optionKey, "wait_for")
         .mode("append")
         .save(resultIndex)
       IRestHighLevelClient.recordOperationSuccess(
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 1016adaba..b96163693 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -91,6 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
 
     if (jobType.equalsIgnoreCase("streaming")) {
       logInfo(s"""streaming query ${query}""")
+      configDYNMaxExecutors(conf, jobType)
       val streamingRunningCount = new AtomicInteger(0)
       val jobOperator =
         JobOperator(
diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
index 352d140ce..aceb9468f 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
@@ -97,4 +97,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
       |""".stripMargin
     assert(FlintJob.isSuperset(input, mapping))
   }
+
+  test("default streaming query maxExecutors is 10") {
+    val conf = spark.sparkContext.conf
+    FlintJob.configDYNMaxExecutors(conf, "streaming")
+    conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "10"
+  }
+
+  test("override streaming query maxExecutors") {
+    spark.sparkContext.conf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
+    FlintJob.configDYNMaxExecutors(spark.sparkContext.conf, "streaming")
+    spark.sparkContext.conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "30"
+  }
+
 }
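A minimal, self-contained sketch of the executor-cap behavior introduced above; `applyStreamingMaxExecutors` is an illustrative stand-in for `configDYNMaxExecutors`, and the numbers mirror FlintJobTest:

```scala
import org.apache.spark.SparkConf

object StreamingMaxExecutorsSketch {
  // Illustrative mirror of configDYNMaxExecutors: only streaming jobs get
  // spark.dynamicAllocation.maxExecutors overridden, defaulting to "10".
  def applyStreamingMaxExecutors(conf: SparkConf, jobType: String): Unit = {
    if (jobType.equalsIgnoreCase("streaming")) {
      conf.set(
        "spark.dynamicAllocation.maxExecutors",
        conf.get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10"))
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    applyStreamingMaxExecutors(conf, "batch")
    assert(!conf.contains("spark.dynamicAllocation.maxExecutors")) // non-streaming jobs untouched

    applyStreamingMaxExecutors(conf, "streaming")
    assert(conf.get("spark.dynamicAllocation.maxExecutors") == "10")

    conf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
    applyStreamingMaxExecutors(conf, "streaming")
    assert(conf.get("spark.dynamicAllocation.maxExecutors") == "30")
  }
}
```

In a real deployment the same override can be driven by setting `spark.flint.streaming.dynamicAllocation.maxExecutors` on the job's Spark configuration (for example via `--conf`).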