
Commit

address comments
Signed-off-by: Peng Huo <[email protected]>
penghuo committed May 2, 2024
1 parent 5861017 commit cbbaee3
Showing 6 changed files with 31 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/index.md
@@ -507,7 +507,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
-- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed this. Default value is 4mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
+- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero (a sketch of this rule follows the list).
- `spark.datasource.flint.write.refresh_policy`: default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes; scroll context keep-alive duration.
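The batch_size / batch_bytes flush rule described above can be summarized in a few lines. This is an editor's illustration, not the connector's code; shouldFlush and its parameter names are hypothetical.

```scala
// Hypothetical sketch of the flush rule above (not the connector's implementation):
// flush once either limit is reached, then reset the document count.
def shouldFlush(docCount: Int, bufferedBytes: Long,
    batchSize: Int, batchBytes: Long): Boolean =
  docCount >= batchSize || bufferedBytes >= batchBytes

// With the defaults above (batch_size = Int.MaxValue, batch_bytes = 1mb),
// the byte threshold is the one that effectively triggers flushes.
assert(shouldFlush(docCount = 500, bufferedBytes = 2L * 1024 * 1024,
  batchSize = Int.MaxValue, batchBytes = 1024L * 1024))
```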
12 changes: 12 additions & 0 deletions FlintOptions.java
@@ -7,6 +7,8 @@

import java.io.Serializable;
import java.util.Map;

+ import org.apache.spark.network.util.ByteUnit;
import org.opensearch.flint.core.http.FlintRetryOptions;

/**
@@ -84,6 +86,10 @@ public class FlintOptions implements Serializable {

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

+  public static final String BATCH_BYTES = "write.batch_bytes";
+
+  public static final String DEFAULT_BATCH_BYTES = "1mb";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
@@ -150,4 +156,10 @@ public String getDataSourceName() {
public String getSystemIndexName() {
return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
}

+  public int getBatchBytes() {
+    // we do not expect this value to be larger than 10mb = 10 * 1024 * 1024
+    return (int) org.apache.spark.network.util.JavaUtils
+        .byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
+  }
}
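For reference, a minimal sketch (editor's illustration, assuming Spark's spark-network-common on the classpath) of what JavaUtils.byteStringAs returns for the size strings involved; this is the same call getBatchBytes makes:

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

// "1mb" resolves to 1 * 1024 * 1024 bytes.
val oneMb: Long = JavaUtils.byteStringAs("1mb", ByteUnit.BYTE) // 1048576
// A bare number is interpreted in the requested unit, here bytes.
val twoKb: Long = JavaUtils.byteStringAs("2048", ByteUnit.BYTE) // 2048
```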
6 changes: 4 additions & 2 deletions FlintOpenSearchClient.java
@@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
}

public FlintWriter createWriter(String indexName) {
-    LOG.info("Creating Flint index writer for " + indexName);
-    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
+    LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
+        "batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
+    return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
+        options.getRefreshPolicy(), options.getBatchBytes());
}

@Override
5 changes: 3 additions & 2 deletions OpenSearchWriter.java
@@ -33,11 +33,12 @@ public class OpenSearchWriter extends FlintWriter {

private IRestHighLevelClient client;

-  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
+  public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
+      int bufferSizeInBytes) {
this.client = client;
this.indexName = indexName;
this.refreshPolicy = refreshPolicy;
-    this.baos = new ByteArrayOutputStream();
+    this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
}

@Override public void write(char[] cbuf, int off, int len) {
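One note on the constructor change above: passing the batch limit as the initial ByteArrayOutputStream capacity avoids the repeated internal array doubling that the default 32-byte capacity would incur while a batch accumulates. A minimal illustration using only the JDK:

```scala
import java.io.ByteArrayOutputStream

// Presized to the batch limit: the internal array is allocated once up front,
// so writes up to batchBytes never trigger a grow-and-copy.
val batchBytes = 1024 * 1024
val baos = new ByteArrayOutputStream(batchBytes)
```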
9 changes: 5 additions & 4 deletions FlintSparkConf.scala
@@ -98,12 +98,12 @@ object FlintSparkConf {
"documents will vary depending on the individual size of each document.")
.createWithDefault(Integer.MAX_VALUE.toString)

-  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.write.batch_bytes")
+  val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
.datasourceOption()
    .doc(
      "The approximate amount of data in bytes written to Flint in a single batch request. " +
-       "The actual amount written to OpenSearch may exceed this. Default value is 4mb")
-    .createWithDefault("4mb")
+       s"The actual amount written to OpenSearch may exceed this. Default value is 1mb")
+    .createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)

val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
@@ -248,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
-      REPL_INACTIVITY_TIMEOUT_MILLIS)
+      REPL_INACTIVITY_TIMEOUT_MILLIS,
+      BATCH_BYTES)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

10 changes: 6 additions & 4 deletions FlintSparkConfSuite.scala
@@ -75,11 +75,13 @@ class FlintSparkConfSuite extends FlintSuite {
}

test("test batch bytes options") {
-    val defaultConf = FlintSparkConf(Map("write.batch_size" -> "10").asJava)
-    defaultConf.batchBytes() shouldBe 4 * 1024 * 1024
+    val defaultConf = FlintSparkConf(Map[String, String]().asJava)
+    defaultConf.batchBytes() shouldBe 1024 * 1024
+    defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024

-    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "1mb").asJava)
-    overrideConf.batchBytes() shouldBe 1024 * 1024
+    val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
+    overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
+    overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
}

/**
