From 19ac42b63c395ca7bb4e88c762450a0c612feeb9 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Sat, 28 Sep 2024 16:47:24 -0700
Subject: [PATCH] Add more scala doc and update sbt

Signed-off-by: Louis Chu
---
 build.sbt                                         |  2 +-
 .../apache/spark/sql/QueryResultWriter.scala      | 31 ++++++++++++++++---
 .../org/apache/spark/sql/FlintREPL.scala          |  7 ++++-
 .../spark/sql/QueryResultWriterImpl.scala         |  9 +++---
 4 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/build.sbt b/build.sbt
index b882d133b..392b87317 100644
--- a/build.sbt
+++ b/build.sbt
@@ -88,7 +88,7 @@ lazy val flintCore = (project in file("flint-core"))
         exclude ("com.fasterxml.jackson.core", "jackson-databind"),
       "com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
         exclude("com.fasterxml.jackson.core", "jackson-databind"),
-      "software.amazon.awssdk" % "auth-crt" % "2.25.23",
+      "software.amazon.awssdk" % "auth-crt" % "2.28.10" % "provided",
       "org.scalactic" %% "scalactic" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest" % "3.2.15" % "test",
       "org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
diff --git a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala
index bc76547f6..efb001785 100644
--- a/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala
+++ b/flint-commons/src/main/scala/org/apache/spark/sql/QueryResultWriter.scala
@@ -13,15 +13,38 @@ import org.opensearch.flint.common.model.FlintStatement
 trait QueryResultWriter {
 
   /**
-   * Writes the given DataFrame, which represents the result of a query execution, to an external
-   * data storage based on the provided FlintStatement metadata.
+   * Writes the given DataFrame to an external data storage based on the FlintStatement metadata.
+   * This method is responsible for persisting the query results.
+   *
+   * Note: This method typically involves I/O operations and may trigger Spark actions to
+   * materialize the DataFrame if it hasn't been processed yet.
+   *
+   * @param dataFrame
+   *   The DataFrame containing the query results to be written.
+   * @param flintStatement
+   *   The FlintStatement containing metadata that guides the writing process.
    */
   def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit
 
   /**
-   * Reformat the given DataFrame to the desired format.
+   * Defines transformations on the given DataFrame and triggers an action to process it. This
+   * method applies necessary transformations based on the FlintStatement metadata and executes
+   * an action to compute the result.
+   *
+   * Note: Calling this method will trigger the actual data processing in Spark. If the Spark SQL
+   * thread is waiting for the result of a query, termination on the same thread will be blocked
+   * until the action completes.
+   *
+   * @param dataFrame
+   *   The DataFrame to be processed.
+   * @param flintStatement
+   *   The FlintStatement containing statement metadata.
+   * @param queryStartTime
+   *   The start time of the query execution.
+   * @return
+   *   The processed DataFrame after applying transformations and executing an action.
    */
-  def reformatDataFrame(
+  def processDataFrame(
       dataFrame: DataFrame,
       flintStatement: FlintStatement,
       queryStartTime: Long): DataFrame
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index a57f8127d..cdeebe663 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -810,8 +810,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } else {
       val futureQueryExecution = Future {
         val startTime = System.currentTimeMillis()
+        // Execute the statement and get the resulting DataFrame
+        // This step may involve Spark transformations, but not necessarily actions
         val df = statementsExecutionManager.executeStatement(flintStatement)
-        queryResultWriter.reformatDataFrame(df, flintStatement, startTime)
+        // Process the DataFrame, applying any necessary transformations
+        // and triggering Spark actions to materialize the results
+        // This is where the actual data processing occurs
+        queryResultWriter.processDataFrame(df, flintStatement, startTime)
       }(executionContext)
       // time out after 10 minutes
       ThreadUtils.awaitResult(futureQueryExecution, queryExecutionTimeOut)
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala
index c64712621..61c6e0747 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala
@@ -26,14 +26,15 @@ class QueryResultWriterImpl(commandContext: CommandContext)
     writeDataFrameToOpensearch(dataFrame, resultIndex, osClient)
   }
 
-  /**
-   * Reformat the given DataFrame to the desired format.
-   */
-  override def reformatDataFrame(
+  override def processDataFrame(
      dataFrame: DataFrame,
      statement: FlintStatement,
      queryStartTime: Long): DataFrame = {
     import commandContext._
+
+    /**
+     * Reformat the given DataFrame to the desired format for OpenSearch storage.
+     */
     getFormattedData(
       applicationId,
       jobId,
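
For illustration, a minimal sketch of the two-phase contract the new scaladoc
describes: processDataFrame defines transformations and forces a Spark action,
so the actual computation (and any failure or timeout) happens inside the
Future that ThreadUtils.awaitResult guards in FlintREPL, while writeDataFrame
is pure I/O over an already-materialized result. StatementMeta, the added
columns, and the parquet sink below are hypothetical placeholders; they stand
in for the real FlintStatement and the OpenSearch writer in
QueryResultWriterImpl, which this sketch does not reproduce.

    import org.apache.spark.sql.{DataFrame, functions => F}

    // Hypothetical stand-in for org.opensearch.flint.common.model.FlintStatement,
    // which carries more metadata in the real codebase.
    case class StatementMeta(queryId: String)

    class SketchQueryResultWriter {

      // Phase 1: define transformations, then trigger an action so the actual
      // computation runs here, inside the query-execution timeout.
      def processDataFrame(
          dataFrame: DataFrame,
          statement: StatementMeta,
          queryStartTime: Long): DataFrame = {
        val annotated = dataFrame
          .withColumn("queryId", F.lit(statement.queryId))
          .withColumn("queryRunTimeMillis", F.lit(System.currentTimeMillis() - queryStartTime))
        annotated.cache() // keep the materialized result for the write phase
        annotated.count() // a Spark action: runs the query now
        annotated
      }

      // Phase 2: pure I/O. The DataFrame is already materialized, so this call
      // should not re-run the query. The real implementation writes to an
      // OpenSearch result index; parquet only keeps this sketch self-contained.
      def writeDataFrame(dataFrame: DataFrame, statement: StatementMeta): Unit = {
        dataFrame.write.mode("append").parquet(s"/tmp/flint-results/${statement.queryId}")
      }
    }

Caching before the count means writeDataFrame reuses the materialized result
rather than recomputing it, which matches the intent of moving the action into
processDataFrame ahead of the write.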