Merge branch 'main' into support-covering-index-acceleration-for-iceberg
dai-chen committed May 3, 2024
2 parents 4d0f1fd + d9c0ba8 commit fcae77a
Showing 14 changed files with 137 additions and 22 deletions.
6 changes: 3 additions & 3 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,9 +506,9 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema i
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: default value is 1000.
- `spark.datasource.flint.write.refresh_policy`: default value is wait_for. valid values [NONE
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual amount written to OpenSearch may exceed this value. Default value is 1mb. After each document is written, the writer checks whether the document count (docCount) has reached batch_size or the buffer size has exceeded batch_bytes; if either condition is met, the current batch is flushed and the document count resets to zero (a configuration sketch follows below).
- `spark.datasource.flint.write.refresh_policy`: default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
Expand Down
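
A minimal sketch of supplying these write options through the Spark session config (option keys as documented above; the session setup itself is illustrative, not part of this change):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: tune Flint write batching and refresh behavior via Spark configs.
// batch_bytes accepts Spark byte strings such as "10b", "1mb", or "4mb".
val spark = SparkSession.builder()
  .appName("flint-write-example") // hypothetical app name
  .config("spark.datasource.flint.write.batch_size", "1000")
  .config("spark.datasource.flint.write.batch_bytes", "4mb")
  .config("spark.datasource.flint.write.refresh_policy", "wait_for")
  .getOrCreate()
```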
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import java.io.Serializable;
import java.util.Map;

import org.apache.spark.network.util.ByteUnit;
import org.opensearch.flint.core.http.FlintRetryOptions;

/**
Expand Down Expand Up @@ -74,7 +76,7 @@ public class FlintOptions implements Serializable {
*
* WAIT_UNTIL("wait_for")
*/
public static final String DEFAULT_REFRESH_POLICY = "wait_for";
public static final String DEFAULT_REFRESH_POLICY = "false";

public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";

Expand All @@ -84,6 +86,10 @@ public class FlintOptions implements Serializable {

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public static final String BATCH_BYTES = "write.batch_bytes";

public static final String DEFAULT_BATCH_BYTES = "1mb";

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -150,4 +156,10 @@ public String getDataSourceName() {
public String getSystemIndexName() {
return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
}

public int getBatchBytes() {
// this value is not expected to be larger than 10mb = 10 * 1024 * 1024
return (int) org.apache.spark.network.util.JavaUtils
.byteStringAs(options.getOrDefault(BATCH_BYTES, DEFAULT_BATCH_BYTES), ByteUnit.BYTE);
}
}
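
For reference, getBatchBytes delegates to Spark's byte-string parser; a quick sketch of what the accepted values resolve to (JavaUtils and ByteUnit are the same Spark classes used above):

```scala
import org.apache.spark.network.util.{ByteUnit, JavaUtils}

// "1mb" resolves to 1048576 bytes; a bare number is interpreted in the given unit (bytes here).
val oneMb: Long = JavaUtils.byteStringAs("1mb", ByteUnit.BYTE) // 1048576
val tenBytes: Long = JavaUtils.byteStringAs("10b", ByteUnit.BYTE) // 10
```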
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,10 @@ public FlintReader createReader(String indexName, String query) {
}

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
LOG.info(String.format("Creating Flint index writer for %s, refresh_policy:%s, " +
"batch_bytes:%d", indexName, options.getRefreshPolicy(), options.getBatchBytes()));
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName),
options.getRefreshPolicy(), options.getBatchBytes());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public abstract class FlintWriter extends Writer {
* { "title": "Prisoners", "year": 2013 }
*/
public static final String ACTION_CREATE = "create";

/**
* @return the current size of the buffered data, in bytes.
*/
public abstract long getBufferSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.rest.RestStatus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
Expand All @@ -27,19 +29,21 @@ public class OpenSearchWriter extends FlintWriter {

private final String refreshPolicy;

private StringBuilder sb;
private final ByteArrayOutputStream baos;

private IRestHighLevelClient client;

public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy) {
public OpenSearchWriter(IRestHighLevelClient client, String indexName, String refreshPolicy,
int bufferSizeInBytes) {
this.client = client;
this.indexName = indexName;
this.sb = new StringBuilder();
this.refreshPolicy = refreshPolicy;
this.baos = new ByteArrayOutputStream(bufferSizeInBytes);
}

@Override public void write(char[] cbuf, int off, int len) {
sb.append(cbuf, off, len);
byte[] bytes = new String(cbuf, off, len).getBytes(StandardCharsets.UTF_8);
baos.write(bytes, 0, bytes.length);
}

/**
Expand All @@ -48,8 +52,8 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
*/
@Override public void flush() {
try {
if (sb.length() > 0) {
byte[] bytes = sb.toString().getBytes();
if (baos.size() > 0) {
byte[] bytes = baos.toByteArray();
BulkResponse
response =
client.bulk(
Expand All @@ -63,7 +67,7 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
} catch (IOException e) {
throw new RuntimeException(String.format("Failed to execute bulk request on index: %s", indexName), e);
} finally {
sb.setLength(0);
baos.reset();
}
}

Expand All @@ -78,6 +82,10 @@ public OpenSearchWriter(IRestHighLevelClient client, String indexName, String re
}
}

public long getBufferSize() {
return baos.size();
}

private boolean isCreateConflict(BulkItemResponse itemResp) {
return itemResp.getOpType() == DocWriteRequest.OpType.CREATE && (itemResp.getFailure() == null || itemResp.getFailure()
.getStatus() == RestStatus.CONFLICT);
Expand Down
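
The switch from StringBuilder to ByteArrayOutputStream matters because batch_bytes is a byte limit and multi-byte UTF-8 characters occupy more bytes than chars; a quick illustration:

```scala
import java.nio.charset.StandardCharsets

// A 5-character string with one accented character encodes to 6 UTF-8 bytes,
// so counting chars would under-report the payload size sent to OpenSearch.
val doc = "héllo"
val charCount = doc.length                                  // 5
val byteCount = doc.getBytes(StandardCharsets.UTF_8).length // 6
```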
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class FlintPartitionWriter(
gen.writeLineEnding()

docCount += 1
if (docCount >= options.batchSize()) {
if (docCount >= options.batchSize() || gen.getBufferSize >= options.batchBytes()) {
gen.flush()
docCount = 0
}
Expand Down
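
The updated condition flushes on whichever limit is hit first; a self-contained sketch of the same pattern (class and names here are illustrative, not the Flint writer itself):

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Flush when either the document count reaches batchSize or the buffered bytes
// reach batchBytes, mirroring the batch_size / batch_bytes check above.
class BatchingBuffer(batchSize: Int, batchBytes: Long)(send: Array[Byte] => Unit) {
  private val buffer = new ByteArrayOutputStream()
  private var docCount = 0

  def add(jsonLine: String): Unit = {
    val bytes = jsonLine.getBytes(StandardCharsets.UTF_8)
    buffer.write(bytes, 0, bytes.length)
    docCount += 1
    if (docCount >= batchSize || buffer.size() >= batchBytes) flush()
  }

  def flush(): Unit = {
    if (buffer.size() > 0) send(buffer.toByteArray) // one bulk request per flush
    buffer.reset()
    docCount = 0
  }
}
```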
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.flint.config.FlintSparkConf._
import org.apache.spark.sql.internal.SQLConf

Expand Down Expand Up @@ -95,7 +96,14 @@ object FlintSparkConf {
"The number of documents written to Flint in a single batch request is determined by the " +
"overall size of the HTTP request, which should not exceed 100MB. The actual number of " +
"documents will vary depending on the individual size of each document.")
.createWithDefault("1000")
.createWithDefault(Integer.MAX_VALUE.toString)

val BATCH_BYTES = FlintConfig(s"spark.datasource.flint.${FlintOptions.BATCH_BYTES}")
.datasourceOption()
.doc(
"The approximate amount of data in bytes written to Flint in a single batch request. " +
"The actual amount of data written to OpenSearch may exceed this value. Default value is 1mb")
.createWithDefault(FlintOptions.DEFAULT_BATCH_BYTES)

val REFRESH_POLICY = FlintConfig("spark.datasource.flint.write.refresh_policy")
.datasourceOption()
Expand Down Expand Up @@ -194,6 +202,9 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def batchSize(): Int = BATCH_SIZE.readFrom(reader).toInt

def batchBytes(): Long = org.apache.spark.network.util.JavaUtils
.byteStringAs(BATCH_BYTES.readFrom(reader), ByteUnit.BYTE)

def docIdColumnName(): Option[String] = DOC_ID_COLUMN_NAME.readFrom(reader)

def ignoreIdColumn(): Boolean = IGNORE_DOC_ID_COLUMN.readFrom(reader).toBoolean
Expand Down Expand Up @@ -237,7 +248,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
PASSWORD,
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS)
REPL_INACTIVITY_TIMEOUT_MILLIS,
BATCH_BYTES)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@

package org.apache.spark.sql.flint.json

import java.io.Writer

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter
import org.opensearch.flint.core.storage.FlintWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
Expand All @@ -24,7 +23,7 @@ import org.apache.spark.sql.types._
*/
case class FlintJacksonGenerator(
dataType: DataType,
writer: Writer,
writer: FlintWriter,
options: JSONOptions,
ignoredFieldName: Option[String] = None) {
// A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
Expand Down Expand Up @@ -314,4 +313,8 @@ case class FlintJacksonGenerator(
})
})
}

def getBufferSize: Long = {
writer.getBufferSize
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import java.util.Optional

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions._
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

Expand All @@ -27,7 +26,7 @@ class FlintSparkConfSuite extends FlintSuite {

// default value
assert(flintOptions.getPort == 9200)
assert(flintOptions.getRefreshPolicy == "wait_for")
assert(flintOptions.getRefreshPolicy == "false")
}
}

Expand Down Expand Up @@ -75,6 +74,16 @@ class FlintSparkConfSuite extends FlintSuite {
}
}

test("test batch bytes options") {
val defaultConf = FlintSparkConf(Map[String, String]().asJava)
defaultConf.batchBytes() shouldBe 1024 * 1024
defaultConf.flintOptions().getBatchBytes shouldBe 1024 * 1024

val overrideConf = FlintSparkConf(Map("write.batch_bytes" -> "4mb").asJava)
overrideConf.batchBytes() shouldBe 4 * 1024 * 1024
overrideConf.flintOptions().getBatchBytes shouldBe 4 * 1024 * 1024
}

/**
* Delete index `indexNames` after calling `f`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class FlintDataSourceV2ITSuite
df.coalesce(1)
.write
.format("flint")
.options(options + ("spark.flint.write.batch.size" -> s"$batchSize"))
.options(options + ("spark.flint.write.batch_size" -> s"$batchSize"))
.mode("overwrite")
.save(indexName)

Expand Down Expand Up @@ -499,6 +499,39 @@ class FlintDataSourceV2ITSuite
}
}

test("write dataframe to flint with batch bytes configuration") {
val indexName = "tbatchbytes"
val options = openSearchOptions + (s"${DOC_ID_COLUMN_NAME.optionKey}" -> "aInt")
Seq("0b", "10b", "1mb").foreach(batchBytes => {
withIndexName(indexName) {
val mappings =
"""{
| "properties": {
| "aInt": {
| "type": "integer"
| }
| }
|}""".stripMargin
index(indexName, oneNodeSetting, mappings, Seq.empty)

val df = spark.range(15).toDF("aInt")
df.coalesce(1)
.write
.format("flint")
.options(options + ("spark.flint.write.batch_bytes" -> s"$batchBytes"))
.mode("overwrite")
.save(indexName)

checkAnswer(
spark.sqlContext.read
.format("flint")
.options(openSearchOptions)
.load(indexName),
df)
}
})
}

/**
* Copy from SPARK JDBCV2Suite.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ object FlintJob extends Logging with FlintJobExecutor {
* Without this setup, Spark would not recognize names in the format `my_glue1.default`.
*/
conf.set("spark.sql.defaultCatalog", dataSource)
configDYNMaxExecutors(conf, jobType)

val streamingRunningCount = new AtomicInteger(0)
val jobOperator =
JobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import play.api.libs.json._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf.REFRESH_POLICY
import org.apache.spark.sql.types._
import org.apache.spark.sql.util._

Expand Down Expand Up @@ -85,6 +86,19 @@ trait FlintJobExecutor {
"org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions")
}

/*
* Override dynamicAllocation.maxExecutors with the streaming maxExecutors setting. More detail at
* https://github.com/opensearch-project/opensearch-spark/issues/324
*/
def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
if (jobType.equalsIgnoreCase("streaming")) {
conf.set(
"spark.dynamicAllocation.maxExecutors",
conf
.get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10"))
}
}

def createSparkSession(conf: SparkConf): SparkSession = {
val builder = SparkSession.builder().config(conf)
if (enableHiveSupport) {
Expand All @@ -97,6 +111,7 @@ trait FlintJobExecutor {
try {
resultData.write
.format("flint")
.option(REFRESH_POLICY.optionKey, "wait_for")
.mode("append")
.save(resultIndex)
IRestHighLevelClient.recordOperationSuccess(
Expand Down
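
The new executor cap is driven purely by Spark conf values; a short sketch mirroring what the unit tests below verify (the conf object here is illustrative):

```scala
import org.apache.spark.SparkConf

// Streaming jobs default to a cap of 10 executors; setting
// spark.flint.streaming.dynamicAllocation.maxExecutors overrides that cap.
val conf = new SparkConf()
  .set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30") // optional override
FlintJob.configDYNMaxExecutors(conf, "streaming")
conf.get("spark.dynamicAllocation.maxExecutors") // "30" (or "10" without the override)
```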
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {

if (jobType.equalsIgnoreCase("streaming")) {
logInfo(s"""streaming query ${query}""")
configDYNMaxExecutors(conf, jobType)
val streamingRunningCount = new AtomicInteger(0)
val jobOperator =
JobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
|""".stripMargin
assert(FlintJob.isSuperset(input, mapping))
}

test("default streaming query maxExecutors is 10") {
val conf = spark.sparkContext.conf
FlintJob.configDYNMaxExecutors(conf, "streaming")
conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "10"
}

test("override streaming query maxExecutors") {
spark.sparkContext.conf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
FlintJob.configDYNMaxExecutors(spark.sparkContext.conf, "streaming")
spark.sparkContext.conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "30"
}

}
