diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index 0ba9b4a79..008b04078 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -7,15 +7,20 @@ package org.opensearch.flint.spark import java.util.{Locale, UUID} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path, PathFilter} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.scalatest.matchers.must.Matchers.have import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.SparkHiveSupportSuite +import org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY +import org.apache.spark.sql.internal.SQLConf class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSupportSuite { @@ -242,6 +247,31 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup } } + test( + "should bypass write permission check for checkpoint location if checkpoint manager class doesn't support create temp file") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + sql(s"INSERT INTO $testTable VALUES ('test')") + + withTempDir { checkpointDir => + // Set readonly to verify write permission check bypass + checkpointDir.setWritable(false) + + // Configure fake checkpoint file manager + val confKey = SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key + withSQLConf(confKey -> classOf[FakeCheckpointFileManager].getName) { + sql(s""" + | $createSkippingIndexStatement + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } + } + } + } + private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT) private def optionName(mode: RefreshMode): String = mode match { @@ -249,3 +279,28 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup case INCREMENTAL => "incremental_refresh" } } + +/** + * Fake checkpoint file manager. + */ +class FakeCheckpointFileManager(path: Path, conf: Configuration) extends CheckpointFileManager { + + override def createAtomic( + path: Path, + overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream = + throw new UnsupportedOperationException + + override def open(path: Path): FSDataInputStream = mock[FSDataInputStream] + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = Array() + + override def mkdirs(path: Path): Unit = throw new UnsupportedOperationException + + override def exists(path: Path): Boolean = true + + override def delete(path: Path): Unit = throw new UnsupportedOperationException + + override def isLocal: Boolean = throw new UnsupportedOperationException + + override def createCheckpointDirectory(): Path = path +}