
Merge branch 'main' into ppl-help-command
# Conflicts:
#	ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
#	ppl-spark-integration/src/main/scala/org/opensearch/flint/spark/ppl/PPLSyntaxParser.scala
YANG-DB committed Oct 1, 2024
2 parents 498f6e7 + 3048368 commit c5e0ec7
Showing 23 changed files with 1,175 additions and 45 deletions.
build.sbt: 2 changes (1 addition, 1 deletion)
@@ -88,7 +88,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "auth-crt" % "2.25.23",
"software.amazon.awssdk" % "auth-crt" % "2.28.10" % "provided",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
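For context on the % "provided" scope added above: a provided dependency stays on the compile classpath but is expected to be supplied by the runtime environment, so assembly plugins such as sbt-assembly leave it out of the packaged artifact. A minimal sketch of the idea, assuming the deployment runtime already ships auth-crt:

// build.sbt sketch (illustrative, not part of this commit):
// "provided" = compile against the library, but do not bundle it;
// the runtime environment is assumed to supply it.
libraryDependencies += "software.amazon.awssdk" % "auth-crt" % "2.28.10" % "provided"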
@@ -13,8 +13,39 @@ import org.opensearch.flint.common.model.FlintStatement
trait QueryResultWriter {

/**
* Writes the given DataFrame, which represents the result of a query execution, to an external
* data storage based on the provided FlintStatement metadata.
* Writes the given DataFrame to an external data storage based on the FlintStatement metadata.
* This method is responsible for persisting the query results.
*
* Note: This method typically involves I/O operations and may trigger Spark actions to
* materialize the DataFrame if it hasn't been processed yet.
*
* @param dataFrame
* The DataFrame containing the query results to be written.
* @param flintStatement
* The FlintStatement containing metadata that guides the writing process.
*/
def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit

/**
* Defines transformations on the given DataFrame and triggers an action to process it. This
* method applies necessary transformations based on the FlintStatement metadata and executes an
* action to compute the result.
*
* Note: Calling this method will trigger the actual data processing in Spark. If the Spark SQL
* thread is waiting for the result of a query, termination on the same thread will be blocked
* until the action completes.
*
* @param dataFrame
* The DataFrame to be processed.
* @param flintStatement
* The FlintStatement containing statement metadata.
* @param queryStartTime
* The start time of the query execution.
* @return
* The processed DataFrame after applying transformations and executing an action.
*/
def processDataFrame(
dataFrame: DataFrame,
flintStatement: FlintStatement,
queryStartTime: Long): DataFrame
}
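
For illustration, a minimal sketch of what an implementation of this trait could look like. The class name, the Parquet sink, and the output path are hypothetical and not part of this change; only the two overridden method signatures come from the trait above.

import org.apache.spark.sql.DataFrame
import org.opensearch.flint.common.model.FlintStatement

// Hypothetical example implementation: persists query results as Parquet files.
class ParquetQueryResultWriter(outputPath: String) extends QueryResultWriter {

  override def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit = {
    // I/O happens here: writing triggers a Spark action on the (already processed) DataFrame.
    dataFrame.write.mode("append").parquet(outputPath)
  }

  override def processDataFrame(
      dataFrame: DataFrame,
      flintStatement: FlintStatement,
      queryStartTime: Long): DataFrame = {
    // Apply any result-shaping transformations here, then force an action so the
    // result is materialized before control returns to the caller.
    val processed = dataFrame.cache()
    processed.count() // Spark action; blocks until the query has been computed
    processed
  }
}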
@@ -10,7 +10,7 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThanOrEqual, LessThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLEvalITSuite
@@ -22,13 +22,15 @@ class FlintSparkPPLEvalITSuite
/** Test table and index name */
private val testTable = "spark_catalog.default.flint_ppl_test"
private val testTableHttpLog = "spark_catalog.default.flint_ppl_test_http_log"
private val duplicatesNullableTestTable = "spark_catalog.default.duplicates_nullable_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createPartitionedStateCountryTable(testTable)
createTableHttpLog(testTableHttpLog)
createDuplicationNullableTable(duplicatesNullableTestTable)
}

protected override def afterEach(): Unit = {
@@ -632,8 +634,45 @@ class FlintSparkPPLEvalITSuite
EqualTo(Literal(true), and)
}

// Todo excluded fields not support yet
test("Test eval and signum function") {
val frame = sql(s"""
| source = $duplicatesNullableTestTable | fields id | sort id | eval i = pow(-2, id), s = signum(i) | head 5
| """.stripMargin)
val rows = frame.collect()
val expectedResults: Array[Row] = Array(
Row(1, -2d, -1d),
Row(2, 4d, 1d),
Row(3, -8d, -1d),
Row(4, 16d, 1d),
Row(5, -32d, -1d))
assert(rows.sameElements(expectedResults))

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val tablePlan =
UnresolvedRelation(Seq("spark_catalog", "default", "duplicates_nullable_test"))
val projectIdPlan = Project(Seq(UnresolvedAttribute("id")), tablePlan)
val sortPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("id"), Ascending)), global = true, projectIdPlan)
val evalPlan = Project(
Seq(
UnresolvedStar(None),
Alias(
UnresolvedFunction(
"pow",
Seq(Literal(-2), UnresolvedAttribute("id")),
isDistinct = false),
"i")(),
Alias(
UnresolvedFunction("signum", Seq(UnresolvedAttribute("i")), isDistinct = false),
"s")()),
sortPlan)
val localLimitPlan = LocalLimit(Literal(5), evalPlan)
val globalLimitPlan = GlobalLimit(Literal(5), localLimitPlan)
val expectedPlan = Project(Seq(UnresolvedStar(None)), globalLimitPlan)
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

// Todo excluded fields not support yet
ignore("test single eval expression with excluded fields") {
val frame = sql(s"""
| source = $testTable | eval new_field = "New Field" | fields - age
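
For readers less familiar with PPL, a rough plain DataFrame-API equivalent of the query exercised by the new eval/signum test above. This is an illustrative sketch of what the piped query expresses, not code produced by the plugin:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, pow, signum}

// Approximates: source = ... | fields id | sort id | eval i = pow(-2, id), s = signum(i) | head 5
val spark = SparkSession.builder().getOrCreate()
val result = spark.table("spark_catalog.default.duplicates_nullable_test")
  .select(col("id"))                        // fields id
  .orderBy(col("id"))                       // sort id
  .withColumn("i", pow(lit(-2), col("id"))) // eval i = pow(-2, id)
  .withColumn("s", signum(col("i")))        // eval s = signum(i)
  .limit(5)                                 // head 5
result.show()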

