Merge branch 'main' into alter-index
seankao-az committed Mar 27, 2024
2 parents 25cb0ff + e87d330 commit 1247eaf
Showing 11 changed files with 129 additions and 22 deletions.
@@ -77,7 +77,7 @@ public class FlintOptions implements Serializable {

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;
public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

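For reference, a quick arithmetic check of the changed default (illustrative only, not part of the commit): the inactivity limit drops from 10 minutes to 3 minutes.

val oldInactivityLimitMillis = 10 * 60 * 1000 // 600000 ms = 10 minutes
val newInactivityLimitMillis = 3 * 60 * 1000  // 180000 ms = 3 minutes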
@@ -179,7 +179,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient
.getAllIndexMetadata(indexNamePattern)
.asScala
.map(FlintSparkIndexFactory.create)
.flatMap(FlintSparkIndexFactory.create)
} else {
Seq.empty
}
@@ -197,8 +197,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Describing index name $indexName")
if (flintClient.exists(indexName)) {
val metadata = flintClient.getIndexMetadata(indexName)
val index = FlintSparkIndexFactory.create(metadata)
Some(index)
FlintSparkIndexFactory.create(metadata)
} else {
Option.empty
}
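Both changes above follow from FlintSparkIndexFactory.create now returning Option[FlintSparkIndex]: the listing path switches from map to flatMap so entries whose metadata cannot be parsed are dropped, and describeIndex returns the factory's Option directly. A minimal, self-contained sketch of the same flatMap-over-Option pattern (the names below are illustrative, not Flint APIs):

import scala.util.Try

def parsePort(raw: String): Option[Int] = Try(raw.trim.toInt).toOption

val inputs = Seq("9200", "not-a-number", "9300")
inputs.map(parsePort)     // Seq(Some(9200), None, Some(9300)) -- failures still present as None
inputs.flatMap(parsePort) // Seq(9200, 9300)                   -- failures dropped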
@@ -23,21 +23,33 @@ import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

import org.apache.spark.internal.Logging

/**
* Flint Spark index factory that encapsulates specific Flint index instance creation. This is for
* internal code use instead of user facing API.
*/
object FlintSparkIndexFactory {
object FlintSparkIndexFactory extends Logging {

/**
* Creates Flint index from generic Flint metadata.
*
* @param metadata
* Flint metadata
* @return
* Flint index
* Flint index instance, or None if any error occurs during creation
*/
def create(metadata: FlintMetadata): FlintSparkIndex = {
def create(metadata: FlintMetadata): Option[FlintSparkIndex] = {
try {
Some(doCreate(metadata))
} catch {
case e: Exception =>
logWarning(s"Failed to create Flint index from metadata $metadata", e)
None
}
}

private def doCreate(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry
@@ -6,7 +6,8 @@
package org.opensearch.flint.spark.skipping

import com.amazon.awslogsdataaccesslayer.connectors.spark.LogsTable
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}

import org.apache.spark.sql.Column
@@ -17,7 +18,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.qualifyTableName

/**
@@ -40,7 +40,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
false))
if hasNoDisjunction(condition) && !location.isInstanceOf[FlintSparkSkippingFileIndex] =>
val index = flint.describeIndex(getIndexName(table))
if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
if (isActiveSkippingIndex(index)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)

@@ -69,7 +69,7 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
// Check if query plan already rewritten
table.isInstanceOf[LogsTable] && !table.asInstanceOf[LogsTable].hasFileIndexScan() =>
val index = flint.describeIndex(getIndexName(catalog, identifier))
if (index.exists(_.kind == SKIPPING_INDEX_TYPE)) {
if (isActiveSkippingIndex(index)) {
val skippingIndex = index.get.asInstanceOf[FlintSparkSkippingIndex]
val indexFilter = rewriteToIndexFilter(skippingIndex, condition)
/*
@@ -123,6 +123,12 @@ class ApplyFlintSparkSkippingIndex(flint: FlintSpark) extends Rule[LogicalPlan]
}.isEmpty
}

private def isActiveSkippingIndex(index: Option[FlintSparkIndex]): Boolean = {
index.isDefined &&
index.get.kind == SKIPPING_INDEX_TYPE &&
index.get.latestLogEntry.exists(_.state != DELETED)
}

private def rewriteToIndexFilter(
index: FlintSparkSkippingIndex,
condition: Expression): Option[Expression] = {
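The new guard applies the rewrite only when the described index exists, is a skipping index, and its latest log entry reports a state other than DELETED; an index with no log entry at all is also treated as inactive, since exists on None is false. An equivalent formulation as a pattern match (illustrative only, not part of the commit):

def isActiveSkippingIndex(index: Option[FlintSparkIndex]): Boolean =
  index match {
    case Some(i) if i.kind == SKIPPING_INDEX_TYPE =>
      i.latestLogEntry.exists(_.state != DELETED)
    case _ => false
  }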
@@ -8,6 +8,8 @@ package org.opensearch.flint.spark.skipping
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{DELETED, IndexState, REFRESHING}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.SkippingKind
@@ -58,7 +60,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
assertFlintQueryRewriter()
.withSourceTable(testTable, testSchema)
.withFilter(Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
.withSkippingIndex(testIndex, "name", "age")
.withSkippingIndex(testIndex, REFRESHING, "name", "age")
.shouldNotRewrite()
}

@@ -69,31 +71,39 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
And(
Or(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))),
EqualTo(ageCol, Literal(30))))
.withSkippingIndex(testIndex, "name", "age")
.withSkippingIndex(testIndex, REFRESHING, "name", "age")
.shouldNotRewrite()
}

test("should rewrite query with skipping index") {
assertFlintQueryRewriter()
.withSourceTable(testTable, testSchema)
.withFilter(EqualTo(nameCol, Literal("hello")))
.withSkippingIndex(testIndex, "name")
.withSkippingIndex(testIndex, REFRESHING, "name")
.shouldPushDownAfterRewrite(col("name") === "hello")
}

test("should not rewrite query with deleted skipping index") {
assertFlintQueryRewriter()
.withSourceTable(testTable, testSchema)
.withFilter(EqualTo(nameCol, Literal("hello")))
.withSkippingIndex(testIndex, DELETED, "name")
.shouldNotRewrite()
}

test("should only push down filter condition with indexed column") {
assertFlintQueryRewriter()
.withSourceTable(testTable, testSchema)
.withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
.withSkippingIndex(testIndex, "name")
.withSkippingIndex(testIndex, REFRESHING, "name")
.shouldPushDownAfterRewrite(col("name") === "hello")
}

test("should push down all filter conditions with indexed column") {
assertFlintQueryRewriter()
.withSourceTable(testTable, testSchema)
.withFilter(And(EqualTo(nameCol, Literal("hello")), EqualTo(ageCol, Literal(30))))
.withSkippingIndex(testIndex, "name", "age")
.withSkippingIndex(testIndex, REFRESHING, "name", "age")
.shouldPushDownAfterRewrite(col("name") === "hello" && col("age") === 30)

assertFlintQueryRewriter()
Expand All @@ -102,7 +112,7 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
And(
EqualTo(nameCol, Literal("hello")),
And(EqualTo(ageCol, Literal(30)), EqualTo(addressCol, Literal("Seattle")))))
.withSkippingIndex(testIndex, "name", "age", "address")
.withSkippingIndex(testIndex, REFRESHING, "name", "age", "address")
.shouldPushDownAfterRewrite(
col("name") === "hello" && col("age") === 30 && col("address") === "Seattle")
}
@@ -139,12 +149,20 @@ class ApplyFlintSparkSkippingIndexSuite extends SparkFunSuite with Matchers {
this
}

def withSkippingIndex(indexName: String, indexCols: String*): AssertionHelper = {
def withSkippingIndex(
indexName: String,
indexState: IndexState,
indexCols: String*): AssertionHelper = {
val skippingIndex = mock[FlintSparkSkippingIndex]
when(skippingIndex.kind).thenReturn(SKIPPING_INDEX_TYPE)
when(skippingIndex.name()).thenReturn(indexName)
when(skippingIndex.indexedColumns).thenReturn(indexCols.map(FakeSkippingStrategy))

// Mock index log entry with the given state
val logEntry = mock[FlintMetadataLogEntry]
when(logEntry.state).thenReturn(indexState)
when(skippingIndex.latestLogEntry).thenReturn(Some(logEntry))

when(flint.describeIndex(any())).thenReturn(Some(skippingIndex))
this
}
@@ -76,7 +76,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
JobOperator(spark, query, dataSourceName, resultIndex, true, streamingRunningCount)
job.envinromentProvider = new MockEnvironment(
Map("SERVERLESS_EMR_JOB_ID" -> jobRunId, "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" -> appId))

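// Presumably disabled so the non-daemon-thread check added to JobOperator in this commit
// does not call System.exit(0) and kill the test JVM.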
job.terminateJVM = false
job.start()
}
futureResult.onComplete {
@@ -5,6 +5,9 @@

package org.opensearch.flint.spark

import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
@@ -123,4 +126,39 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
"refreshing")))
deleteTestIndex(testCoveringFlintIndex)
}

test("should ignore non-Flint index") {
try {
sql(s"CREATE SKIPPING INDEX ON $testTableQualifiedName (name VALUE_SET)")

// Create a non-Flint index whose name has the "flint_" prefix by coincidence
openSearchClient
.indices()
.create(
new CreateIndexRequest("flint_spark_catalog_invalid_index1"),
RequestOptions.DEFAULT)

// Create a non-Flint index whose name has the "flint_" prefix and a _meta mapping by coincidence
openSearchClient
.indices()
.create(
new CreateIndexRequest("flint_spark_catalog_invalid_index2")
.mapping(
"""{
| "_meta": {
| "custom": "test"
| }
|}
|""".stripMargin,
XContentType.JSON),
RequestOptions.DEFAULT)

// The show statement should ignore such indexes without a problem
checkAnswer(
sql(s"SHOW FLINT INDEX IN spark_catalog"),
Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))
} finally {
deleteTestIndex(testSkippingFlintIndex)
}
}
}
@@ -336,6 +336,28 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("should not rewrite original query if skipping index is logically deleted") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.create()
flint.deleteIndex(testIndex)

val query =
s"""
| SELECT name
| FROM $testTable
| WHERE year = 2023 AND month = 4
|""".stripMargin

val actual = sql(query).queryExecution.optimizedPlan
withFlintOptimizerDisabled {
val expect = sql(query).queryExecution.optimizedPlan
actual shouldBe expect
}
}

test("can build partition skipping index and rewrite applicable query") {
flint
.skippingIndex()
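The test above compares the optimized plan produced with the Flint optimizer rule enabled against a baseline computed inside withFlintOptimizerDisabled. That helper's implementation is not part of this diff; a hypothetical sketch of how such a helper could look (the config key below is an assumption, for illustration only):

import org.apache.spark.sql.SparkSession

def withOptimizerRuleDisabled[T](spark: SparkSession)(body: => T): T = {
  val key = "spark.flint.optimizer.enabled" // assumed Flint conf key, not confirmed by this diff
  val original = spark.conf.getOption(key)
  spark.conf.set(key, "false")
  try body
  finally original match {
    case Some(value) => spark.conf.set(key, value)
    case None        => spark.conf.unset(key)
  }
}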
@@ -34,6 +34,8 @@ trait FlintJobExecutor {
var threadPoolFactory: ThreadPoolFactory = new DefaultThreadPoolFactory()
var envinromentProvider: EnvironmentProvider = new RealEnvironment()
var enableHiveSupport: Boolean = true
// terminate the JVM in the presence of non-daemon threads before exiting
var terminateJVM = true

// The enabled setting, which can be applied only to the top-level mapping definition and to object fields,
val resultIndexMapping =
@@ -57,8 +57,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L

@volatile var earlyExitFlag: Boolean = false
// terminate the JVM in the presence of non-daemon threads before exiting
var terminateJVM = true

def updateSessionIndex(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
@@ -19,7 +19,7 @@ import org.opensearch.flint.core.storage.OpenSearchUpdater
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.FlintJob.createSparkSession
import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, updateFlintInstanceBeforeShutdown}
import org.apache.spark.sql.FlintREPL.{executeQuery, logInfo, threadPoolFactory, updateFlintInstanceBeforeShutdown}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.util.ThreadUtils

@@ -106,6 +106,18 @@ case class JobOperator(
case e: Exception => logError("Fail to close threadpool", e)
}
recordStreamingCompletionStatus(exceptionThrown)

// Check for non-daemon threads that may prevent the driver from shutting down.
// Non-daemon threads other than the main thread indicate that the driver is still processing tasks,
// which may be due to unresolved bugs in dependencies or threads not being properly shut down.
if (terminateJVM && threadPoolFactory.hasNonDaemonThreadsOtherThanMain) {
logInfo("A non-daemon thread in the driver is seen.")
// Exit the JVM to prevent resource leaks and a potential emr-s job hang.
// A zero status code is used for a graceful shutdown without indicating an error.
// If exiting with a non-zero status, the emr-s job will fail.
// This is part of the fault-tolerance mechanism to handle such scenarios gracefully.
System.exit(0)
}
}

def stop(): Unit = {
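The implementation of threadPoolFactory.hasNonDaemonThreadsOtherThanMain is not shown in this diff; one way such a check could be written, as an illustrative assumption rather than the actual Flint code:

// Enumerate live threads and look for any non-daemon thread other than "main".
def hasNonDaemonThreadsOtherThanMain: Boolean = {
  val threads = Thread.getAllStackTraces.keySet.toArray(Array.empty[Thread])
  threads.exists(t => !t.isDaemon && t.getName != "main")
}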
