Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.5] Update checkpoint location on alter path #631

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark

import java.util.{Collections, UUID}
import java.util.Collections

import scala.collection.JavaConverters.mapAsJavaMapConverter

Expand Down Expand Up @@ -81,8 +81,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
index: FlintSparkIndex,
updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
val originalOptions = index.options
val updatedOptions =
originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
val updatedOptions = updateOptionWithDefaultCheckpointLocation(
index.name(),
originalOptions.copy(options = originalOptions.options ++ updateOptions.options))
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
Expand Down Expand Up @@ -159,22 +160,13 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
indexName: String,
options: FlintSparkIndexOptions): FlintSparkIndexOptions = {

val checkpointLocationRootDirOption = new FlintSparkConf(
Collections.emptyMap[String, String]).checkpointLocationRootDir

if (options.checkpointLocation().isEmpty) {
checkpointLocationRootDirOption match {
case Some(checkpointLocationRootDir) =>
// Currently, deleting and recreating the flint index will enter same checkpoint dir.
// Use a UUID to isolate checkpoint data.
val checkpointLocation =
s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
FlintSparkIndexOptions(
options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation))
case None => options
}
} else {
options
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = options.checkpointLocation(indexName, flintSparkConf)

checkpointLocation match {
case Some(location) =>
FlintSparkIndexOptions(options.options + (CHECKPOINT_LOCATION.toString -> location))
case None => options
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

package org.opensearch.flint.spark

import java.util.UUID

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

import org.apache.spark.sql.flint.config.FlintSparkConf

/**
* Flint Spark index configurable options.
*
Expand Down Expand Up @@ -165,6 +169,18 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
case None => // no action needed if modeStr is empty
}
}

/**
 * Resolves the checkpoint location for the given index.
 *
 * A checkpoint location already present in these options is returned as is. Otherwise, when the
 * given conf defines a checkpoint root directory, a new location of the form
 * `<rootDir>/<indexName>/<random UUID>` is generated. Because of the random UUID, every call
 * that falls back to the root directory yields a different path.
 *
 * @param indexName
 *   index name used as a path segment below the root directory
 * @param flintSparkConf
 *   conf whose `checkpointLocationRootDir` is consulted when no location is set in the options
 * @return
 *   the configured location, else a generated one, else None if neither is available
 */
def checkpointLocation(indexName: String, flintSparkConf: FlintSparkConf): Option[String] = {
  val configuredLocation = options.get(CHECKPOINT_LOCATION.toString)

  // Currently, deleting and recreating the flint index will enter same checkpoint dir.
  // Use a UUID to isolate checkpoint data.
  def generatedLocation: Option[String] =
    flintSparkConf.checkpointLocationRootDir.map { rootDir =>
      val uniqueSegment = UUID.randomUUID().toString
      s"${rootDir.stripSuffix("/")}/$indexName/$uniqueSegment"
    }

  // orElse takes its argument by name, so the root dir is only read when nothing is configured
  configuredLocation.orElse(generatedLocation)
}
}

object FlintSparkIndexOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
}

test("indexOptions should not override existing checkpoint location with conf") {
conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options =
Expand All @@ -70,7 +70,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
}

test("indexOptions should have default checkpoint location with conf") {
conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))

val options = FlintSparkIndexOptions(Map.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

test("create covering index with default checkpoint location successfully") {
withTempDir { checkpointDir =>
conf.setConfString(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)
flint
.coveringIndex()
Expand Down Expand Up @@ -213,6 +213,59 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}

test("update covering index successfully with custom checkpoint location") {
  withTempDir { checkpointDir =>
    // 1. Create a full refresh covering index; no checkpoint root dir is configured by this test yet
    flint
      .coveringIndex()
      .name(testIndex)
      .onTable(testTable)
      .addIndexColumns("name", "age")
      .options(FlintSparkIndexOptions.empty, testFlintIndex)
      .create()
    checkAnswer(flint.queryIndex(testFlintIndex), Seq())

    val createdIndex = flint.describeIndex(testFlintIndex)
    assert(
      createdIndex.get.options.checkpointLocation().isEmpty,
      "Checkpoint location should not be defined")

    // 2. Update the spark conf with a custom checkpoint location
    setFlintSparkConf(
      FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
      checkpointDir.getAbsolutePath)
    try {
      // Changing the conf alone must not alter the options stored for the existing index
      val indexBeforeUpdate = flint.describeIndex(testFlintIndex)
      assert(
        indexBeforeUpdate.get.options.checkpointLocation().isEmpty,
        "Checkpoint location should not be defined")

      // 3. Update index to auto refresh
      val updatedIndex = flint
        .coveringIndex()
        .copyWithUpdate(
          indexBeforeUpdate.get,
          FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
      val jobId = flint.updateIndex(updatedIndex)
      jobId shouldBe defined

      val job = spark.streams.get(jobId.get)
      failAfter(streamingTimeout) {
        job.processAllAvailable()
      }

      checkAnswer(
        flint.queryIndex(testFlintIndex),
        Seq(Row("Hello", 30), Row("World", 25)))

      // 4. The update should have assigned a checkpoint location that contains the index name
      val checkpointLocation =
        flint.describeIndex(testFlintIndex).get.options.checkpointLocation()
      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
      assert(
        checkpointLocation.get.contains(testFlintIndex),
        s"Checkpoint location dir should contain ${testFlintIndex}")
    } finally {
      // Always reset the conf, even on failure, so the root dir does not leak into other tests
      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
    }
  }
}

test("can have multiple covering indexes on a table") {
flint
.coveringIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

test("create materialized view with default checkpoint location successfully") {
withTempDir { checkpointDir =>
conf.setConfString(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)

val indexOptions =
Expand Down Expand Up @@ -268,6 +268,70 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
}
}

test("update materialized view successfully with custom checkpoint location") {
  withTempDir { checkpointDir =>
    // 1. Create full refresh MV; no checkpoint root dir is configured by this test yet
    flint
      .materializedView()
      .name(testMvName)
      .query(testQuery)
      .options(FlintSparkIndexOptions.empty, testFlintIndex)
      .create()
    checkAnswer(flint.queryIndex(testFlintIndex), Seq())

    val createdIndex = flint.describeIndex(testFlintIndex)
    assert(
      createdIndex.get.options.checkpointLocation().isEmpty,
      "Checkpoint location should not be defined")

    // 2. Update the spark conf with a custom checkpoint location
    setFlintSparkConf(
      FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
      checkpointDir.getAbsolutePath)
    try {
      // Changing the conf alone must not alter the options stored for the existing index
      val indexBeforeUpdate = flint.describeIndex(testFlintIndex)
      assert(
        indexBeforeUpdate.get.options.checkpointLocation().isEmpty,
        "Checkpoint location should not be defined")

      // 3. Update Flint index to auto refresh and wait for complete
      val updatedIndex = flint
        .materializedView()
        .copyWithUpdate(
          indexBeforeUpdate.get,
          FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "1 Minute")))
      val jobId = flint.updateIndex(updatedIndex)
      jobId shouldBe defined

      val job = spark.streams.get(jobId.get)
      failAfter(streamingTimeout) {
        job.processAllAvailable()
      }

      checkAnswer(
        flint.queryIndex(testFlintIndex).select("startTime", "count"),
        Seq(
          Row(timestamp("2023-10-01 00:00:00"), 1),
          Row(timestamp("2023-10-01 00:10:00"), 2),
          Row(timestamp("2023-10-01 01:00:00"), 1)
          /*
           * The last row is pending to fire upon watermark
           * Row(timestamp("2023-10-01 02:00:00"), 1)
           */
        ))

      // 4. The update should have assigned a checkpoint location that contains the index name
      val checkpointLocation =
        flint.describeIndex(testFlintIndex).get.options.checkpointLocation()
      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
      assert(
        checkpointLocation.get.contains(testFlintIndex),
        s"Checkpoint location dir should contain ${testFlintIndex}")
    } finally {
      // Always reset the conf, even on failure, so the root dir does not leak into other tests
      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
    }
  }
}

/**
 * Shorthand for building expected timestamp values in assertions.
 *
 * @param ts
 *   timestamp text passed to `Timestamp.valueOf`, e.g. "2023-10-01 00:00:00"
 * @return
 *   the parsed timestamp
 */
private def timestamp(ts: String): Timestamp = {
  val parsed = Timestamp.valueOf(ts)
  parsed
}

private def withIncrementalMaterializedView(query: String)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {

test("create skipping index with default checkpoint location successfully") {
withTempDir { checkpointDir =>
conf.setConfString(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
setFlintSparkConf(
FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
checkpointDir.getAbsolutePath)
flint
.skippingIndex()
Expand Down Expand Up @@ -369,6 +369,58 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
flint.queryIndex(testIndex).collect().toSet should have size 2
}

test("update skipping index successfully with custom checkpoint location") {
  withTempDir { checkpointDir =>
    // 1. Create full refresh SI; no checkpoint root dir is configured by this test yet
    flint
      .skippingIndex()
      .onTable(testTable)
      .addPartitions("year", "month")
      .options(FlintSparkIndexOptions.empty, testIndex)
      .create()

    flint.queryIndex(testIndex).collect().toSet should have size 0

    val createdIndex = flint.describeIndex(testIndex)
    assert(
      createdIndex.get.options.checkpointLocation().isEmpty,
      "Checkpoint location should not be defined")

    // 2. Update the spark conf with a custom checkpoint location
    setFlintSparkConf(
      FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
      checkpointDir.getAbsolutePath)
    try {
      // Changing the conf alone must not alter the options stored for the existing index
      val indexBeforeUpdate = flint.describeIndex(testIndex)
      assert(
        indexBeforeUpdate.get.options.checkpointLocation().isEmpty,
        "Checkpoint location should not be defined")

      // 3. Update Flint index to auto refresh and wait for complete
      val updatedIndex =
        flint
          .skippingIndex()
          .copyWithUpdate(
            indexBeforeUpdate.get,
            FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
      val jobId = flint.updateIndex(updatedIndex)
      jobId shouldBe defined

      val job = spark.streams.get(jobId.get)
      failAfter(streamingTimeout) {
        job.processAllAvailable()
      }

      flint.queryIndex(testIndex).collect().toSet should have size 2

      // 4. The update should have assigned a checkpoint location that contains the index name
      val checkpointLocation = flint.describeIndex(testIndex).get.options.checkpointLocation()
      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
      assert(
        checkpointLocation.get.contains(testIndex),
        s"Checkpoint location dir should contain ${testIndex}")
    } finally {
      // Always reset the conf, even on failure, so the root dir does not leak into other tests
      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
    }
  }
}

test("can have only 1 skipping index on a table") {
flint
.skippingIndex()
Expand Down
Loading