add batch_bytes configuration for Flint #329

Merged 5 commits on May 3, 2024
6 changes: 3 additions & 3 deletions docs/index.md
@@ -506,9 +506,9 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema info
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: default value is 1000.
- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed it. Default value is 1mb. The writing process checks after each document whether the document count (docCount) has reached batch_size or the buffer size has surpassed batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: default value is false. valid values [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
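As a worked example of the options above, here is a minimal sketch (not part of the PR) of a DataFrame write that sets the batch and refresh options together. The index name, schema, and threshold values are illustrative, and it assumes the Flint data source and a reachable OpenSearch endpoint are configured.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: assumes the Flint data source is on the classpath and that
// the usual spark.datasource.flint.* connection options point at OpenSearch.
val spark = SparkSession.builder().appName("flint-batch-demo").getOrCreate()
val df = spark.range(10000).toDF("aInt")

df.write
  .format("flint")
  // Flush a bulk request after 500 documents or ~4 MB of buffered data, whichever comes first.
  .option("spark.datasource.flint.write.batch_size", "500")
  .option("spark.datasource.flint.write.batch_bytes", "4mb")
  // Override the new default ("false") to wait for an index refresh before returning.
  .option("spark.datasource.flint.write.refresh_policy", "wait_for")
  .mode("overwrite")
  .save("my_flint_index")
```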
@@ -7,6 +7,8 @@

import java.io.Serializable;
import java.util.Map;

import org.apache.spark.network.util.ByteUnit;
import org.opensearch.flint.core.http.FlintRetryOptions;

/**
@@ -74,7 +76,7 @@ public class FlintOptions implements Serializable {
*
* WAIT_UNTIL("wait_for")
*/
public static final String DEFAULT_REFRESH_POLICY = "wait_for";
public static final String DEFAULT_REFRESH_POLICY = "false";

public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";

@@ -84,6 +86,10 @@ public class FlintOptions implements Serializable {

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public static final String BATCH_BYTES = "write.batch_bytes";

public static final String DEFAULT_BATCH_BYTES = "1mb";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
@@ -150,4 +156,10 @@ public String getDataSourceName() {
public String getSystemIndexName() {
return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
}

public int getBatchBytes() {
// this value is not expected to exceed 10mb = 10 * 1024 * 1024, so the cast to int is safe
return (int) org.apache.spark.network.util.JavaUtils
.byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
}
}
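The getBatchBytes accessor above relies on Spark's JavaUtils size-string parser, so batch_bytes accepts the same suffixes as other Spark byte configurations. A small standalone sketch (not part of the PR) of how those strings resolve to byte counts:

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

object BatchBytesParsing extends App {
  // Plain numbers are interpreted in the given unit (bytes here);
  // suffixes such as "k"/"kb", "m"/"mb", "g"/"gb" scale the value.
  println(JavaUtils.byteStringAs("10b", ByteUnit.BYTE)) // 10
  println(JavaUtils.byteStringAs("1mb", ByteUnit.BYTE)) // 1048576, the default batch_bytes
  println(JavaUtils.byteStringAs("4mb", ByteUnit.BYTE)) // 4194304, as used in FlintSparkConfSuite
}
```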
@@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
}

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
"batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
options.getRefreshPolicy(), options.getBatchBytes());
}

@Override
@@ -19,4 +19,9 @@ public abstract class FlintWriter extends Writer {
* { "title": "Prisoners", "year": 2013 }
*/
public static final String ACTION_CREATE = "create";

/**
* @return the amount of data currently written into the buffer, in bytes.
*/
public abstract long getBufferSize();
}
@@ -14,7 +14,9 @@
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.rest.RestStatus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
@@ -27,19 +29,21 @@ public class OpenSearchWriter extends FlintWriter {

private final String refreshPolicy;

private StringBuilder sb;
private final ByteArrayOutputStream baos;

private IRestHighLevelClient client;

public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
int bufferSizeInBytes) {
this.client = client;
this.indexName = indexName;
this.sb = new StringBuilder();
this.refreshPolicy = refreshPolicy;
this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
}

@Override public void write(char[] cbuf, int off, int len) {
sb.append(cbuf, off, len);
byte[] bytes = new String(cbuf, off, len).getBytes(StandardCharsets.UTF_8);
baos.write(bytes, 0, bytes.length);
}

/**
@@ -48,8 +52,8 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
*/
@Override public void flush() {
try {
if (sb.length() > 0) {
byte[] bytes = sb.toString().getBytes();
if (baos.size() > 0) {
byte[] bytes = baos.toByteArray();
BulkResponse
response =
client.bulk(
@@ -63,7 +67,7 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to execute bulk request on index: %s", indexName), e);
} finally {
sb.setLength(0);
baos.reset();
}
}

@@ -78,6 +82,10 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
}
}

public long getBufferSize() {
return baos.size();
}

private boolean isCreateConflict(BulkItemResponse itemResp) {
return itemResp.getOpType() == DocWriteRequest.OpType.CREATE && (itemResp.getFailure() == null || itemResp.getFailure()
.getStatus() == RestStatus.CONFLICT);
@@ -60,7 +60,7 @@ case class FlintPartitionWriter(
gen.writeLineEnding()

docCount += 1
if (docCount >= options.batchSize()) {
if (docCount >= options.batchSize() || gen.getBufferSize >= options.batchBytes()) {
gen.flush()
docCount = 0
}
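The flush condition above is the core of the change: a batch is sent as soon as either the document count or the buffered byte size crosses its threshold. A simplified, standalone Scala sketch of the same pattern (not the project's classes; names are illustrative):

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Illustrative buffer mirroring FlintPartitionWriter's dual-threshold flush:
// flush when docCount reaches batchSize or the buffered bytes reach batchBytes.
class BatchingBuffer(batchSize: Int, batchBytes: Long)(sendBulk: Array[Byte] => Unit) {
  private val buffer = new ByteArrayOutputStream()
  private var docCount = 0

  def add(doc: String): Unit = {
    buffer.write(doc.getBytes(StandardCharsets.UTF_8))
    docCount += 1
    if (docCount >= batchSize || buffer.size() >= batchBytes) flush()
  }

  def flush(): Unit = {
    if (buffer.size() > 0) sendBulk(buffer.toByteArray)
    buffer.reset()
    docCount = 0
  }
}
```

For example, `new BatchingBuffer(1000, 1024 * 1024)(bytes => ...)` flushes on whichever limit is hit first, which is the behavior the new batch_bytes option adds on top of batch_size.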
@@ -14,6 +14,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -95,7 +96,14 @@ object FlintSparkConf {
"The number of documents written to Flint in a single batch request is determined by the " +
"overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
"documents will vary depending on the individual size of each document.")
.createWithDefault("1000")
.createWithDefault(Integer.MAX_VALUE.toString)

val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
.datasourceOption()
.doc(
"The approximately amount of data in bytes written to Flint in a single batch request. " +
s"The actual data write to OpenSearch may more than it. Default value is 1mb")
.createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)

val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
@@ -194,6 +202,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def batchSize(): Int = BATCH_SIZE.readFrom(reader).toInt

def batchBytes(): Long = org.apache.spark.network.util.JavaUtils
.byteStringAs(BATCH_BYTES.readFrom(reader), ByteUnit.BYTE)

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

def ignoreIdColumn(): Boolean = IGNORE_DOC_ID_COLUMN.readFrom(reader).toBoolean
@@ -237,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS)
REPL_INACTIVITY_TIMEOUT_MILLIS,
BATCH_BYTES)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

@@ -5,10 +5,9 @@

package org.apache.spark.sql.flint.json

import java.io.Writer

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import org.opensearch.flint.core.storage.FlintWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
@@ -24,7 +23,7 @@ import org.apache.spark.sql.types._
*/
case class FlintJacksonGenerator(
dataType: DataType,
writer: Writer,
writer: FlintWriter,
options: JSONOptions,
ignoredFieldName: Option[String] = None) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
@@ -314,4 +313,8 @@ case class FlintJacksonGenerator(
})
})
}

def getBufferSize: Long = {
writer.getBufferSize
}
}
@@ -9,7 +9,6 @@ import java.util.Optional

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

@@ -27,7 +26,7 @@ class FlintSparkConfSuite extends FlintSuite {

// default value
assert(flintOptions.getPort == 9200)
assert(flintOptions.getRefreshPolicy == "wait_for")
assert(flintOptions.getRefreshPolicy == "false")
}
}

@@ -75,6 +74,16 @@ class FlintSparkConfSuite extends FlintSuite {
}
}

test("test batch bytes options") {
val defaultConf = FlintSparkConf(Map[String, String]().asJava)
defaultConf.batchBytes() shouldBe 1024 * 1024
defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024

val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
}

/**
* Delete index `indexNames` after calling `f`.
*/
@@ -216,7 +216,7 @@ class FlintDataSourceV2ITSuite
df.coalesce(1)
.write
.format("flint")
.options(options + ("spark.flint.write.batch.size" -> s"$batchSize"))
.options(options + ("spark.flint.write.batch_size" -> s"$batchSize"))
.mode("overwrite")
.save(indexName)

Expand Down Expand Up @@ -499,6 +499,39 @@ class FlintDataSourceV2ITSuite
}
}

test("write dataframe to flint with batch bytes configuration") {
val indexName = "tbatchbytes"
val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
Seq("0b", "10b", "1mb").foreach(batchBytes => {
withIndexName(indexName) {
val mappings =
"""{
| "properties": {
| "aInt": {
| "type": "integer"
| }
| }
|}""".stripMargin
index(indexName, oneNodeSetting, mappings, Seq.empty)

val df = spark.range(15).toDF("aInt")
df.coalesce(1)
.write
.format("flint")
.options(options + ("spark.flint.write.batch_bytes" -> s"$batchBytes"))
.mode("overwrite")
.save(indexName)

checkAnswer(
spark.sqlContext.read
.format("flint")
.options(openSearchOptions)
.load(indexName),
df)
}
})
}

/**
* Copy from SPARK JDBCV2Suite.
*/
@@ -17,6 +17,7 @@ import play.api.libs.json._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.types._
import org.apache.spark.sql.util._

@@ -97,6 +98,7 @@ trait FlintJobExecutor {
try {
resultData.write
.format("flint")
.option(REFRESH_POLICY.optionKey, "wait_for")
.mode("append")
.save(resultIndex)
IRestHighLevelClient.recordOperationSuccess(