[BugFix] Fix ambiguous exception when type mismatch in fillnull command #960

Merged 1 commit on Dec 3, 2024
@@ -277,6 +277,26 @@ class FlintSparkPPLFillnullITSuite
    assert(ex.getMessage().contains("Syntax error "))
  }

test("test fillnull with null_replacement type mismatch") {
val frame = sql(s"""
| source = $testTable | fillnull with cast(0 as long) in status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 0),
Row(3, "/contact", "2023-10-01 10:10:00", 0),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

  private def fillNullExpectedPlan(
      nullReplacements: Seq[(String, Expression)],
      addDefaultProject: Boolean = true): LogicalPlan = {
@@ -23,6 +23,7 @@
import org.apache.spark.sql.catalyst.plans.logical.Generate;
import org.apache.spark.sql.catalyst.plans.logical.Limit;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$;
import org.apache.spark.sql.catalyst.plans.logical.Project$;
import org.apache.spark.sql.execution.ExplainMode;
import org.apache.spark.sql.execution.command.DescribeTableCommand;
@@ -452,10 +453,30 @@ public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context)
Seq<NamedExpression> projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
// build the plan with the projection step
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p));
LogicalPlan resultWithoutDuplicatedColumns = context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan));
LogicalPlan resultWithoutDuplicatedColumns = context.apply(dropOriginalColumns(p -> p.children().head(), toDrop));
Member:
Could you test this patch with the latest Spark version locally, to double-confirm it as a long-term fix?

Contributor Author:

Verified with Spark branch-3.5, which includes this fix: apache/spark#48240.

As expected, it throws the ambiguous exception in all cases, and this PR addresses it.

        return Objects.requireNonNull(resultWithoutDuplicatedColumns, "FillNull operation failed");
    }

    /**
     * Generates a DataFrameDropColumns operator that drops the duplicated columns
     * from the original plan, achieving an effect similar to updating those columns
     * in place.
     *
     * PLAN_ID_TAG is an internal Spark mechanism for explicitly specifying which plan
     * should resolve a given set of UnresolvedAttributes. Setting the toDrop
     * expressions' PLAN_ID_TAG to the same value as the original plan's makes Spark
     * resolve them against that plan instead of the child.
     */
    private java.util.function.Function<LogicalPlan, LogicalPlan> dropOriginalColumns(
            java.util.function.Function<LogicalPlan, LogicalPlan> findOriginalPlan,
            List<Expression> toDrop) {
        return logicalPlan -> {
            LogicalPlan originalPlan = findOriginalPlan.apply(logicalPlan);
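            // Any id shared between the tagged plan and the tagged expressions works;
            // hashCode() is assumed here to be a sufficiently distinctive source for it.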
            long planId = logicalPlan.hashCode();
            originalPlan.setTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG(), planId);
            toDrop.forEach(e -> e.setTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG(), planId));
            return DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan);
        };
    }
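For context, here is a minimal standalone sketch, not part of this PR, of the tagging handshake the helper above relies on. It assumes spark-catalyst 3.5 on the classpath; the class name, attribute, and plan id below are hypothetical.

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$;

public class PlanIdTagSketch {
    public static void main(String[] args) {
        // Hypothetical attribute standing in for one of the toDrop expressions.
        UnresolvedAttribute statusCode = UnresolvedAttribute$.MODULE$.apply("status_code");
        long planId = 42L; // hypothetical; dropOriginalColumns derives it from logicalPlan.hashCode()

        // Tagging the attribute with the same id as the target plan tells the
        // analyzer to resolve it against that plan rather than against a child
        // projection that exposes two columns named "status_code".
        statusCode.setTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG(), planId);

        System.out.println(statusCode.getTagValue(LogicalPlan$.MODULE$.PLAN_ID_TAG())); // Some(42)
    }
}

The resolution itself happens later in Spark's analyzer, which matches a tagged attribute against the plan carrying the same id value.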

    @Override
    public LogicalPlan visitFlatten(Flatten flatten, CatalystPlanContext context) {
        visitFirstChild(flatten, context);