Skip to content

Commit

Permalink
Store latest id in Flint metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 31, 2023
1 parent ceb5c4c commit d9f9e00
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.initialLog(latest => latest.state == EMPTY)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(latest => flintClient.createIndex(indexName, metadata))
.commit(latest =>
if (latest == null) {
flintClient.createIndex(indexName, metadata)
} else {
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ package org.opensearch.flint.spark

import java.util.Base64

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.flint.OpenSearchTransactionSuite
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.should.Matchers

class FlintSparkTransactionITSuite
extends OpenSearchTransactionSuite
with Matchers {
class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {

/** Test table and index name */
private val testTable = "spark_catalog.default.flint_tx_test"
Expand All @@ -37,7 +40,19 @@ class FlintSparkTransactionITSuite
.onTable(testTable)
.addPartitions("year", "month")
.create()

latestLogEntry(testLatestId) should contain("state" -> "active")

implicit val formats: Formats = Serialization.formats(NoTypeHints)
val mapping =
openSearchClient
.indices()
.get(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT)
.getMappings
.get(testFlintIndex)
.source()
.string()
(parse(mapping) \ "_meta" \ "latestId").extract[String] shouldBe testLatestId
}

test("manual refresh index") {
Expand Down

0 comments on commit d9f9e00

Please sign in to comment.