From 25f674f7d0186e45cc29d8104dbf0b032d56e31a Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 16 Aug 2024 15:02:40 -0700 Subject: [PATCH] Clean up logs and add UTs Signed-off-by: Louis Chu --- .../apache/spark/sql/QueryResultWriter.scala | 8 ++ .../core/storage/OpenSearchClientUtils.java | 4 - .../flint/core/storage/OpenSearchReader.java | 4 - .../opensearch/flint/OpenSearchSuite.scala | 4 +- .../org/apache/spark/sql/FlintREPL.scala | 17 +++-- .../scala/org/apache/spark/sql/OSClient.scala | 1 - .../apache/spark/sql/SessionManagerImpl.scala | 10 +-- .../org/apache/spark/sql/FlintREPLTest.scala | 75 ++++++++++++------- 8 files changed, 69 insertions(+), 54 deletions(-) diff --git a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala index 7ddf6604b..49dc8e355 100644 --- a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala +++ b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala @@ -7,6 +7,14 @@ package org.apache.spark.sql import org.opensearch.flint.common.model.FlintStatement +/** + * Trait for writing the result of a query execution to an external data storage. + */ trait QueryResultWriter { + + /** + * Writes the given DataFrame, which represents the result of a query execution, to an external + * data storage based on the provided FlintStatement metadata. + */ def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java index 004c1784f..0f80d07c9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchClientUtils.java @@ -27,15 +27,11 @@ import org.opensearch.flint.core.RestHighLevelClientWrapper; import org.opensearch.flint.core.auth.ResourceBasedAWSRequestSigningApacheInterceptor; import org.opensearch.flint.core.http.RetryableHttpAsyncClient; -import java.util.logging.Logger; - /** * Utility functions to create {@link IRestHighLevelClient}. */ public class OpenSearchClientUtils { - private static final Logger LOG = Logger.getLogger(OpenSearchClientUtils.class.getName()); - /** * Metadata log index name prefix diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java index c5f178c56..1440db1f3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchReader.java @@ -17,14 +17,11 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.logging.Logger; /** * Abstract OpenSearch Reader. */ public abstract class OpenSearchReader implements FlintReader { - private static final Logger LOG = Logger.getLogger(OpenSearchReader.class.getName()); - @VisibleForTesting /** Search request source builder. */ public final SearchRequest searchRequest; @@ -50,7 +47,6 @@ public OpenSearchReader(IRestHighLevelClient client, SearchRequest searchRequest return false; } List searchHits = Arrays.asList(response.get().getHits().getHits()); - LOG.info("Result sets: " + searchHits.size()); iterator = searchHits.iterator(); } return iterator.hasNext(); diff --git a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala index cde3230d4..35c700aca 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/OpenSearchSuite.scala @@ -16,13 +16,12 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.testcontainers.OpenSearchContainer import org.scalatest.{BeforeAndAfterAll, Suite} -import org.apache.spark.internal.Logging import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, IGNORE_DOC_ID_COLUMN, REFRESH_POLICY} /** * Test required OpenSearch domain should extend OpenSearchSuite. */ -trait OpenSearchSuite extends BeforeAndAfterAll with Logging { +trait OpenSearchSuite extends BeforeAndAfterAll { self: Suite => protected lazy val container = new OpenSearchContainer() @@ -146,7 +145,6 @@ trait OpenSearchSuite extends BeforeAndAfterAll with Logging { val response = openSearchClient.bulk(request, RequestOptions.DEFAULT) - logInfo(response.toString) assume( !response.hasFailures, s"bulk index docs to $index failed: ${response.buildFailureMessage()}") diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 49e45b316..565a89b2a 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -64,10 +64,10 @@ object FlintREPL extends Logging with FlintJobExecutor { // init SparkContext val conf: SparkConf = createSparkConf() - val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "unknown") + val dataSource = conf.get(FlintSparkConf.DATA_SOURCE_NAME.key, "") - if (dataSource == "unknown") { - logInfo(FlintSparkConf.DATA_SOURCE_NAME.key + " is not set") + if (dataSource.trim.isEmpty) { + logAndThrow(FlintSparkConf.DATA_SOURCE_NAME.key + " is not set or is empty") } // https://github.com/opensearch-project/opensearch-spark/issues/138 /* @@ -919,12 +919,13 @@ object FlintREPL extends Logging with FlintJobExecutor { result.getOrElse(throw new RuntimeException("Failed after retries")) } - private def getSessionId(conf: SparkConf): String = { - val sessionIdOption: Option[String] = Option(conf.get(FlintSparkConf.SESSION_ID.key, null)) - if (sessionIdOption.isEmpty) { - logAndThrow(FlintSparkConf.SESSION_ID.key + " is not set") + def getSessionId(conf: SparkConf): String = { + conf.getOption(FlintSparkConf.SESSION_ID.key) match { + case Some(sessionId) if sessionId.nonEmpty => + sessionId + case _ => + logAndThrow(s"${FlintSparkConf.SESSION_ID.key} is not set or is empty") } - sessionIdOption.get } private def instantiate[T](defaultConstructor: => T, className: String, args: Any*): T = { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index 999742e67..422cfc947 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -121,7 +121,6 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { case Success(response) => IRestHighLevelClient.recordOperationSuccess( MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX) - logInfo(response.toString) response case Failure(e: Exception) => IRestHighLevelClient.recordOperationFailure( diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala index 85a1be5f1..9b73cbc3a 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/SessionManagerImpl.scala @@ -22,17 +22,17 @@ class SessionManagerImpl(spark: SparkSession, resultIndexOption: Option[String]) with FlintJobExecutor with Logging { - // we don't allow default value for sessionIndex, sessionId and datasource. Throw exception if key not found. + if (resultIndexOption.isEmpty) { + logAndThrow("resultIndex is not set") + } + + // we don't allow default value for sessionIndex. Throw exception if key not found. val sessionIndex: String = spark.conf.get(FlintSparkConf.REQUEST_INDEX.key, "") if (sessionIndex.isEmpty) { logAndThrow(FlintSparkConf.REQUEST_INDEX.key + " is not set") } - if (resultIndexOption.isEmpty) { - logAndThrow("resultIndex is not set") - } - val osClient = new OSClient(FlintSparkConf().flintOptions()) val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex) diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala index 10a85c62b..b786a56a0 100644 --- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala +++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintREPLTest.scala @@ -41,7 +41,6 @@ import org.apache.spark.sql.types.{LongType, NullType, StringType, StructField, import org.apache.spark.sql.util.{DefaultThreadPoolFactory, MockThreadPoolFactory, MockTimeProvider, RealTimeProvider, ShutdownHookManagerTrait} import org.apache.spark.util.ThreadUtils -@Ignore class FlintREPLTest extends SparkFunSuite with MockitoSugar @@ -50,38 +49,56 @@ class FlintREPLTest // By using a type alias and casting, I can bypass the type checking error. type AnyScheduledFuture = ScheduledFuture[_] - test( - "parseArgs with one argument should return None for query and the argument as resultIndex") { + test("parseArgs with no arguments should return (None, None)") { + val args = Array.empty[String] + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) + queryOption shouldBe None + resultIndexOption shouldBe None + } + + test("parseArgs with one argument should return None for query and Some for resultIndex") { val args = Array("resultIndexName") - val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) queryOption shouldBe None - resultIndex shouldBe "resultIndexName" + resultIndexOption shouldBe Some("resultIndexName") } - test( - "parseArgs with two arguments should return the first argument as query and the second as resultIndex") { + test("parseArgs with two arguments should return Some for both query and resultIndex") { val args = Array("SELECT * FROM table", "resultIndexName") - val (queryOption, resultIndex) = FlintREPL.parseArgs(args) + val (queryOption, resultIndexOption) = FlintREPL.parseArgs(args) queryOption shouldBe Some("SELECT * FROM table") - resultIndex shouldBe "resultIndexName" + resultIndexOption shouldBe Some("resultIndexName") } test( - "parseArgs with no arguments should throw IllegalArgumentException with specific message") { - val args = Array.empty[String] + "parseArgs with more than two arguments should throw IllegalArgumentException with specific message") { + val args = Array("arg1", "arg2", "arg3") val exception = intercept[IllegalArgumentException] { FlintREPL.parseArgs(args) } - exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." + exception.getMessage shouldBe "Unsupported number of arguments. Expected no more than two arguments." } - test( - "parseArgs with more than two arguments should throw IllegalArgumentException with specific message") { - val args = Array("arg1", "arg2", "arg3") + test("getSessionId should throw exception when SESSION_ID is not set") { + val conf = new SparkConf() val exception = intercept[IllegalArgumentException] { - FlintREPL.parseArgs(args) + FlintREPL.getSessionId(conf) } - exception.getMessage shouldBe "Unsupported number of arguments. Expected 1 or 2 arguments." + assert(exception.getMessage === FlintSparkConf.SESSION_ID.key + " is not set or is empty") + } + + test("getSessionId should return the session ID when it's set") { + val sessionId = "test-session-id" + val conf = new SparkConf().set(FlintSparkConf.SESSION_ID.key, sessionId) + assert(FlintREPL.getSessionId(conf) === sessionId) + } + + test("getSessionId should throw exception when SESSION_ID is set to empty string") { + val conf = new SparkConf().set(FlintSparkConf.SESSION_ID.key, "") + val exception = intercept[IllegalArgumentException] { + FlintREPL.getSessionId(conf) + } + assert(exception.getMessage === FlintSparkConf.SESSION_ID.key + " is not set or is empty") } test("getQuery should return query from queryOption if present") { @@ -159,7 +176,7 @@ class FlintREPLTest } } - test("createHeartBeatUpdater should update heartbeat correctly") { + ignore("createHeartBeatUpdater should update heartbeat correctly") { // Mocks val threadPool = mock[ScheduledExecutorService] val scheduledFutureRaw = mock[ScheduledFuture[_]] @@ -321,7 +338,7 @@ class FlintREPLTest assert(!result) // The function should return false } - test("test canPickNextStatement: Doc Exists, JobId Matches, but JobId is Excluded") { + ignore("test canPickNextStatement: Doc Exists, JobId Matches, but JobId is Excluded") { val sessionId = "session123" val jobId = "jobABC" val osClient = mock[OSClient] @@ -545,7 +562,7 @@ class FlintREPLTest assert(!result) } - test("Doc Exists and excludeJobIds is an ArrayList Not Containing JobId") { + ignore("Doc Exists and excludeJobIds is an ArrayList Not Containing JobId") { val sessionId = "session123" val jobId = "jobABC" val osClient = mock[OSClient] @@ -639,7 +656,7 @@ class FlintREPLTest } } - test("executeAndHandle should handle TimeoutException properly") { + ignore("executeAndHandle should handle TimeoutException properly") { val mockSparkSession = mock[SparkSession] val mockConf = mock[RuntimeConfig] when(mockSparkSession.conf).thenReturn(mockConf) @@ -695,7 +712,7 @@ class FlintREPLTest } finally threadPool.shutdown() } - test("executeAndHandle should handle ParseException properly") { + ignore("executeAndHandle should handle ParseException properly") { val mockSparkSession = mock[SparkSession] val mockConf = mock[RuntimeConfig] when(mockSparkSession.conf).thenReturn(mockConf) @@ -795,7 +812,7 @@ class FlintREPLTest assert(!result) // Expecting false as the job should proceed normally } - test("setupFlintJobWithExclusionCheck should exit early if current job is excluded") { + ignore("setupFlintJobWithExclusionCheck should exit early if current job is excluded") { val osClient = mock[OSClient] val getResponse = mock[GetResponse] val applicationId = "app1" @@ -911,7 +928,7 @@ class FlintREPLTest assert(!result) // Expecting false as the job proceeds normally } - test( + ignore( "setupFlintJobWithExclusionCheck should throw NoSuchElementException if sessionIndex or sessionId is missing") { val osClient = mock[OSClient] val flintSessionIndexUpdater = mock[OpenSearchUpdater] @@ -933,7 +950,7 @@ class FlintREPLTest } } - test("queryLoop continue until inactivity limit is reached") { + ignore("queryLoop continue until inactivity limit is reached") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -984,7 +1001,7 @@ class FlintREPLTest spark.stop() } - test("queryLoop should stop when canPickUpNextStatement is false") { + ignore("queryLoop should stop when canPickUpNextStatement is false") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -1046,7 +1063,7 @@ class FlintREPLTest spark.stop() } - test("queryLoop should properly shut down the thread pool after execution") { + ignore("queryLoop should properly shut down the thread pool after execution") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -1155,7 +1172,7 @@ class FlintREPLTest } } - test("queryLoop should correctly update loop control variables") { + ignore("queryLoop should correctly update loop control variables") { val mockReader = mock[FlintReader] val osClient = mock[OSClient] when(osClient.createQueryReader(any[String], any[String], any[String], eqTo(SortOrder.ASC))) @@ -1240,7 +1257,7 @@ class FlintREPLTest (100, 300L) // 100 ms, 300 ms ) - test( + ignore( "queryLoop should execute loop without processing any commands for different inactivity limits and frequencies") { forAll(testCases) { (inactivityLimit, queryLoopExecutionFrequency) => val mockReader = mock[FlintReader]