Skip to content

Commit

Permalink
Add more scala doc and update sbt
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 29, 2024
1 parent dda7427 commit 19ac42b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "auth-crt" % "2.25.23",
"software.amazon.awssdk" % "auth-crt" % "2.28.10" % "provided",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,38 @@ import org.opensearch.flint.common.model.FlintStatement
trait QueryResultWriter {

/**
 * Writes the given DataFrame to an external data storage based on the FlintStatement metadata.
 * This method is responsible for persisting the query results.
 *
 * Note: This method typically involves I/O operations and may trigger Spark actions to
 * materialize the DataFrame if it hasn't been processed yet.
 *
 * @param dataFrame
 *   The DataFrame containing the query results to be written.
 * @param flintStatement
 *   The FlintStatement containing metadata that guides the writing process.
 */
def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit

/**
 * Defines transformations on the given DataFrame and triggers an action to process it. This
 * method applies necessary transformations based on the FlintStatement metadata and executes
 * an action to compute the result.
 *
 * Note: Calling this method will trigger the actual data processing in Spark. If the Spark
 * SQL thread is waiting for the result of a query, termination on the same thread will be
 * blocked until the action completes.
 *
 * @param dataFrame
 *   The DataFrame to be processed.
 * @param flintStatement
 *   The FlintStatement containing statement metadata.
 * @param queryStartTime
 *   The start time of the query execution.
 * @return
 *   The processed DataFrame after applying transformations and executing an action.
 */
def processDataFrame(
    dataFrame: DataFrame,
    flintStatement: FlintStatement,
    queryStartTime: Long): DataFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
} else {
val futureQueryExecution = Future {
val startTime = System.currentTimeMillis()
// Execute the statement and get the resulting DataFrame
// This step may involve Spark transformations, but not necessarily actions
val df = statementsExecutionManager.executeStatement(flintStatement)
queryResultWriter.reformatDataFrame(df, flintStatement, startTime)
// Process the DataFrame, applying any necessary transformations
// and triggering Spark actions to materialize the results
// This is where the actual data processing occurs
queryResultWriter.processDataFrame(df, flintStatement, startTime)
}(executionContext)
// time out after 10 minutes
ThreadUtils.awaitResult(futureQueryExecution, queryExecutionTimeOut)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ class QueryResultWriterImpl(commandContext: CommandContext)
writeDataFrameToOpensearch(dataFrame, resultIndex, osClient)
}

/**
* Reformat the given DataFrame to the desired format.
*/
override def reformatDataFrame(
override def processDataFrame(
dataFrame: DataFrame,
statement: FlintStatement,
queryStartTime: Long): DataFrame = {
import commandContext._

/**
* Reformat the given DataFrame to the desired format for OpenSearch storage.
*/
getFormattedData(
applicationId,
jobId,
Expand Down

0 comments on commit 19ac42b

Please sign in to comment.