Skip to content

Commit

Permalink
Deprecate visit child (1)
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Kwok <[email protected]>
  • Loading branch information
andy-k-improving committed Dec 17, 2024
1 parent d03f8bb commit 9db7927
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,17 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) {
final UnresolvedAttribute t1Attr = new UnresolvedAttribute(seq(TABLE_LHS, APPENDCOL_ID));
final UnresolvedAttribute t2Attr = new UnresolvedAttribute(seq(TABLE_RHS, APPENDCOL_ID));
final List<Expression> fieldsToRemove = new ArrayList<>(List.of(t1Attr, t2Attr));
final Node mainSearchNode = node.getChild().get(0);
// final Node mainSearchNode = visitFirstChild(node, context);
final Node subSearchNode = node.getSubSearch();

// Traverse to look for relation clause, then append it into the sub-search.
Relation relation = AppendColCatalystUtils.retrieveRelationClause(mainSearchNode);
AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), relation);

// Apply an additional projection layer on main-search to provide natural order.
// LogicalPlan mainSearch = mainSearchNode.accept(this, context);
LogicalPlan mainSearch = mainSearchNode.accept(this, context);
LogicalPlan mainSearch = visitFirstChild(node, context);
var mainSearchWithRowNumber = AppendColCatalystUtils.getRowNumStarProjection(context, mainSearch, TABLE_LHS);
context.withSubqueryAlias(mainSearchWithRowNumber);

// Duplicate the relation clause from main-search to sub-search.
AppendColCatalystUtils.appendRelationClause(node.getSubSearch(), context.getRelations());


context.apply(left -> {
// Apply an additional projection layer on sub-search to provide natural order.
LogicalPlan subSearch = subSearchNode.accept(this, context);
Expand All @@ -297,6 +294,7 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) {
Optional.of(new EqualTo(t1Attr, t2Attr)),
new Join.JoinHint());


// Remove the APPEND_ID and duplicated field on T1 if override option present.
if (node.override) {
List<Expression> getoverridedlist = AppendColCatalystUtils.getOverridedList(subSearchWithRowNumber, TABLE_LHS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand All @@ -21,6 +22,8 @@
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.types.UTF8String;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ppl.CatalystPlanContext;
Expand All @@ -44,8 +47,18 @@ public interface AppendColCatalystUtils {
* @param subSearch User provided sub-search from APPENDCOL command.
* @param relation Relation clause which represent the dataSource that this sub-search execute upon.
*/
static void appendRelationClause(Node subSearch, Relation relation) {
final Relation table = new Relation(relation.getTableNames());
static void appendRelationClause(Node subSearch, List<LogicalPlan> relation) {

final List<UnresolvedExpression> unresolvedExpressionList = relation.stream()
.map(r -> {
UnresolvedRelation unresolvedRelation = (UnresolvedRelation) r;
List<String> multipartId = seqAsJavaList(unresolvedRelation.multipartIdentifier());
return (UnresolvedExpression) new QualifiedName(multipartId);
})
// To avoid stack overflow in the case of chained AppendCol.
.distinct()
.collect(Collectors.toList());
final Relation table = new Relation(unresolvedExpressionList);
while (subSearch != null) {
try {
subSearch = subSearch.getChild().get(0);
Expand All @@ -56,29 +69,6 @@ static void appendRelationClause(Node subSearch, Relation relation) {
}
}

/**
* Util method to traverse a given Node object and return the first occurrence of a Relation clause.
* @param node The Node object that this util method search upon.
* @return The first occurrence of Relation object from the given Node.
*/
static Relation retrieveRelationClause(Node node) {
while (node != null) {
if (node instanceof Relation) {
return (Relation) node;
} else {
try {
node = node.getChild().get(0);
} catch (NullPointerException ex) {
// Base on the current implementation of Flint,
// NPE will be thrown by certain type of Node implementation,
// when node.getChild() being called.
break;
}
}
}
return null;
}


/**
* Util method to perform analyzed() call against the given LogicalPlan to extract all fields
Expand Down

0 comments on commit 9db7927

Please sign in to comment.