Skip to content

Commit

Permalink
Code tidy
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Kwok <[email protected]>
  • Loading branch information
andy-k-improving committed Dec 16, 2024
1 parent 9194ca6 commit 7e17ab9
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ public enum BuiltinFunctionName {
ISNULL(FunctionName.of("isnull")),
ISPRESENT(FunctionName.of("ispresent")),

ROW_NUMBER(FunctionName.of("row_number")),
RANK(FunctionName.of("rank")),
DENSE_RANK(FunctionName.of("dense_rank")),

Expand Down Expand Up @@ -333,7 +332,6 @@ public FunctionName getName() {
.put("percentile", BuiltinFunctionName.PERCENTILE)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.put("approx_count_distinct", BuiltinFunctionName.APPROX_COUNT_DISTINCT)
.put("row_number", BuiltinFunctionName.ROW_NUMBER)
.build();

public static Optional<BuiltinFunctionName> of(String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,31 +282,32 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) {
final Node mainSearchNode = node.getChild().get(0);
final Node subSearchNode = node.getSubSearch();

// Traverse to look for relation clause then append it into the sub-search.
// Traverse to look for relation clause, then append it into the sub-search.
Relation relation = AppendColCatalystUtils.retrieveRelationClause(mainSearchNode);
AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), relation);

// Add apply a dropColumns if override present, then add * with ROW_NUMBER
LogicalPlan leftTemp = mainSearchNode.accept(this, context);
var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, leftTemp, TABLE_LHS);
// Apply an additional projection layer on main-search to provide natural order.
LogicalPlan mainSearch = mainSearchNode.accept(this, context);
var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, mainSearch, TABLE_LHS);
context.withSubqueryAlias(mainSearchWithRowNumber);

context.apply(left -> {
// Add a new projection layer with * and ROW_NUMBER (Sub-search)
// Apply an additional projection layer on sub-search to provide natural order.
LogicalPlan subSearch = subSearchNode.accept(this, context);
var subSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, subSearch, TABLE_RHS);
context.withSubqueryAlias(subSearchWithRowNumber);

context.retainAllNamedParseExpressions(p -> p);
context.retainAllPlans(p -> p);
// Composite the join clause

// Join both Main and Sub search with _ROW_NUMBER_ column
LogicalPlan joinedQuery = join(
mainSearchWithRowNumber, subSearchWithRowNumber,
Join.JoinType.LEFT,
Optional.of(new EqualTo(t1Attr, t2Attr)),
new Join.JoinHint());

// Remove the APPEND_ID and duplicated field on T1 if override option is true.
// Remove the APPEND_ID and duplicated field on T1 if the override option is present.
if (node.override) {
List<Expression> getoverridedlist = AppendColCatalystUtils.getoverridedlist(subSearchWithRowNumber, TABLE_LHS);
fieldsToRemove.addAll(getoverridedlist);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.plans.logical.Project;
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias;
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$;
import org.apache.spark.sql.execution.CommandExecutionMode;
Expand All @@ -33,14 +35,13 @@
public interface AppendColCatalystUtils {

/**
* Response to traverse given subSearch Node till the last child, then append the Relation clause,
* in order to specify the data source || index.
* Traverses the given subSearch Node down to its last child, then appends the Relation clause,
* in order to specify the data source || index for the subSearch.
* @param subSearch User provided sub-search from APPENDCOL command.
* @param relation Relation clause which represents the dataSource that this sub-search executes upon.
*/
static void appendRelationClause(Node subSearch, Relation relation) {
Relation table = new Relation(relation.getTableNames());
// Replace it with a function to look up the search command and extract the index name.
final Relation table = new Relation(relation.getTableNames());
while (subSearch != null) {
try {
subSearch = subSearch.getChild().get(0);
Expand Down Expand Up @@ -76,7 +77,7 @@ static Relation retrieveRelationClause(Node node) {


/**
* Util method to perform analyzed() call against the given LogicalPlan to exact all fields
* Util method to perform an analyzed() call against the given LogicalPlan to extract all fields
* that will be projected upon execution, in the form of a Java List with the user-provided schema prefix.
* @param lp LogicalPlan instance to extract the projection fields from.
* @param tableName the table || schema name being appended as part of the returned fields.
Expand All @@ -97,23 +98,23 @@ static List<Expression> getoverridedlist(LogicalPlan lp, String tableName) {
}

/**
* Helper method to first add an additional project clause to provide row_number, then wrap it SubqueryAlias and return.
* Helper method to first add an additional projection clause to provide row_number, then wrap it in a SubqueryAlias and return it.
* @param context Context object of the current Parser.
* @param lp The Logical Plan instance which contains the query.
* @param alias The name of the Alias clause.
* @return A subqeuryAlias instance which has row_number for natural ordering purpose.
* @return A SubqueryAlias instance which has row_number for natural ordering purposes.
*/
static SubqueryAlias getRowNumStarProjection(CatalystPlanContext context, LogicalPlan lp, String alias) {
final SortOrder sortOrder = SortUtils.sortOrder(
new org.apache.spark.sql.catalyst.expressions.Literal(
new Literal(
UTF8String.fromString("1"), DataTypes.StringType), false);

final NamedExpression appendCol = WindowSpecTransformer.buildRowNumber(seq(), seq(sortOrder));
final List<NamedExpression> projectList = (context.getNamedParseExpressions().isEmpty())
? List.of(appendCol, new UnresolvedStar(Option.empty()))
: List.of(appendCol);

final LogicalPlan lpWithProjection = new org.apache.spark.sql.catalyst.plans.logical.Project(seq(
final LogicalPlan lpWithProjection = new Project(seq(
projectList), lp);
return SubqueryAlias$.MODULE$.apply(alias, lpWithProjection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static NamedExpression getWMAComputationExpression(CatalystExpressionVis
* @param expression The expression which will be evaluated.
* @return An Alias instance with logical plan representation of `expression AS name`.
*/
public static NamedExpression getAlias(String name, Expression expression) {
private static NamedExpression getAlias(String name, Expression expression) {
return org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(expression,
name,
NamedExpression.newExprId(),
Expand Down

0 comments on commit 7e17ab9

Please sign in to comment.