From 029f84395d6493fdb6023ad0beb9337f5e990d76 Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Tue, 19 Mar 2024 16:01:01 -0700
Subject: [PATCH 1/4] Reduce default inactivity limit to 3 minutes (#287)

Signed-off-by: Peng Huo
---
 .../src/main/scala/org/opensearch/flint/core/FlintOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
index 7020deba8..1282e1c94 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -77,7 +77,7 @@ public class FlintOptions implements Serializable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;
 
-  public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;
+  public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;
 
   public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";
 
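The new default of 3 * 60 * 1000 ms is three minutes (down from ten). Deployments that want the old window back can override the compile-time constant through Spark configuration; a minimal sketch, assuming the spark.flint.job.inactivityLimitMillis conf key that FlintREPL conventionally reads against this default (the key name is an assumption, not confirmed by this patch):

  import org.apache.spark.SparkConf
  import org.opensearch.flint.core.FlintOptions

  val conf = new SparkConf()
    // Hypothetical override restoring the old ten-minute window.
    .set("spark.flint.job.inactivityLimitMillis", (10 * 60 * 1000).toString)

  // Readers fall back to the compile-time default when the key is absent:
  val limitMillis = conf.getLong(
    "spark.flint.job.inactivityLimitMillis",
    FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS) // now 3 minutes
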
From eb03705d602b1b9b96d06baee2c910eae15ff64a Mon Sep 17 00:00:00 2001
From: Kaituo Li
Date: Thu, 21 Mar 2024 16:07:30 -0700
Subject: [PATCH 2/4] Fix shutdown bug due to non-daemon thread in driver
 (#292)

Similar to https://github.com/opensearch-project/opensearch-spark/pull/175,
this PR adds shutdown logic in FlintJob.

Tests:

* Verified in IT that, with terminateJVM enabled, the JVM shuts down.

Signed-off-by: Kaituo Li
---
 .../org/apache/spark/sql/FlintJobITSuite.scala   |  2 +-
 .../org/apache/spark/sql/FlintJobExecutor.scala  |  2 ++
 .../scala/org/apache/spark/sql/FlintREPL.scala   |  2 --
 .../scala/org/apache/spark/sql/JobOperator.scala | 14 +++++++++++++-
 4 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
index 86bf567f5..59016d6bc 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
@@ -76,7 +76,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
         JobOperator(spark, query, dataSourceName, resultIndex, true, streamingRunningCount)
       job.envinromentProvider = new MockEnvironment(
         Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))
-
+      job.terminateJVM = false
       job.start()
     }
     futureResult.onComplete {
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index 1814a8d8e..ccd5c8f3f 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -34,6 +34,8 @@ trait FlintJobExecutor {
   var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
   var envinromentProvider: EnvironmentProvider = new RealEnvironment()
   var enableHiveSupport: Boolean = true
+  // terminate the JVM in the presence of non-daemon threads before exiting
+  var terminateJVM = true
 
   // The enabled setting, which can be applied only to the top-level mapping definition and to object fields,
   val resultIndexMapping =
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 78314a68b..76e5f692c 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -57,8 +57,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
   val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L
 
   @volatile var earlyExitFlag: Boolean = false
-  // termiante JVM in the presence non-deamon thread before exiting
-  var terminateJVM = true
 
   def updateSessionIndex(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
     updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
index bbaceb15d..4fb272938 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala
@@ -19,7 +19,7 @@ import org.opensearch.flint.core.storage.OpenSearchUpdater
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.FlintJob.createSparkSession
-import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, updateFlintInstanceBeforeShutdown}
+import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, threadPoolFactory, updateFlintInstanceBeforeShutdown}
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.util.ThreadUtils
@@ -106,6 +106,18 @@
       case e: Exception => logError("Fail to close threadpool", e)
     }
     recordStreamingCompletionStatus(exceptionThrown)
+
+    // Check for non-daemon threads that may prevent the driver from shutting down.
+    // Non-daemon threads other than the main thread indicate that the driver is still processing tasks,
+    // which may be due to unresolved bugs in dependencies or threads not being properly shut down.
+    if (terminateJVM && threadPoolFactory.hasNonDaemonThreadsOtherThanMain) {
+      logInfo("A non-daemon thread in the driver is seen.")
+      // Exit the JVM to prevent resource leaks and potential EMR-S job hangs.
+      // A zero status code is used for a graceful shutdown without indicating an error.
+      // If exiting with a non-zero status, the EMR-S job will fail.
+      // This is part of the fault-tolerance mechanism to handle such scenarios gracefully.
+      System.exit(0)
+    }
   }
 
   def stop(): Unit = {
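The shutdown check above delegates to ThreadPoolFactory.hasNonDaemonThreadsOtherThanMain, whose implementation is not part of this diff. A minimal sketch of what such a check could look like, assuming a scan of all live threads via Thread.getAllStackTraces (the real Flint implementation may differ):

  import scala.collection.JavaConverters._

  def hasNonDaemonThreadsOtherThanMain: Boolean = {
    // Any live non-daemon thread other than "main" keeps the JVM from exiting
    // on its own; that is exactly the condition that triggers System.exit(0) above.
    Thread.getAllStackTraces.keySet.asScala
      .exists(t => t.isAlive && !t.isDaemon && t.getName != "main")
  }
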
From 94fc2f55d637ed360eef01df79b9facb40161da2 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 22 Mar 2024 09:09:26 -0700
Subject: [PATCH 3/4] Rule out logically deleted skipping index in query rewrite (#289)

* Ignore logically deleted skipping index

Signed-off-by: Chen Dai

* Add IT

Signed-off-by: Chen Dai

* Rename skipping index check method

Signed-off-by: Chen Dai

---------

Signed-off-by: Chen Dai
---
 .../ApplyFlintSparkSkippingIndex.scala      | 14 +++++---
 .../ApplyFlintSparkSkippingIndexSuite.scala | 32 +++++++++++++++----
 .../FlintSparkSkippingIndexITSuite.scala    | 22 +++++++++++++
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
index 83f8def4d..51d6cc802 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndex.scala
@@ -6,7 +6,8 @@
 package org.opensearch.flint.spark.skipping
 
 import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
-import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
+import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
 
 import org.apache.spark.sql.Column
@@ -17,7 +18,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.qualifyTableName
 
 /**
@@ -40,7 +40,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
             false))
         if hasNoDisjunction(condition) &&
           !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
       val index = flint.describeIndex(getIndexName(table))
-      if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+      if (isActiveSkippingIndex(index)) {
         val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
         val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
@@ -69,7 +69,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
         // Check if query plan already rewritten
         table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
       val index = flint.describeIndex(getIndexName(catalog, identifier))
-      if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
+      if (isActiveSkippingIndex(index)) {
         val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
         val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
         /*
@@ -123,6 +123,12 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
     }.isEmpty
   }
 
+  private def isActiveSkippingIndex(index: Option[FlintSparkIndex]): Boolean = {
+    index.isDefined &&
+    index.get.kind == SKIPPING_INDEX_TYPE &&
+    index.get.latestLogEntry.exists(_.state != DELETED)
+  }
+
   private def rewriteToIndexFilter(
       index: FlintSparkSkippingIndex,
      condition: Expression): Option[Expression] = {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
index f9455fbfa..38c91cf4a 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/skipping/ApplyFlintSparkSkippingIndexSuite.scala
@@ -8,6 +8,8 @@ package org.opensearch.flint.spark.skipping
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
+import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
@@ -58,7 +60,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldNotRewrite()
   }
@@ -69,7 +71,7 @@
       And(
         Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))),
         EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldNotRewrite()
   }
@@ -77,15 +79,23 @@
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(EqualTo(nameCol, Literal("hello")))
-      .withSkippingIndex(testIndex, "name")
+      .withSkippingIndex(testIndex, REFRESHING, "name")
       .shouldPushDownAfterRewrite(col("name") === "hello")
   }
 
+  test("should not rewrite query with deleted skipping index") {
+    assertFlintQueryRewriter()
+      .withSourceTable(testTable, testSchema)
+      .withFilter(EqualTo(nameCol, Literal("hello")))
+      .withSkippingIndex(testIndex, DELETED, "name")
+      .shouldNotRewrite()
+  }
+
   test("should only push down filter condition with indexed column") {
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name")
+      .withSkippingIndex(testIndex, REFRESHING, "name")
       .shouldPushDownAfterRewrite(col("name") === "hello")
   }
@@ -93,7 +103,7 @@
     assertFlintQueryRewriter()
       .withSourceTable(testTable, testSchema)
       .withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
-      .withSkippingIndex(testIndex, "name", "age")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age")
       .shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30)
 
     assertFlintQueryRewriter()
@@ -102,7 +112,7 @@
       And(
         EqualTo(nameCol, Literal("hello")),
         And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle")))))
-      .withSkippingIndex(testIndex, "name", "age", "address")
+      .withSkippingIndex(testIndex, REFRESHING, "name", "age", "address")
       .shouldPushDownAfterRewrite(
         col("name") === "hello" && col("age") === 30 && col("address") === "Seattle")
   }
@@ -139,12 +149,20 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
       this
     }
 
-    def withSkippingIndex(indexName: String, indexCols: String*): AssertionHelper = {
+    def withSkippingIndex(
+        indexName: String,
+        indexState: IndexState,
+        indexCols: String*): AssertionHelper = {
       val skippingIndex = mock[FlintSparkSkippingIndex]
       when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE)
       when(skippingIndex.name()).thenReturn(indexName)
       when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy))
 
+      // Mock index log entry with the given state
+      val logEntry = mock[FlintMetadataLogEntry]
+      when(logEntry.state).thenReturn(indexState)
+      when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry))
+
       when(flint.describeIndex(any())).thenReturn(Some(skippingIndex))
       this
     }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index a4e7cfa79..b663b19bd 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -308,6 +308,28 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }
 
+  test("should not rewrite original query if skipping index is logically deleted") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+    flint.deleteIndex(testIndex)
+
+    val query =
+      s"""
+         | SELECT name
+         | FROM $testTable
+         | WHERE year = 2023 AND month = 4
+         |""".stripMargin
+
+    val actual = sql(query).queryExecution.optimizedPlan
+    withFlintOptimizerDisabled {
+      val expect = sql(query).queryExecution.optimizedPlan
+      actual shouldBe expect
+    }
+  }
+
   test("can build partition skipping index and rewrite applicable query") {
     flint
       .skippingIndex()
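The IT above compares optimized plans with and without the Flint rule through withFlintOptimizerDisabled, a test-harness helper that is not shown in this patch. A plausible sketch, assuming the rule is toggled by the spark.flint.optimizer.enabled conf from FlintSparkConf (key name and restore value are assumptions):

  def withFlintOptimizerDisabled[T](block: => T): T = {
    // Plan the query without the Flint rewrite rule, then restore the default
    // so later tests still exercise the optimizer.
    spark.conf.set("spark.flint.optimizer.enabled", "false")
    try {
      block
    } finally {
      spark.conf.set("spark.flint.optimizer.enabled", "true")
    }
  }
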
From e87d33026eb7e3dac1de37247a4552020555bcf3 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Wed, 27 Mar 2024 08:54:46 -0700
Subject: [PATCH 4/4] Ignore non-Flint index in show and describe index statement (#296)

* Ignore index if creation fails and add IT

Signed-off-by: Chen Dai

* Fix style check

Signed-off-by: Chen Dai

* Add more IT

Signed-off-by: Chen Dai

---------

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSpark.scala   |  5 +--
 .../flint/spark/FlintSparkIndexFactory.scala  | 18 +++++++--
 .../spark/FlintSparkIndexSqlITSuite.scala     | 38 +++++++++++++++++++
 3 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 89e8dc423..acb416bfe 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -178,7 +178,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       flintClient
         .getAllIndexMetadata(indexNamePattern)
         .asScala
-        .map(FlintSparkIndexFactory.create)
+        .flatMap(FlintSparkIndexFactory.create)
     } else {
       Seq.empty
     }
@@ -196,8 +196,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     logInfo(s"Describing index name $indexName")
     if (flintClient.exists(indexName)) {
       val metadata = flintClient.getIndexMetadata(indexName)
-      val index = FlintSparkIndexFactory.create(metadata)
-      Some(index)
+      FlintSparkIndexFactory.create(metadata)
     } else {
       Option.empty
     }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
index 847b06984..aa3c23360 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
@@ -23,11 +23,13 @@ import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
 import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
 
+import org.apache.spark.internal.Logging
+
 /**
  * Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
  * internal code use instead of user facing API.
  */
-object FlintSparkIndexFactory {
+object FlintSparkIndexFactory extends Logging {
 
   /**
    * Creates Flint index from generic Flint metadata.
@@ -35,9 +37,19 @@ object FlintSparkIndexFactory {
    * @param metadata
    *   Flint metadata
    * @return
-   *   Flint index
+   *   Flint index instance, or None if any error during creation
    */
-  def create(metadata: FlintMetadata): FlintSparkIndex = {
+  def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
+    try {
+      Some(doCreate(metadata))
+    } catch {
+      case e: Exception =>
+        logWarning(s"Failed to create Flint index from metadata $metadata", e)
+        None
+    }
+  }
+
+  private def doCreate(metadata: FlintMetadata): FlintSparkIndex = {
     val indexOptions = FlintSparkIndexOptions(
       metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
     val latestLogEntry = metadata.latestLogEntry
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
index ed4e18398..61a16779a 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
@@ -5,6 +5,9 @@
 
 package org.opensearch.flint.spark
 
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.CreateIndexRequest
+import org.opensearch.common.xcontent.XContentType
 import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
@@ -123,4 +126,39 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
             "refreshing")))
     deleteTestIndex(testCoveringFlintIndex)
   }
+
+  test("should ignore non-Flint index") {
+    try {
+      sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")
+
+      // Create a non-Flint index which has the "flint_" prefix by coincidence
+      openSearchClient
+        .indices()
+        .create(
+          new CreateIndexRequest("flint_spark_catalog_invalid_index1"),
+          RequestOptions.DEFAULT)
+
+      // Create a non-Flint index which has the "flint_" prefix and a _meta mapping by coincidence
+      openSearchClient
+        .indices()
+        .create(
+          new CreateIndexRequest("flint_spark_catalog_invalid_index2")
+            .mapping(
+              """{
+                |  "_meta": {
+                |    "custom": "test"
+                |  }
+                |}
+                |""".stripMargin,
+              XContentType.JSON),
+          RequestOptions.DEFAULT)
+
+      // Show statement should ignore such indexes without problems
+      checkAnswer(
+        sql(s"SHOW FLINT INDEX IN spark_catalog"),
+        Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))
+    } finally {
+      deleteTestIndex(testSkippingFlintIndex)
+    }
+  }
 }
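
Patch 4's hand-rolled try/catch in FlintSparkIndexFactory.create is one way to express a fault-tolerant factory; the same shape written with scala.util.Try, shown here only for comparison (createSafely is a hypothetical name, not part of the patch):

  import scala.util.{Failure, Success, Try}

  def createSafely(metadata: FlintMetadata): Option[FlintSparkIndex] =
    Try(doCreate(metadata)) match {
      case Success(index) => Some(index)
      case Failure(e) =>
        // Corrupt or non-Flint metadata yields None instead of failing the whole
        // SHOW/DESC statement; callers then skip it via flatMap as FlintSpark does.
        logWarning(s"Failed to create Flint index from metadata $metadata", e)
        None
    }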