From 9fe754cf47d831c657448445d3ca7def2fb98422 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 19 Nov 2024 17:35:28 -0800 Subject: [PATCH 1/6] Support alter from manual to auto without specifying scheduler mode (#930) Signed-off-by: Louis Chu --- .../opensearch/flint/spark/FlintSpark.scala | 4 + .../spark/FlintSparkUpdateIndexITSuite.scala | 83 +++++++++++++++++++ 2 files changed, 87 insertions(+) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 68d2409ee..fbc24e93a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -510,6 +510,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w private def isSchedulerModeChanged( originalOptions: FlintSparkIndexOptions, updatedOptions: FlintSparkIndexOptions): Boolean = { + // Altering from manual to auto should not be interpreted as a scheduling mode change. + if (!originalOptions.options.contains(SCHEDULER_MODE.toString)) { + return false + } updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled() } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index c9f6c47f7..f27c0dae9 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -618,6 +618,44 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { flint.queryIndex(testIndex).collect().toSet should have size 2 } + test("update full refresh index to auto refresh should start job with external scheduler") { + setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create full refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "false")), testIndex) + .create() + + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 0 + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.isExternalSchedulerEnabled() shouldBe false + + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe empty + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.isExternalSchedulerEnabled() shouldBe true + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some( + FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL) + + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + test("update incremental refresh index to auto refresh should start job") { withTempDir { checkpointDir => // Create incremental refresh Flint index and wait for complete @@ -667,6 +705,51 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { } } + test( + "update incremental refresh index to auto refresh should start job with external scheduler") { + 
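+      // External scheduler is enabled for this test, so the alter below should switch the index
+      // to auto refresh without starting a Spark streaming job (the returned jobId stays empty).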
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true") + + withTempDir { checkpointDir => + // Create incremental refresh Flint index + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "incremental_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + + spark.streams.active.find(_.name == testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 0 + val indexInitial = flint.describeIndex(testIndex).get + indexInitial.options.isExternalSchedulerEnabled() shouldBe false + + val updatedIndex = flint + .skippingIndex() + .copyWithUpdate( + indexInitial, + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "incremental_refresh" -> "false", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + + val jobId = flint.updateIndex(updatedIndex) + jobId shouldBe empty + val indexFinal = flint.describeIndex(testIndex).get + indexFinal.options.isExternalSchedulerEnabled() shouldBe true + indexFinal.options.autoRefresh() shouldBe true + indexFinal.options.refreshInterval() shouldBe Some( + FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL) + + verifySchedulerIndex(testIndex, 5, "MINUTES") + } + } + test("update auto refresh index to full refresh should stop job") { // Create auto refresh Flint index and wait for complete flint From dc690b4d420312ce666b9d2e8f9bbf38aabc7365 Mon Sep 17 00:00:00 2001 From: qianheng Date: Wed, 20 Nov 2024 13:43:19 +0800 Subject: [PATCH 2/6] Support flatten with alias (#927) * Support flatten with alias Signed-off-by: Heng Qian * Address comments Signed-off-by: Heng Qian * Address comments Signed-off-by: Heng Qian * Add flatten with alias example Signed-off-by: Heng Qian * Add IT for ambiguous exception Signed-off-by: Heng Qian --------- Signed-off-by: Heng Qian --- docs/ppl-lang/PPL-Example-Commands.md | 1 + docs/ppl-lang/ppl-flatten-command.md | 19 ++++- .../ppl/FlintSparkPPLFlattenITSuite.scala | 83 ++++++++++++++++++- .../src/main/antlr4/OpenSearchPPLParser.g4 | 7 +- .../org/opensearch/sql/ast/tree/Flatten.java | 3 + .../sql/ppl/CatalystQueryPlanVisitor.java | 8 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 4 +- .../sql/ppl/parser/AstExpressionBuilder.java | 5 ++ ...lanFlattenCommandTranslatorTestSuite.scala | 45 +++++++++- 9 files changed, 165 insertions(+), 10 deletions(-) diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 7766c3b50..26ddd3613 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table` Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct` or `array>` - `source = table | flatten bridges` - `source = table | flatten coor` +- `source = table | flatten coor as (altitude, latitude, longitude)` - `source = table | flatten bridges | flatten coor` - `source = table | fields bridges | flatten bridges` - `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country` diff --git a/docs/ppl-lang/ppl-flatten-command.md b/docs/ppl-lang/ppl-flatten-command.md index 4c1ae5d0d..68b03e82e 100644 --- a/docs/ppl-lang/ppl-flatten-command.md +++ b/docs/ppl-lang/ppl-flatten-command.md @@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type: ### Syntax -`flatten ` +`flatten [As aliasSequence]` * field: 
to be flattened. The field must be of supported type. +* aliasSequence: to be used as aliasSequence for the flattened-output fields. Better to put the aliasSequence in brace if there is more than one field. ### Test table #### Schema @@ -87,4 +88,18 @@ PPL query: | 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378| | 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402| | 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402| -| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | \ No newline at end of file +| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL | + +### Example 4: flatten with aliasSequence +This example shows how to flatten with aliasSequence. +PPL query: + - `source=table | flatten coor as (altitude, latitude, longitude)` + +| \_time | bridges | city | country | altitude | latitude | longtitude | +|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------| +| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 | +| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 | +| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 | +| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 | +| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 | +| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL | diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala index e714a5f7e..7d1b6e437 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLFlattenITSuite.scala @@ -9,7 +9,7 @@ import java.nio.file.Files import org.opensearch.flint.spark.FlattenGenerator import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{Alias, EqualTo, GeneratorOuter, Literal, Or} import org.apache.spark.sql.catalyst.plans.logical._ @@ -347,4 +347,85 @@ class FlintSparkPPLFlattenITSuite val expectedPlan = Project(Seq(UnresolvedStar(None)), flattenMultiValue) comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) } + + test("flatten struct nested table using alias") { + val frame = sql(s""" + | source = $structNestedTable + | | flatten struct_col + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + assert( + frame.columns.sameElements( + Array("int_col", "field2", "subfield_1", "field2_2", "subfield_2"))) + val results: Array[Row] = frame.collect() + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) + val expectedResults: Array[Row] = + Array( + Row(30, 123, "value1", 23, "valueA"), + Row(40, 
123, "value5", 33, "valueB"), + Row(30, 823, "value4", 83, "valueC"), + Row(40, 456, "value2", 46, "valueD"), + Row(50, 789, "value3", 89, "valueE")).sorted + // Compare the results + assert(results.sorted.sameElements(expectedResults)) + + // duplicate alias names + val frame2 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as subfield_1 + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as subfield_2 + | """.stripMargin) + + // alias names duplicate with existing fields + assert( + frame2.columns.sameElements( + Array("int_col", "field2_2", "subfield_1", "field2_2", "subfield_2"))) + assert(frame2.collect().sorted.sameElements(expectedResults)) + + val frame3 = sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + + assert( + frame3.columns.sameElements(Array("int_col", "field2_2", "int_col", "field2_2", "int_col"))) + assert(frame3.collect().sorted.sameElements(expectedResults)) + + // Throw AnalysisException if The number of aliases supplied in the AS clause does not match the + // number of columns output + val except = intercept[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | """.stripMargin) + } + assert(except.message.contains( + "The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF")) + + // Throw AnalysisException because of ambiguous + val except2 = intercept[AnalysisException] { + sql(s""" + | source = $structNestedTable + | | flatten struct_col as (field1, field2_2) + | | flatten field1 as int_col + | | flatten struct_col2 as (field1, field2_2) + | | flatten field1 as int_col + | | fields field2_2 + | """.stripMargin) + } + assert(except2.message.contains( + "[AMBIGUOUS_REFERENCE] Reference `field2_2` is ambiguous, could be: [`field2_2`, `field2_2`].")) + } + } diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 index f7e1b3da4..aebc96869 100644 --- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 +++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 @@ -259,7 +259,7 @@ expandCommand ; flattenCommand - : FLATTEN fieldExpression + : FLATTEN fieldExpression (AS alias = identifierSeq)? 
; trendlineCommand @@ -1043,6 +1043,11 @@ qualifiedName : ident (DOT ident)* # identsAsQualifiedName ; +identifierSeq + : qualifiedName (COMMA qualifiedName)* # identsAsQualifiedNameSeq + | LT_PRTHS qualifiedName (COMMA qualifiedName)* RT_PRTHS # identsAsQualifiedNameSeq + ; + tableQualifiedName : tableIdent (DOT ident)* # identsAsTableQualifiedName ; diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java index 9c57d2adf..36c126591 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ast/tree/Flatten.java @@ -7,6 +7,7 @@ import org.opensearch.sql.ast.expression.Field; import java.util.List; +import org.opensearch.sql.ast.expression.UnresolvedExpression; @RequiredArgsConstructor public class Flatten extends UnresolvedPlan { @@ -15,6 +16,8 @@ public class Flatten extends UnresolvedPlan { @Getter private final Field field; + @Getter + private final List aliasSequence; @Override public UnresolvedPlan attach(UnresolvedPlan child) { diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 000c16b92..b78471591 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -84,12 +84,10 @@ import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.BiConsumer; import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEvents; import static org.opensearch.sql.ppl.utils.DedupeTransformer.retainMultipleDuplicateEventsAndKeepEmpty; @@ -466,9 +464,13 @@ public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) { context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); } Expression field = visitExpression(flatten.getField(), context); + List alias = flatten.getAliasSequence().stream() + .map(aliasNode -> visitExpression(aliasNode, context)) + .collect(Collectors.toList()); context.retainAllNamedParseExpressions(p -> (NamedExpression) p); FlattenGenerator flattenGenerator = new FlattenGenerator(field); - context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, seq(), p)); + scala.collection.mutable.Seq outputs = alias.isEmpty() ? 
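            /* no aliases supplied: pass an empty output list so Spark derives the generator's column names */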
seq() : seq(alias); + context.apply(p -> new Generate(new GeneratorOuter(flattenGenerator), seq(), true, (Option) None$.MODULE$, outputs, p)); return context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(field), logicalPlan)); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7d1cc072b..b1254bf8f 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -18,6 +18,7 @@ import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; +import org.opensearch.sql.ast.expression.AttributeList; import org.opensearch.sql.ast.expression.DataType; import org.opensearch.sql.ast.expression.EqualTo; import org.opensearch.sql.ast.expression.Field; @@ -605,7 +606,8 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo @Override public UnresolvedPlan visitFlattenCommand(OpenSearchPPLParser.FlattenCommandContext ctx) { Field unresolvedExpression = (Field) internalVisitExpression(ctx.fieldExpression()); - return new Flatten(unresolvedExpression); + List alias = ctx.alias == null ? emptyList() : ((AttributeList) internalVisitExpression(ctx.alias)).getAttrList(); + return new Flatten(unresolvedExpression, alias); } /** AD command. */ diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index e683a1395..1fef4251e 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -329,6 +329,11 @@ public UnresolvedExpression visitIdentsAsQualifiedName(OpenSearchPPLParser.Ident return visitIdentifiers(ctx.ident()); } + @Override + public UnresolvedExpression visitIdentsAsQualifiedNameSeq(OpenSearchPPLParser.IdentsAsQualifiedNameSeqContext ctx) { + return new AttributeList(ctx.qualifiedName().stream().map(this::visit).collect(Collectors.toList())); + } + @Override public UnresolvedExpression visitIdentsAsTableQualifiedName( OpenSearchPPLParser.IdentsAsTableQualifiedNameContext ctx) { diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala index 58a6c04b3..543e5c05d 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanFlattenCommandTranslatorTestSuite.scala @@ -13,9 +13,9 @@ import org.scalatest.matchers.should.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, Descending, GeneratorOuter, Literal, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, GeneratorOuter, Literal, RegExpExtract} import org.apache.spark.sql.catalyst.plans.PlanTest -import 
org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, GlobalLimit, LocalLimit, Project, Sort} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, Generate, Project} import org.apache.spark.sql.types.IntegerType class PPLLogicalPlanFlattenCommandTranslatorTestSuite @@ -153,4 +153,45 @@ class PPLLogicalPlanFlattenCommandTranslatorTestSuite comparePlans(expectedPlan, logPlan, checkAnalysis = false) } + test("test flatten with one alias") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as col1"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = + Generate(outerGenerator, seq(), true, None, Seq(UnresolvedAttribute("col1")), relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test flatten with alias list") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=relation | flatten field_with_array as (col1, col2)"), + context) + + val relation = UnresolvedRelation(Seq("relation")) + val flattenGenerator = new FlattenGenerator(UnresolvedAttribute("field_with_array")) + val outerGenerator = GeneratorOuter(flattenGenerator) + val generate = Generate( + outerGenerator, + seq(), + true, + None, + Seq(UnresolvedAttribute("col1"), UnresolvedAttribute("col2")), + relation) + val dropSourceColumn = + DataFrameDropColumns(Seq(UnresolvedAttribute("field_with_array")), generate) + val expectedPlan = Project(seq(UnresolvedStar(None)), dropSourceColumn) + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + } From 8efe1ec41b19939c72b4f8ef315ea22903778d50 Mon Sep 17 00:00:00 2001 From: Chase <62891993+engechas@users.noreply.github.com> Date: Wed, 20 Nov 2024 10:30:25 -0800 Subject: [PATCH 3/6] Add support for serializing MapType (#929) Signed-off-by: Chase Engelbrecht --- .../spark/sql/flint/datatype/FlintDataType.scala | 3 +++ .../sql/flint/datatype/FlintDataTypeSuite.scala | 15 +++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala index a4b23bd46..5d920a07e 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/datatype/FlintDataType.scala @@ -153,6 +153,9 @@ object FlintDataType { // objects case st: StructType => serializeJValue(st) + // Serialize maps as empty objects and let the map entries automap + case mt: MapType => serializeJValue(new StructType()) + // array case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty) diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala index 94f4839d6..44e8158d8 100644 --- 
a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/datatype/FlintDataTypeSuite.scala @@ -128,6 +128,21 @@ class FlintDataTypeSuite extends FlintSuite with Matchers { |}""".stripMargin) } + test("spark map type serialize") { + val sparkStructType = StructType( + StructField("mapField", MapType(StringType, StringType), true) :: + Nil) + + FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{ + | "properties": { + | "mapField": { + | "properties": { + | } + | } + | } + |}""".stripMargin) + } + test("spark varchar and char type serialize") { val flintDataType = """{ | "properties": { From a14013c2f93e7faf80a04bd2c1e21d440d57420d Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 20 Nov 2024 19:36:05 -0800 Subject: [PATCH 4/6] Record statement execution error (#940) --- .../apache/spark/sql/FlintJobITSuite.scala | 55 ++++++++++++------- .../apache/spark/sql/FlintJobExecutor.scala | 3 +- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala index 11bc7271c..81bf60f5e 100644 --- a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala @@ -81,36 +81,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { } } + def createJobOperator(query: String, jobRunId: String): JobOperator = { + val streamingRunningCount = new AtomicInteger(0) + + /* + * Because we cannot test from FlintJob.main() for the reason below, we have to configure + * all Spark conf required by Flint code underlying manually. + */ + spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName) + spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING) + + val job = JobOperator( + appId, + jobRunId, + spark, + query, + queryId, + dataSourceName, + resultIndex, + FlintJobType.STREAMING, + streamingRunningCount) + job.terminateJVM = false + job + } + def startJob(query: String, jobRunId: String): Future[Unit] = { val prefix = "flint-job-test" val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1) implicit val executionContext = ExecutionContext.fromExecutor(threadPool) - val streamingRunningCount = new AtomicInteger(0) val futureResult = Future { - /* - * Because we cannot test from FlintJob.main() for the reason below, we have to configure - * all Spark conf required by Flint code underlying manually. - */ - spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName) - spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING) /** * FlintJob.main() is not called because we need to manually set these variables within a * JobOperator instance to accommodate specific runtime requirements. 
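+       * The createJobOperator helper above wires the required Spark conf (data source name,
+       * streaming job type) before constructing the JobOperator.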
*/ - val job = - JobOperator( - appId, - jobRunId, - spark, - query, - queryId, - dataSourceName, - resultIndex, - FlintJobType.STREAMING, - streamingRunningCount) - job.terminateJVM = false + val job = createJobOperator(query, jobRunId) job.start() } futureResult.onComplete { @@ -291,6 +297,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { } test("create skipping index with non-existent table") { + val prefix = "flint-job-test" + val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1) + implicit val executionContext = ExecutionContext.fromExecutor(threadPool) + val query = s""" | CREATE SKIPPING INDEX ON testTable @@ -303,7 +313,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { | """.stripMargin val queryStartTime = System.currentTimeMillis() val jobRunId = "00ff4o3b5091080r" - threadLocalFuture.set(startJob(query, jobRunId)) + + val job = createJobOperator(query, jobRunId) + threadLocalFuture.set(Future(job.start())) val validation: REPLResult => Boolean = result => { assert( @@ -315,6 +327,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { assert(result.status == "FAILED", s"expected status is FAILED, but got ${result.status}") assert(!result.error.isEmpty, s"we expect error, but got ${result.error}") + assert( + job.throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"), + "Expected error message to mention 'spark_catalog.default.testTable is not found'") commonAssert(result, jobRunId, query, queryStartTime) true } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 63c120a2c..a9bb6f5bb 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -450,7 +450,8 @@ trait FlintJobExecutor { statusCode.foreach(code => errorDetails.put("StatusCode", code.toString)) val errorJson = mapper.writeValueAsString(errorDetails) - + // Record the processed error message + throwableHandler.setError(errorJson) // CustomLogging will call log4j logger.error() underneath statusCode match { case Some(code) => From 31fae14cf1c4ff4e737e72411595f9bf634eff12 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 21 Nov 2024 18:19:45 -0800 Subject: [PATCH 5/6] Rethrow exception in writeData (#943) Signed-off-by: Louis Chu --- .../org/opensearch/flint/core/IRestHighLevelClient.java | 8 ++++---- .../scala/org/apache/spark/sql/FlintJobExecutor.scala | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java index 9facd89ef..721685c38 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java @@ -98,15 +98,15 @@ static void recordLatency(String metricNamePrefix, long latencyMilliseconds) { * Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx). 
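+   * Throwables that are not OpenSearch exceptions are recorded with a default status of 500.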
* * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure - * @param e the exception encountered during the operation, used to determine the type of failure + * @param t the exception encountered during the operation, used to determine the type of failure */ - static void recordOperationFailure(String metricNamePrefix, Exception e) { - OpenSearchException openSearchException = extractOpenSearchException(e); + static void recordOperationFailure(String metricNamePrefix, Throwable t) { + OpenSearchException openSearchException = extractOpenSearchException(t); int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500; if (openSearchException != null) { CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException); } else { - CustomLogging.logError("OpenSearch Operation failed with an exception.", e); + CustomLogging.logError("OpenSearch Operation failed with an exception.", t); } if (statusCode == 403) { String forbiddenErrorMetricName = metricNamePrefix + ".403.count"; diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index a9bb6f5bb..ad26cf21a 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -168,10 +168,12 @@ trait FlintJobExecutor { IRestHighLevelClient.recordOperationSuccess( MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) } catch { - case e: Exception => + case t: Throwable => IRestHighLevelClient.recordOperationFailure( MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, - e) + t) + // Re-throw the exception + throw t } } From 3ff2ef23739bc900be3ea4fdac5b9b2615f40265 Mon Sep 17 00:00:00 2001 From: qianheng Date: Fri, 22 Nov 2024 11:41:26 +0800 Subject: [PATCH 6/6] Enable parallelExecution for integration test suites (#934) * Split integration test to multiple groups and enable parallelExecution Signed-off-by: Heng Qian * Fix spark-warehouse conflict Signed-off-by: Heng Qian * Test with 3 groups Signed-off-by: Heng Qian * Random shuffle tests before splitting groups Signed-off-by: Heng Qian * reset group number to 4 Signed-off-by: Heng Qian * revert shuffle Signed-off-by: Heng Qian --------- Signed-off-by: Heng Qian --- build.sbt | 28 ++++++++++++++++--- .../scala/org/apache/spark/FlintSuite.scala | 2 ++ 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 724d348ae..365b88aa3 100644 --- a/build.sbt +++ b/build.sbt @@ -2,8 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ -import Dependencies._ -import sbtassembly.AssemblyPlugin.autoImport.ShadeRule +import Dependencies.* lazy val scala212 = "2.12.14" lazy val sparkVersion = "3.5.1" @@ -38,6 +37,11 @@ ThisBuild / scalastyleConfig := baseDirectory.value / "scalastyle-config.xml" */ ThisBuild / Test / parallelExecution := false +/** + * Set the parallelism of forked tests to 4 to accelerate integration test + */ +concurrentRestrictions in Global := Seq(Tags.limit(Tags.ForkedTestGroup, 4)) + // Run as part of compile task. 
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle") @@ -274,13 +278,29 @@ lazy val integtest = (project in file("integ-test")) IntegrationTest / javaSource := baseDirectory.value / "src/integration/java", IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala", IntegrationTest / resourceDirectory := baseDirectory.value / "src/integration/resources", - IntegrationTest / parallelExecution := false, + IntegrationTest / parallelExecution := true, // enable parallel execution + IntegrationTest / testForkedParallel := false, // disable forked parallel execution to avoid duplicate spark context in the same JVM IntegrationTest / fork := true, + IntegrationTest / testGrouping := { + val tests = (IntegrationTest / definedTests).value + val forkOptions = ForkOptions() + val groups = tests.grouped(tests.size / 4 + 1).zipWithIndex.map { case (group, index) => + val groupName = s"group-${index + 1}" + new Tests.Group( + name = groupName, + tests = group, + runPolicy = Tests.SubProcess( + forkOptions.withRunJVMOptions(forkOptions.runJVMOptions ++ + Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/integ-test/target/tmp/$groupName"))) + ) + } + groups.toSeq + } )), inConfig(AwsIntegrationTest)(Defaults.testSettings ++ Seq( AwsIntegrationTest / javaSource := baseDirectory.value / "src/aws-integration/java", AwsIntegrationTest / scalaSource := baseDirectory.value / "src/aws-integration/scala", - AwsIntegrationTest / parallelExecution := false, + AwsIntegrationTest / parallelExecution := true, AwsIntegrationTest / fork := true, )), libraryDependencies ++= Seq( diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index b675265b7..1d301087f 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -12,6 +12,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf} import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.test.SharedSparkSession trait FlintSuite extends SharedSparkSession { @@ -30,6 +31,7 @@ trait FlintSuite extends SharedSparkSession { .set( FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key, "org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest") + .set(WAREHOUSE_PATH.key, s"spark-warehouse/${suiteName}") conf }
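The grouping arithmetic in the last patch is easy to misread, so below is a minimal standalone Scala sketch (illustrative only, not part of build.sbt; the object name and the ten placeholder suite names are invented) showing how `grouped(tests.size / 4 + 1)` always yields at most four groups, matching the `Tags.limit(Tags.ForkedTestGroup, 4)` cap set earlier in the build:

```scala
// Standalone illustration of the build.sbt grouping logic (assumed names; not project code).
object TestGroupingSketch extends App {
  val suites = (1 to 10).map(i => s"Suite$i")   // stand-in for (IntegrationTest / definedTests)
  val groupSize = suites.size / 4 + 1           // integer division: 10 / 4 + 1 = 3
  suites.grouped(groupSize).zipWithIndex.foreach { case (group, index) =>
    println(s"group-${index + 1}: ${group.mkString(", ")}")
  }
  // group-1: Suite1, Suite2, Suite3
  // group-2: Suite4, Suite5, Suite6
  // group-3: Suite7, Suite8, Suite9
  // group-4: Suite10
}
```

In the actual build, each forked group additionally gets its own java.io.tmpdir (target/tmp/<group-name>), and FlintSuite pins the Spark warehouse to spark-warehouse/<suiteName>, so parallel suites do not contend for the same local directories.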