From d0b622ff97837cc5aa97e6c27ed5b6bec155cdfa Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 8 Jul 2024 15:56:27 -0700 Subject: [PATCH 1/4] Add write permission check Signed-off-by: Chen Dai --- .../spark/FlintSparkValidationHelper.scala | 21 ++++++++--- .../spark/refresh/AutoIndexRefresh.scala | 2 +- .../refresh/IncrementalIndexRefresh.scala | 2 +- .../FlintSparkIndexValidationITSuite.scala | 36 +++++++++++++++++-- 4 files changed, 53 insertions(+), 8 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 987e4366d..00fbca23e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import java.io.IOException +import java.util.UUID import org.apache.hadoop.fs.Path import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex @@ -76,14 +77,26 @@ trait FlintSparkValidationHelper extends Logging { new Path(checkpointLocation), spark.sessionState.newHadoopConf()) - // The primary intent here is to catch any exceptions during the accessibility check. - // The actual result is ignored, as Spark can create any necessary sub-folders - // when the streaming job starts. + /* + * Read permission check: The primary intent here is to catch any exceptions + * during the accessibility check. The actual result is ignored, as Spark can + * create any necessary sub-folders when the streaming job starts. + */ checkpointManager.exists(new Path(checkpointLocation)) + + /* + * Write permission check: Attempt to create a temporary file to verify write access. + * The temporary file is left in place in case additional delete permissions are required + * for some file systems. 
+ */ + val tempFilePath = new Path(checkpointLocation, s"${UUID.randomUUID().toString}.tmp") + val outputStream = checkpointManager.createAtomic(tempFilePath, overwriteIfPossible = true) + outputStream.close() + true } catch { case e: IOException => - logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e) + logWarning(s"Failed to check if checkpoint location $checkpointLocation accessible", e) false } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index dd565fd18..db2f321a0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -56,7 +56,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) if (checkpointLocation.isDefined) { require( isCheckpointLocationAccessible(spark, checkpointLocation.get), - s"No permission to access the checkpoint location ${checkpointLocation.get}") + s"No sufficient permission to access the checkpoint location ${checkpointLocation.get}") } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index b52f1ae79..5ee7308f1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -39,7 +39,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) "Checkpoint location is required by incremental refresh") require( isCheckpointLocationAccessible(spark, checkpointLocation.get), - s"No permission to access the checkpoint location ${checkpointLocation.get}") + s"No sufficient permission to access the checkpoint location ${checkpointLocation.get}") } override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index cc0573d6c..cb0cd3173 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import java.util.{Locale, UUID} +import java.util.Locale import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView @@ -103,6 +103,38 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup } } + Seq( + (AUTO, createSkippingIndexStatement), + (AUTO, createCoveringIndexStatement), + (AUTO, createMaterializedViewStatement), + (INCREMENTAL, createSkippingIndexStatement), + (INCREMENTAL, createCoveringIndexStatement), + (INCREMENTAL, createMaterializedViewStatement)) + .foreach { case (refreshMode, statement) => + test( + s"should fail to create $refreshMode refresh Flint index if checkpoint location is not writable: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + withTempDir { checkpointDir => + 
// Set checkpoint dir readonly to simulate the exception + checkpointDir.setWritable(false) + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | ${optionName(refreshMode)} = true, + | checkpoint_location = "$checkpointDir" + | ) + |""".stripMargin) + } should have message + s"requirement failed: No sufficient permission to access the checkpoint location $checkpointDir" + } + } + } + } + Seq( (AUTO, createSkippingIndexStatement), (AUTO, createCoveringIndexStatement), @@ -127,7 +159,7 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup | ) |""".stripMargin) } should have message - s"requirement failed: No permission to access the checkpoint location $checkpointDir" + s"requirement failed: No sufficient permission to access the checkpoint location $checkpointDir" } } } From f7a19eb45760430e1a97859a94a4efcdc3afd673 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 9 Jul 2024 16:16:13 -0700 Subject: [PATCH 2/4] Create checkpoint folder before check Signed-off-by: Chen Dai --- .../spark/FlintSparkValidationHelper.scala | 5 ++- .../FlintSparkIndexValidationITSuite.scala | 35 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index 00fbca23e..d48f9e6dd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -89,7 +89,10 @@ trait FlintSparkValidationHelper extends Logging { * The temporary file is left in place in case additional delete permissions are required * for some file systems. 
*/ - val tempFilePath = new Path(checkpointLocation, s"${UUID.randomUUID().toString}.tmp") + val tempFilePath = + new Path( + checkpointManager.createCheckpointDirectory(), + s"${UUID.randomUUID().toString}.tmp") val outputStream = checkpointManager.createAtomic(tempFilePath, overwriteIfPossible = true) outputStream.close() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index cb0cd3173..0ba9b4a79 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import java.util.Locale +import java.util.{Locale, UUID} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView @@ -205,10 +205,43 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup sql(statement) flint.refreshIndex(flintIndexName) flint.queryIndex(flintIndexName).count() shouldBe 1 + + deleteTestIndex(flintIndexName) } } } + Seq( + (skippingIndexName, AUTO, createSkippingIndexStatement), + (coveringIndexName, AUTO, createCoveringIndexStatement), + (materializedViewName, AUTO, createMaterializedViewStatement), + (skippingIndexName, INCREMENTAL, createSkippingIndexStatement), + (coveringIndexName, INCREMENTAL, createCoveringIndexStatement), + (materializedViewName, INCREMENTAL, createMaterializedViewStatement)) + .foreach { case (flintIndexName, refreshMode, statement) => + test( + s"should succeed to create $refreshMode refresh Flint index even if checkpoint sub-folder doesn't exist: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + sql(s"INSERT INTO $testTable VALUES ('test')") + + withTempDir { checkpointDir => + // Specify nonexistent sub-folder and expect pre-validation to pass + val nonExistCheckpointDir = s"$checkpointDir/${UUID.randomUUID().toString}" + sql(s""" + | $statement + | WITH ( + | ${optionName(refreshMode)} = true, + | checkpoint_location = '$nonExistCheckpointDir' + | ) + |""".stripMargin) + + deleteTestIndex(flintIndexName) + } + } + } + } + private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT) private def optionName(mode: RefreshMode): String = mode match { From c837bd87fc3a7b97734095a6c02172c9ee4ada88 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 10 Jul 2024 11:41:14 -0700 Subject: [PATCH 3/4] Use create temp file API to avoid rename Signed-off-by: Chen Dai --- .../spark/FlintSparkValidationHelper.scala | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index d48f9e6dd..f615d9398 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -5,7 +5,6 @@ package org.opensearch.flint.spark -import java.io.IOException import java.util.UUID import org.apache.hadoop.fs.Path @@ -18,6 +17,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation 
import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} /** @@ -79,27 +79,35 @@ trait FlintSparkValidationHelper extends Logging { /* * Read permission check: The primary intent here is to catch any exceptions - * during the accessibility check. The actual result is ignored, as Spark can - * create any necessary sub-folders when the streaming job starts. + * during the accessibility check. The actual result is ignored, as the write + * permission check below will create any necessary sub-folders. */ checkpointManager.exists(new Path(checkpointLocation)) /* * Write permission check: Attempt to create a temporary file to verify write access. - * The temporary file is left in place in case additional delete permissions are required - * for some file systems. + * The temporary file is left in place in case additional delete permissions required. */ - val tempFilePath = - new Path( - checkpointManager.createCheckpointDirectory(), - s"${UUID.randomUUID().toString}.tmp") - val outputStream = checkpointManager.createAtomic(tempFilePath, overwriteIfPossible = true) - outputStream.close() + checkpointManager match { + case manager: RenameHelperMethods => + val tempFilePath = + new Path( + checkpointManager + .createCheckpointDirectory(), // create all parent folders if needed + s"${UUID.randomUUID().toString}.tmp") + + manager.createTempFile(tempFilePath).close() + case _ => + logInfo( + s"Bypass checkpoint location write permission check: ${checkpointManager.getClass}") + } true } catch { - case e: IOException => - logWarning(s"Failed to check if checkpoint location $checkpointLocation accessible", e) + case e: Exception => + logWarning( + s"Exception occurred while verifying access to checkpoint location $checkpointLocation", + e) false } } From d82a59923176ac86c3b685e298de3ea0a598908e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 11 Jul 2024 16:09:46 -0700 Subject: [PATCH 4/4] Add more IT for write permission bypass Signed-off-by: Chen Dai --- .../FlintSparkIndexValidationITSuite.scala | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index 0ba9b4a79..008b04078 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -7,15 +7,20 @@ package org.opensearch.flint.spark import java.util.{Locale, UUID} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path, PathFilter} import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL, RefreshMode} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.scalatest.matchers.must.Matchers.have import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} +import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.sql.SparkHiveSupportSuite +import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY +import org.apache.spark.sql.internal.SQLConf class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSupportSuite { @@ -242,6 +247,31 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup } } + test( + "should bypass write permission check for checkpoint location if checkpoint manager class doesn't support create temp file") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + sql(s"INSERT INTO $testTable VALUES ('test')") + + withTempDir { checkpointDir => + // Set readonly to verify write permission check bypass + checkpointDir.setWritable(false) + + // Configure fake checkpoint file manager + val confKey = SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key + withSQLConf(confKey -> classOf[FakeCheckpointFileManager].getName) { + sql(s""" + | $createSkippingIndexStatement + | WITH ( + | incremental_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin) + } + } + } + } + private def lowercase(mode: RefreshMode): String = mode.toString.toLowerCase(Locale.ROOT) private def optionName(mode: RefreshMode): String = mode match { @@ -249,3 +279,28 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup case INCREMENTAL => "incremental_refresh" } } + +/** + * Fake checkpoint file manager. + */ +class FakeCheckpointFileManager(path: Path, conf: Configuration) extends CheckpointFileManager { + + override def createAtomic( + path: Path, + overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream = + throw new UnsupportedOperationException + + override def open(path: Path): FSDataInputStream = mock[FSDataInputStream] + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = Array() + + override def mkdirs(path: Path): Unit = throw new UnsupportedOperationException + + override def exists(path: Path): Boolean = true + + override def delete(path: Path): Unit = throw new UnsupportedOperationException + + override def isLocal: Boolean = throw new UnsupportedOperationException + + override def createCheckpointDirectory(): Path = path +}
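Editor's note (illustration only, not part of the patch series above): the validation added by these commits amounts to two probes against the checkpoint location, a read probe whose boolean result is ignored (only a thrown exception matters) and a write probe that creates a uniquely named temporary file and deliberately leaves it behind, since deleting it could require permissions the streaming job itself never needs. The sketch below mirrors that flow using the public Hadoop FileSystem API instead of Spark's internal CheckpointFileManager; the object and method names (CheckpointAccessProbe, canReadAndWrite) are invented here for illustration and do not appear in the patches.

import java.util.UUID

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Minimal sketch of the read/write probe, assuming direct FileSystem access is acceptable.
object CheckpointAccessProbe {

  /** Returns true if the location can be read and a temp file can be created under it. */
  def canReadAndWrite(
      checkpointLocation: String,
      conf: Configuration = new Configuration()): Boolean = {
    val path = new Path(checkpointLocation)
    val fs: FileSystem = path.getFileSystem(conf)
    try {
      // Read probe: the result is ignored; only a thrown exception fails validation.
      fs.exists(path)

      // Write probe: create (and keep) a uniquely named temp file. It is not deleted,
      // because cleanup might need permissions beyond what the streaming job requires.
      val tempFile = new Path(path, s"${UUID.randomUUID()}.tmp")
      fs.create(tempFile, true).close()
      true
    } catch {
      case _: Exception =>
        // Missing permissions, a bad URI, or an unreachable file system all fail the check.
        false
    }
  }
}

Note that in the actual helper the write probe only runs when the configured CheckpointFileManager implements RenameHelperMethods; other implementations bypass the check with an info log, which is exactly the path exercised by FakeCheckpointFileManager in the integration test above.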