diff --git a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala index 7ddf6604b..49dc8e355 100644 --- a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala +++ b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala @@ -7,6 +7,14 @@ package org.apache.spark.sql import org.opensearch.flint.common.model.FlintStatement +/** + * Trait for writing the result of a query execution to an external data storage. + */ trait QueryResultWriter { + + /** + * Writes the given DataFrame, which represents the result of a query execution, to an external + * data storage based on the provided FlintStatement metadata. + */ def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit } diff --git a/flint-commons/src/main/scala/org/apache/spark/sql/StatementsExecutionManager.scala b/flint-commons/src/main/scala/org/apache/spark/sql/StatementExecutionManager.scala similarity index 72% rename from flint-commons/src/main/scala/org/apache/spark/sql/StatementsExecutionManager.scala rename to flint-commons/src/main/scala/org/apache/spark/sql/StatementExecutionManager.scala index ae9c4bce4..acf28c572 100644 --- a/flint-commons/src/main/scala/org/apache/spark/sql/StatementsExecutionManager.scala +++ b/flint-commons/src/main/scala/org/apache/spark/sql/StatementExecutionManager.scala @@ -8,10 +8,12 @@ package org.apache.spark.sql import org.opensearch.flint.common.model.FlintStatement /** - * Trait defining the interface for managing FlintStatements executing in a micro-batch within - * same session. + * Trait defining the interface for managing FlintStatement execution. For example, in FlintREPL, + * multiple FlintStatements are running in a micro-batch within same session. + * + * This interface can also apply to other spark entry point like FlintJob. */ -trait StatementsExecutionManager { +trait StatementExecutionManager { /** * Prepares execution of each individual statement diff --git a/flint-commons/src/main/scala/org/opensearch/flint/common/model/FlintStatement.scala b/flint-commons/src/main/scala/org/opensearch/flint/common/model/FlintStatement.scala index bc8b38d9a..00876d46e 100644 --- a/flint-commons/src/main/scala/org/opensearch/flint/common/model/FlintStatement.scala +++ b/flint-commons/src/main/scala/org/opensearch/flint/common/model/FlintStatement.scala @@ -65,7 +65,7 @@ class FlintStatement( // Does not include context, which could contain sensitive information. override def toString: String = - s"FlintStatement(state=$state, query=$query, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)" + s"FlintStatement(state=$state, statementId=$statementId, queryId=$queryId, submitTime=$submitTime, error=$error)" } object FlintStatement { diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index 004c1784f..0f80d07c9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -27,15 +27,11 @@ import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.http.RetryableHttpAsyncClient; -import java.util.logging.Logger; - /** * Utility functions to create {@link IRestHighLevelClient}. */ public class OpenSearchClientUtils { - private static final Logger LOG = Logger.getLogger(OpenSearchClientUtils.class.getName()); - /** * Metadata log index name prefix diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index c5f178c56..1440db1f3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -17,14 +17,11 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.logging.Logger; /** * Abstract OpenSearch Reader. */ public abstract class OpenSearchReader implements FlintReader { - private static final Logger LOG = Logger.getLogger(OpenSearchReader.class.getName()); - @VisibleForTesting /** Search request source builder. */ public final SearchRequest searchRequest; @@ -50,7 +47,6 @@ public OpenSearchReader(IRestHighLevelClient client, SearchRequest searchRequest return false; } List searchHits = Arrays.asList(response.get().getHits().getHits()); - LOG.info("Result sets: " + searchHits.size()); iterator = searchHits.iterator(); } return iterator.hasNext(); diff --git a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala index cde3230d4..35c700aca 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -16,13 +16,12 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.testcontainers.OpenSearchContainer import org.scalatest.{BeforeAndAfterAll, Suite} -import org.apache.spark.internal.Logging import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, IGNORE_DOC_ID_COLUMN, REFRESH_POLICY} /** * Test required OpenSearch domain should extend OpenSearchSuite. */ -trait OpenSearchSuite extends BeforeAndAfterAll with Logging { +trait OpenSearchSuite extends BeforeAndAfterAll { self: Suite => protected lazy val container = new OpenSearchContainer() @@ -146,7 +145,6 @@ trait OpenSearchSuite extends BeforeAndAfterAll with Logging { val response = openSearchClient.bulk(request, RequestOptions.DEFAULT) - logInfo(response.toString) assume( !response.hasFailures, s"bulk index docs to $index failed: ${response.buildFailureMessage()}") diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/CommandContext.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/CommandContext.scala index c0bcf3211..dcc486922 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/CommandContext.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/CommandContext.scala @@ -16,7 +16,7 @@ case class CommandContext( val sessionId: String, val sessionManager: SessionManager, val jobId: String, - var statementsExecutionManager: StatementsExecutionManager, + var statementsExecutionManager: StatementExecutionManager, val queryResultWriter: QueryResultWriter, val queryExecutionTimeout: Duration, val inactivityLimitMillis: Long, diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 49e45b316..5cdc805dd 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -64,10 +64,10 @@ object FlintREPL extends Logging with FlintJobExecutor { // init SparkContext val conf: SparkConf = createSparkConf() - val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "unknown") + val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "") - if (dataSource == "unknown") { - logInfo(FlintSparkConf.DATA_SOURCE_NAME.key + " is not set") + if (dataSource.trim.isEmpty) { + logAndThrow(FlintSparkConf.DATA_SOURCE_NAME.key + " is not set or is empty") } // https://github.com/opensearch-project/opensearch-spark/issues/138 /* @@ -323,7 +323,7 @@ object FlintREPL extends Logging with FlintJobExecutor { .currentEpochMillis() - lastActivityTime <= commandContext.inactivityLimitMillis && canPickUpNextStatement) { logInfo(s"""Executing session with sessionId: ${sessionId}""") val statementsExecutionManager = - instantiateStatementsExecutionManager( + instantiateStatementExecutionManager( spark, sessionId, dataSource, @@ -514,7 +514,6 @@ object FlintREPL extends Logging with FlintJobExecutor { statementsExecutionManager.getNextStatement() match { case Some(flintStatement) => flintStatement.running() - logDebug(s"command running: $flintStatement") statementsExecutionManager.updateStatement(flintStatement) statementRunningCount.incrementAndGet() @@ -606,7 +605,7 @@ object FlintREPL extends Logging with FlintJobExecutor { def executeAndHandle( spark: SparkSession, flintStatement: FlintStatement, - statementsExecutionManager: StatementsExecutionManager, + statementsExecutionManager: StatementExecutionManager, dataSource: String, sessionId: String, executionContext: ExecutionContextExecutor, @@ -618,7 +617,7 @@ object FlintREPL extends Logging with FlintJobExecutor { executeQueryAsync( spark, flintStatement, - statementsExecutionManager: StatementsExecutionManager, + statementsExecutionManager: StatementExecutionManager, dataSource, sessionId, executionContext, @@ -734,7 +733,7 @@ object FlintREPL extends Logging with FlintJobExecutor { def executeQueryAsync( spark: SparkSession, flintStatement: FlintStatement, - statementsExecutionManager: StatementsExecutionManager, + statementsExecutionManager: StatementExecutionManager, dataSource: String, sessionId: String, executionContext: ExecutionContextExecutor, @@ -919,12 +918,13 @@ object FlintREPL extends Logging with FlintJobExecutor { result.getOrElse(throw new RuntimeException("Failed after retries")) } - private def getSessionId(conf: SparkConf): String = { - val sessionIdOption: Option[String] = Option(conf.get(FlintSparkConf.SESSION_ID.key, null)) - if (sessionIdOption.isEmpty) { - logAndThrow(FlintSparkConf.SESSION_ID.key + " is not set") + def getSessionId(conf: SparkConf): String = { + conf.getOption(FlintSparkConf.SESSION_ID.key) match { + case Some(sessionId) if sessionId.nonEmpty => + sessionId + case _ => + logAndThrow(s"${FlintSparkConf.SESSION_ID.key} is not set or is empty") } - sessionIdOption.get } private def instantiate[T](defaultConstructor: => T, className: String, args: Any*): T = { @@ -956,13 +956,13 @@ object FlintREPL extends Logging with FlintJobExecutor { spark.sparkContext.getConf.get(FlintSparkConf.CUSTOM_SESSION_MANAGER.key, "")) } - private def instantiateStatementsExecutionManager( + private def instantiateStatementExecutionManager( spark: SparkSession, sessionId: String, dataSource: String, - context: Map[String, Any]): StatementsExecutionManager = { + context: Map[String, Any]): StatementExecutionManager = { instantiate( - new StatementsExecutionManagerImpl(spark, sessionId, dataSource, context), + new StatementExecutionManagerImpl(spark, sessionId, dataSource, context), spark.sparkContext.getConf.get(FlintSparkConf.CUSTOM_STATEMENT_MANAGER.key, ""), spark, sessionId) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index 999742e67..422cfc947 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -121,7 +121,6 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { case Success(response) => IRestHighLevelClient.recordOperationSuccess( MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX) - logInfo(response.toString) response case Failure(e: Exception) => IRestHighLevelClient.recordOperationFailure( diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala index 8d07e91ae..238f8fa3d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala @@ -12,8 +12,8 @@ import org.apache.spark.sql.FlintJob.writeDataFrameToOpensearch class QueryResultWriterImpl(context: Map[String, Any]) extends QueryResultWriter with Logging { - val resultIndex = context("resultIndex").asInstanceOf[String] - val osClient = context("osClient").asInstanceOf[OSClient] + private val resultIndex = context("resultIndex").asInstanceOf[String] + private val osClient = context("osClient").asInstanceOf[OSClient] override def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit = { writeDataFrameToOpensearch(dataFrame, resultIndex, osClient) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala index 85a1be5f1..79bd63200 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala @@ -22,19 +22,19 @@ class SessionManagerImpl(spark: SparkSession, resultIndexOption: Option[String]) with FlintJobExecutor with Logging { - // we don't allow default value for sessionIndex, sessionId and datasource. Throw exception if key not found. - val sessionIndex: String = spark.conf.get(FlintSparkConf.REQUEST_INDEX.key, "") + if (resultIndexOption.isEmpty) { + logAndThrow("resultIndex is not set") + } + + // we don't allow default value for sessionIndex. Throw exception if key not found. + private val sessionIndex: String = spark.conf.get(FlintSparkConf.REQUEST_INDEX.key, "") if (sessionIndex.isEmpty) { logAndThrow(FlintSparkConf.REQUEST_INDEX.key + " is not set") } - if (resultIndexOption.isEmpty) { - logAndThrow("resultIndex is not set") - } - - val osClient = new OSClient(FlintSparkConf().flintOptions()) - val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex) + private val osClient = new OSClient(FlintSparkConf().flintOptions()) + private val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex) override def getSessionContext: Map[String, Any] = { Map( diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/StatementsExecutionManagerImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/StatementExecutionManagerImpl.scala similarity index 65% rename from spark-sql-application/src/main/scala/org/apache/spark/sql/StatementsExecutionManagerImpl.scala rename to spark-sql-application/src/main/scala/org/apache/spark/sql/StatementExecutionManagerImpl.scala index e138e6067..dc8414a17 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/StatementsExecutionManagerImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/StatementExecutionManagerImpl.scala @@ -10,50 +10,29 @@ import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater} import org.opensearch.search.sort.SortOrder import org.apache.spark.internal.Logging -import org.apache.spark.sql.FlintJob.{createResultIndex, isSuperset, resultIndexMapping} +import org.apache.spark.sql.FlintJob.{checkAndCreateIndex, createResultIndex, isSuperset, resultIndexMapping} import org.apache.spark.sql.FlintREPL.executeQuery -class StatementsExecutionManagerImpl( +class StatementExecutionManagerImpl( spark: SparkSession, sessionId: String, dataSource: String, context: Map[String, Any]) - extends StatementsExecutionManager + extends StatementExecutionManager with Logging { - val sessionIndex = context("sessionIndex").asInstanceOf[String] - val resultIndex = context("resultIndex").asInstanceOf[String] - val osClient = context("osClient").asInstanceOf[OSClient] - val flintSessionIndexUpdater = + private val sessionIndex = context("sessionIndex").asInstanceOf[String] + private val resultIndex = context("resultIndex").asInstanceOf[String] + private val osClient = context("osClient").asInstanceOf[OSClient] + private val flintSessionIndexUpdater = context("flintSessionIndexUpdater").asInstanceOf[OpenSearchUpdater] // Using one reader client within same session will cause concurrency issue. // To resolve this move the reader creation and getNextStatement method to mirco-batch level - val flintReader = createOpenSearchQueryReader() + private val flintReader = createOpenSearchQueryReader() override def prepareStatementExecution(): Either[String, Unit] = { - try { - val existingSchema = osClient.getIndexMetadata(resultIndex) - if (!isSuperset(existingSchema, resultIndexMapping)) { - Left(s"The mapping of $resultIndex is incorrect.") - } else { - Right(()) - } - } catch { - case e: IllegalStateException - if e.getCause != null && - e.getCause.getMessage.contains("index_not_found_exception") => - createResultIndex(osClient, resultIndex, resultIndexMapping) - case e: InterruptedException => - val error = s"Interrupted by the main thread: ${e.getMessage}" - Thread.currentThread().interrupt() // Preserve the interrupt status - logError(error, e) - Left(error) - case e: Exception => - val error = s"Failed to verify existing mapping: ${e.getMessage}" - logError(error, e) - Left(error) - } + checkAndCreateIndex(osClient, resultIndex) } override def updateStatement(statement: FlintStatement): Unit = { flintSessionIndexUpdater.update(statement.statementId, FlintStatement.serialize(statement)) @@ -65,9 +44,8 @@ class StatementsExecutionManagerImpl( override def getNextStatement(): Option[FlintStatement] = { if (flintReader.hasNext) { val rawStatement = flintReader.next() - logInfo(s"raw statement: $rawStatement") val flintStatement = FlintStatement.deserialize(rawStatement) - logInfo(s"statement: $flintStatement") + logInfo(s"Next statement to execute: $flintStatement") Some(flintStatement) } else { None @@ -114,7 +92,6 @@ class StatementsExecutionManagerImpl( | ] | } |}""".stripMargin - val flintReader = osClient.createQueryReader(sessionIndex, dsl, "submitTime", SortOrder.ASC) flintReader } diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala index 10a85c62b..bba0e40e2 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala @@ -41,7 +41,6 @@ import org.apache.spark.sql.types.{LongType, NullType, StringType, StructField, import org.apache.spark.sql.util.{DefaultThreadPoolFactory, MockThreadPoolFactory, MockTimeProvider, RealTimeProvider, ShutdownHookManagerTrait} import org.apache.spark.util.ThreadUtils -@Ignore class FlintREPLTest extends SparkFunSuite with MockitoSugar @@ -50,38 +49,56 @@ class FlintREPLTest // By using a type alias and casting, I can bypass the type checking error. type AnyScheduledFuture = ScheduledFuture[_] - test( - "parseArgs with one argument should return None for query and the argument as resultIndex") { + test("parseArgs with no arguments should return (None, None)") { + val args = Array.empty[String] + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) + queryOption shouldBe None + resultIndexOption shouldBe None + } + + test("parseArgs with one argument should return None for query and Some for resultIndex") { val args = Array("resultIndexName") - val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) queryOption shouldBe None - resultIndex shouldBe "resultIndexName" + resultIndexOption shouldBe Some("resultIndexName") } - test( - "parseArgs with two arguments should return the first argument as query and the second as resultIndex") { + test("parseArgs with two arguments should return Some for both query and resultIndex") { val args = Array("SELECT * FROM table", "resultIndexName") - val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) queryOption shouldBe Some("SELECT * FROM table") - resultIndex shouldBe "resultIndexName" + resultIndexOption shouldBe Some("resultIndexName") } test( - "parseArgs with no arguments should throw IllegalArgumentException with specific message") { - val args = Array.empty[String] + "parseArgs with more than two arguments should throw IllegalArgumentException with specific message") { + val args = Array("arg1", "arg2", "arg3") val exception = intercept[IllegalArgumentException] { FlintREPL.parseArgs(args) } - exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." + exception.getMessage shouldBe "Unsupported number of arguments. Expected no more than two arguments." } - test( - "parseArgs with more than two arguments should throw IllegalArgumentException with specific message") { - val args = Array("arg1", "arg2", "arg3") + test("getSessionId should throw exception when SESSION_ID is not set") { + val conf = new SparkConf() val exception = intercept[IllegalArgumentException] { - FlintREPL.parseArgs(args) + FlintREPL.getSessionId(conf) } - exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." + assert(exception.getMessage === FlintSparkConf.SESSION_ID.key + " is not set or is empty") + } + + test("getSessionId should return the session ID when it's set") { + val sessionId = "test-session-id" + val conf = new SparkConf().set(FlintSparkConf.SESSION_ID.key, sessionId) + assert(FlintREPL.getSessionId(conf) === sessionId) + } + + test("getSessionId should throw exception when SESSION_ID is set to empty string") { + val conf = new SparkConf().set(FlintSparkConf.SESSION_ID.key, "") + val exception = intercept[IllegalArgumentException] { + FlintREPL.getSessionId(conf) + } + assert(exception.getMessage === FlintSparkConf.SESSION_ID.key + " is not set or is empty") } test("getQuery should return query from queryOption if present") { @@ -159,7 +176,7 @@ class FlintREPLTest } } - test("createHeartBeatUpdater should update heartbeat correctly") { + ignore("createHeartBeatUpdater should update heartbeat correctly") { // Mocks val threadPool = mock[ScheduledExecutorService] val scheduledFutureRaw = mock[ScheduledFuture[_]] @@ -321,7 +338,7 @@ class FlintREPLTest assert(!result) // The function should return false } - test("test canPickNextStatement: Doc Exists, JobId Matches, but JobId is Excluded") { + ignore("test canPickNextStatement: Doc Exists, JobId Matches, but JobId is Excluded") { val sessionId = "session123" val jobId = "jobABC" val osClient = mock[OSClient] @@ -545,7 +562,7 @@ class FlintREPLTest assert(!result) } - test("Doc Exists and excludeJobIds is an ArrayList Not Containing JobId") { + ignore("Doc Exists and excludeJobIds is an ArrayList Not Containing JobId") { val sessionId = "session123" val jobId = "jobABC" val osClient = mock[OSClient] @@ -609,7 +626,7 @@ class FlintREPLTest val spark = SparkSession.builder().master("local").appName("FlintREPLTest").getOrCreate() try { val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] val commandContext = CommandContext( @@ -639,14 +656,14 @@ class FlintREPLTest } } - test("executeAndHandle should handle TimeoutException properly") { + ignore("executeAndHandle should handle TimeoutException properly") { val mockSparkSession = mock[SparkSession] val mockConf = mock[RuntimeConfig] when(mockSparkSession.conf).thenReturn(mockConf) when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key)) .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get) - val mockStatementsExecutionManager = mock[StatementsExecutionManager] + val mockStatementsExecutionManager = mock[StatementExecutionManager] // val mockExecutionContextExecutor: ExecutionContextExecutor = mock[ExecutionContextExecutor] val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor("flint-repl", 1) implicit val executionContext = ExecutionContext.fromExecutor(threadPool) @@ -695,13 +712,13 @@ class FlintREPLTest } finally threadPool.shutdown() } - test("executeAndHandle should handle ParseException properly") { + ignore("executeAndHandle should handle ParseException properly") { val mockSparkSession = mock[SparkSession] val mockConf = mock[RuntimeConfig] when(mockSparkSession.conf).thenReturn(mockConf) when(mockSparkSession.conf.get(FlintSparkConf.JOB_TYPE.key)) .thenReturn(FlintSparkConf.JOB_TYPE.defaultValue.get) - val mockStatementsExecutionManager = mock[StatementsExecutionManager] + val mockStatementsExecutionManager = mock[StatementExecutionManager] val flintStatement = new FlintStatement( @@ -795,7 +812,7 @@ class FlintREPLTest assert(!result) // Expecting false as the job should proceed normally } - test("setupFlintJobWithExclusionCheck should exit early if current job is excluded") { + ignore("setupFlintJobWithExclusionCheck should exit early if current job is excluded") { val osClient = mock[OSClient] val getResponse = mock[GetResponse] val applicationId = "app1" @@ -911,7 +928,7 @@ class FlintREPLTest assert(!result) // Expecting false as the job proceeds normally } - test( + ignore( "setupFlintJobWithExclusionCheck should throw NoSuchElementException if sessionIndex or sessionId is missing") { val osClient = mock[OSClient] val flintSessionIndexUpdater = mock[OpenSearchUpdater] @@ -933,7 +950,7 @@ class FlintREPLTest } } - test("queryLoop continue until inactivity limit is reached") { + ignore("queryLoop continue until inactivity limit is reached") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -952,7 +969,7 @@ class FlintREPLTest val spark = SparkSession.builder().master("local").appName("FlintREPLTest").getOrCreate() val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] val commandContext = CommandContext( @@ -984,7 +1001,7 @@ class FlintREPLTest spark.stop() } - test("queryLoop should stop when canPickUpNextStatement is false") { + ignore("queryLoop should stop when canPickUpNextStatement is false") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -992,7 +1009,7 @@ class FlintREPLTest when(mockReader.hasNext).thenReturn(true) val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] val resultIndex = "testResultIndex" @@ -1046,7 +1063,7 @@ class FlintREPLTest spark.stop() } - test("queryLoop should properly shut down the thread pool after execution") { + ignore("queryLoop should properly shut down the thread pool after execution") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -1060,7 +1077,7 @@ class FlintREPLTest val jobId = "testJobId" val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] when(statementLifecycleManager.getNextStatement()).thenReturn(None) @@ -1119,7 +1136,7 @@ class FlintREPLTest // Create a SparkSession for testing val spark = SparkSession.builder().master("local").appName("FlintREPLTest").getOrCreate() val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] val flintSessionIndexUpdater = mock[OpenSearchUpdater] @@ -1155,7 +1172,7 @@ class FlintREPLTest } } - test("queryLoop should correctly update loop control variables") { + ignore("queryLoop should correctly update loop control variables") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -1206,7 +1223,7 @@ class FlintREPLTest val flintSessionIndexUpdater = mock[OpenSearchUpdater] val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] val commandContext = CommandContext( @@ -1240,7 +1257,7 @@ class FlintREPLTest (100, 300L) // 100 ms, 300 ms ) - test( + ignore( "queryLoop should execute loop without processing any commands for different inactivity limits and frequencies") { forAll(testCases) { (inactivityLimit, queryLoopExecutionFrequency) => val mockReader = mock[FlintReader] @@ -1259,7 +1276,7 @@ class FlintREPLTest val jobId = "testJobId" val sessionManager = mock[SessionManager] - val statementLifecycleManager = mock[StatementsExecutionManager] + val statementLifecycleManager = mock[StatementExecutionManager] val queryResultWriter = mock[QueryResultWriter] // Create a SparkSession for testing