Skip to content

Commit

Permalink
Add more IT for write permission bypass
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jul 11, 2024
1 parent c837bd8 commit d82a599
Showing 1 changed file with 55 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ package org.opensearch.flint.spark

import java.util.{Locale, UUID}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path, PathFilter}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.scalatest.matchers.must.Matchers.have
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.sql.SparkHiveSupportSuite
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
import org.apache.spark.sql.internal.SQLConf

class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSupportSuite {

Expand Down Expand Up @@ -242,10 +247,60 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
}
}

test(
"should bypass write permission check for checkpoint location if checkpoint manager class doesn't support create temp file") {
withTable(testTable) {
sql(s"CREATE TABLE $testTable (name STRING) USING JSON")
sql(s"INSERT INTO $testTable VALUES ('test')")

withTempDir { checkpointDir =>
// Set readonly to verify write permission check bypass
checkpointDir.setWritable(false)

// Configure fake checkpoint file manager
val confKey = SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key
withSQLConf(confKey -> classOf[FakeCheckpointFileManager].getName) {
sql(s"""
| $createSkippingIndexStatement
| WITH (
| incremental_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)
}
}
}
}

private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT)

private def optionName(mode: RefreshMode): String = mode match {
case AUTO => "auto_refresh"
case INCREMENTAL => "incremental_refresh"
}
}

/**
* Fake checkpoint file manager.
*/
class FakeCheckpointFileManager(path: Path, conf: Configuration) extends CheckpointFileManager {

override def createAtomic(
path: Path,
overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream =
throw new UnsupportedOperationException

override def open(path: Path): FSDataInputStream = mock[FSDataInputStream]

override def list(path: Path, filter: PathFilter): Array[FileStatus] = Array()

override def mkdirs(path: Path): Unit = throw new UnsupportedOperationException

override def exists(path: Path): Boolean = true

override def delete(path: Path): Unit = throw new UnsupportedOperationException

override def isLocal: Boolean = throw new UnsupportedOperationException

override def createCheckpointDirectory(): Path = path
}

0 comments on commit d82a599

Please sign in to comment.