Skip to content

Commit

Permalink
Use refresh policy from config (#530)
Browse files Browse the repository at this point in the history
  • Loading branch information
ykmr1224 authored and seankao-az committed Aug 14, 2024
1 parent 7c19a39 commit 5c4f050
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
new IndexRequest()
.index(metadataLogIndexName)
.id(logEntryWithId.id())
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(options.getRefreshPolicy())
.source(toJson(logEntryWithId), XContentType.JSON),
RequestOptions.DEFAULT));
}
Expand All @@ -166,7 +166,7 @@ private FlintMetadataLogEntry updateLogEntry(FlintMetadataLogEntry logEntry) {
client -> client.update(
new UpdateRequest(metadataLogIndexName, logEntry.id())
.doc(toJson(logEntry), XContentType.JSON)
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL)
.setRefreshPolicy(options.getRefreshPolicy())
.setIfSeqNo((Long) logEntry.entryVersion().get("seqNo").get())
.setIfPrimaryTerm((Long) logEntry.entryVersion().get("primaryTerm").get()),
RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;

import java.io.IOException;
Expand All @@ -25,10 +26,12 @@ public class OpenSearchUpdater {

private final String indexName;
private final FlintClient flintClient;
private final FlintOptions options;

public OpenSearchUpdater(String indexName, FlintClient flintClient) {
public OpenSearchUpdater(String indexName, FlintClient flintClient, FlintOptions options) {
this.indexName = indexName;
this.flintClient = flintClient;
this.options = options;
}

public void upsert(String id, String doc) {
Expand Down Expand Up @@ -61,7 +64,7 @@ private void updateDocument(String id, String doc, boolean upsert, long seqNo, l
assertIndexExist(client, indexName);
UpdateRequest updateRequest = new UpdateRequest(indexName, id)
.doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
.setRefreshPolicy(options.getRefreshPolicy());

if (upsert) {
updateRequest.docAsUpsert(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,8 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {

flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava));
osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
updater = new OpenSearchUpdater(
requestIndex,
new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)))

val options = new FlintOptions(openSearchOptions.asJava)
updater = new OpenSearchUpdater(requestIndex, new FlintOpenSearchClient(options), options)
}

override def afterEach(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ class OpenSearchUpdaterSuite extends OpenSearchTransactionSuite with Matchers {
override def beforeAll(): Unit = {
super.beforeAll()
flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava));
updater = new OpenSearchUpdater(
testMetaLogIndex,
new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)))
val options = new FlintOptions(openSearchOptions.asJava)
updater = new OpenSearchUpdater(testMetaLogIndex, new FlintOpenSearchClient(options), options)
}

test("upsert flintJob should success") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,14 @@ trait FlintJobExecutor {
builder.getOrCreate()
}

private def writeData(resultData: DataFrame, resultIndex: String): Unit = {
private def writeData(
resultData: DataFrame,
resultIndex: String,
refreshPolicy: String): Unit = {
try {
resultData.write
.format("flint")
.option(REFRESH_POLICY.optionKey, "wait_for")
.option(REFRESH_POLICY.optionKey, refreshPolicy)
.mode("append")
.save(resultIndex)
IRestHighLevelClient.recordOperationSuccess(
Expand All @@ -158,11 +161,12 @@ trait FlintJobExecutor {
resultData: DataFrame,
resultIndex: String,
osClient: OSClient): Unit = {
val refreshPolicy = osClient.flintOptions.getRefreshPolicy;
if (osClient.doesIndexExist(resultIndex)) {
writeData(resultData, resultIndex)
writeData(resultData, resultIndex, refreshPolicy)
} else {
createResultIndex(osClient, resultIndex, resultIndexMapping)
writeData(resultData, resultIndex)
writeData(resultData, resultIndex, refreshPolicy)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
}

def createUpdater(indexName: String): OpenSearchUpdater =
new OpenSearchUpdater(indexName, flintClient)
new OpenSearchUpdater(indexName, flintClient, flintOptions)

def getDoc(osIndexName: String, id: String): GetResponse = {
using(flintClient.createClient()) { client =>
Expand Down

0 comments on commit 5c4f050

Please sign in to comment.