From 5c4f0500c06078cea1a4186060633bac02512d5a Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Wed, 7 Aug 2024 16:32:27 -0700 Subject: [PATCH] Use refresh policy from config (#530) --- .../core/storage/FlintOpenSearchMetadataLog.java | 4 ++-- .../flint/core/storage/OpenSearchUpdater.java | 7 +++++-- .../org/apache/spark/sql/FlintREPLITSuite.scala | 6 ++---- .../flint/core/OpenSearchUpdaterSuite.scala | 5 ++--- .../org/apache/spark/sql/FlintJobExecutor.scala | 12 ++++++++---- .../main/scala/org/apache/spark/sql/OSClient.scala | 2 +- 6 files changed, 20 insertions(+), 16 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java index ba99359b1..23d2ac842 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLog.java @@ -155,7 +155,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) { new IndexRequest() .index(metadataLogIndexName) .id(logEntryWithId.id()) - .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(options.getRefreshPolicy()) .source(toJson(logEntryWithId), XContentType.JSON), RequestOptions.DEFAULT)); } @@ -166,7 +166,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) { client -> client.update( new UpdateRequest(metadataLogIndexName, logEntry.id()) .doc(toJson(logEntry), XContentType.JSON) - .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL) + .setRefreshPolicy(options.getRefreshPolicy()) .setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get()) .setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()), RequestOptions.DEFAULT)); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 0d84b4956..d9dc54783 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -6,6 +6,7 @@ import org.opensearch.client.indices.GetIndexRequest; import org.opensearch.common.xcontent.XContentType; import org.opensearch.flint.core.FlintClient; +import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import java.io.IOException; @@ -25,10 +26,12 @@ public class OpenSearchUpdater { private final String indexName; private final FlintClient flintClient; + private final FlintOptions options; - public OpenSearchUpdater(String indexName, FlintClient flintClient) { + public OpenSearchUpdater(String indexName, FlintClient flintClient, FlintOptions options) { this.indexName = indexName; this.flintClient = flintClient; + this.options = options; } public void upsert(String id, String doc) { @@ -61,7 +64,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l assertIndexExist(client, indexName); UpdateRequest updateRequest = new UpdateRequest(indexName, id) .doc(doc, XContentType.JSON) - .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + .setRefreshPolicy(options.getRefreshPolicy()); if (upsert) { updateRequest.docAsUpsert(true); diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala index bdf2ffc83..169be002c 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala @@ -117,10 +117,8 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest { flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)); osClient = new OSClient(new FlintOptions(openSearchOptions.asJava)) - updater = new OpenSearchUpdater( - requestIndex, - new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))) - + val options = new FlintOptions(openSearchOptions.asJava) + updater = new OpenSearchUpdater(requestIndex, new FlintOpenSearchClient(options), options) } override def afterEach(): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala index 3b317a0fe..1ea7c2c76 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/OpenSearchUpdaterSuite.scala @@ -32,9 +32,8 @@ class OpenSearchUpdaterSuite extends OpenSearchTransactionSuite with Matchers { override def beforeAll(): Unit = { super.beforeAll() flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)); - updater = new OpenSearchUpdater( - testMetaLogIndex, - new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))) + val options = new FlintOptions(openSearchOptions.asJava) + updater = new OpenSearchUpdater(testMetaLogIndex, new FlintOpenSearchClient(options), options) } test("upsert flintJob should success") { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 7f33e8e5e..26f98da23 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -127,11 +127,14 @@ trait FlintJobExecutor { builder.getOrCreate() } - private def writeData(resultData: DataFrame, resultIndex: String): Unit = { + private def writeData( + resultData: DataFrame, + resultIndex: String, + refreshPolicy: String): Unit = { try { resultData.write .format("flint") - .option(REFRESH_POLICY.optionKey, "wait_for") + .option(REFRESH_POLICY.optionKey, refreshPolicy) .mode("append") .save(resultIndex) IRestHighLevelClient.recordOperationSuccess( @@ -158,11 +161,12 @@ trait FlintJobExecutor { resultData: DataFrame, resultIndex: String, osClient: OSClient): Unit = { + val refreshPolicy = osClient.flintOptions.getRefreshPolicy; if (osClient.doesIndexExist(resultIndex)) { - writeData(resultData, resultIndex) + writeData(resultData, resultIndex, refreshPolicy) } else { createResultIndex(osClient, resultIndex, resultIndexMapping) - writeData(resultData, resultIndex) + writeData(resultData, resultIndex, refreshPolicy) } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index f5e4ec2be..e6f600a8d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -113,7 +113,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { } def createUpdater(indexName: String): OpenSearchUpdater = - new OpenSearchUpdater(indexName, flintClient) + new OpenSearchUpdater(indexName, flintClient, flintOptions) def getDoc(osIndexName: String, id: String): GetResponse = { using(flintClient.createClient()) { client =>