Skip to content

Commit

Permalink
[BugFix] Fix ambiguous exception when type mismatch in fillnull comma…
Browse files Browse the repository at this point in the history
…nd (opensearch-project#960)

Signed-off-by: Heng Qian <[email protected]>
  • Loading branch information
qianheng-aws authored Dec 3, 2024
1 parent d35d66b commit 63222a7
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,26 @@ class FlintSparkPPLFillnullITSuite
assert(ex.getMessage().contains("Syntax error "))
}

test("test fillnull with null_replacement type mismatch") {
val frame = sql(s"""
| source = $testTable | fillnull with cast(0 as long) in status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 0),
Row(3, "/contact", "2023-10-01 10:10:00", 0),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

private def fillNullExpectedPlan(
nullReplacements: Seq[(String, Expression)],
addDefaultProject: Boolean = true): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.spark.sql.catalyst.plans.logical.Generate;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$;
import org.apache.spark.sql.catalyst.plans.logical.Project$;
import org.apache.spark.sql.execution.ExplainMode;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
Expand Down Expand Up @@ -452,10 +453,30 @@ public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context)
Seq<NamedExpression> projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
// build the plan with the projection step
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p));
LogicalPlan resultWithoutDuplicatedColumns = context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan));
LogicalPlan resultWithoutDuplicatedColumns = context.apply(dropOriginalColumns(p -> p.children().head(), toDrop));
return Objects.requireNonNull(resultWithoutDuplicatedColumns, "FillNull operation failed");
}

/**
* This method is used to generate DataFrameDropColumns operator for dropping duplicated columns
* in the original plan. Then achieving similar effect like updating columns.
*
* PLAN_ID_TAG is a mechanism inner Spark that explicitly specify a plan to resolve the
* UnresolvedAttributes. Set toDrop expressions' PLAN_ID_TAG to the same value as that of the
* original plan, so Spark will resolve them correctly by that plan instead of the child.
*/
private java.util.function.Function<LogicalPlan, LogicalPlan> dropOriginalColumns(
java.util.function.Function<LogicalPlan, LogicalPlan> findOriginalPlan,
List<Expression> toDrop) {
return logicalPlan -> {
LogicalPlan originalPlan = findOriginalPlan.apply(logicalPlan);
long planId = logicalPlan.hashCode();
originalPlan.setTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG(), planId);
toDrop.forEach(e -> e.setTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG(), planId));
return DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan);
};
}

@Override
public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
visitFirstChild(flatten, context);
Expand Down

0 comments on commit 63222a7

Please sign in to comment.