From 365cc12cb92fde1ace5fcb4c2eb624b4b39fca1e Mon Sep 17 00:00:00 2001 From: Andy Kwok Date: Wed, 11 Dec 2024 15:59:57 -0800 Subject: [PATCH] Update logic Signed-off-by: Andy Kwok --- .../sql/ppl/CatalystQueryPlanVisitor.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index bd60f8224..1652961e9 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -278,11 +278,11 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { var mainSearch = getRowNumStarProjection(context, leftTemp, TABLE_LHS); context.withSubqueryAlias(mainSearch); - // Inject an addition search command into sub-search (T2) - appendRelationClause(node.getSubSearch(), "employees"); + // Traverse to look for relation clause then append it into the sub-search. + Relation relation = retrieveRelationClause(node.getChild().get(0)); + appendRelationClause(node.getSubSearch(), relation); context.apply(left -> { - // Add a new projection layer with * and ROW_NUMBER (Sub-search) LogicalPlan right = node.getSubSearch().accept(this, context); var subSearch = getRowNumStarProjection(context, right, TABLE_RHS); @@ -298,23 +298,16 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) { // Remove the APPEND_ID return new org.apache.spark.sql.catalyst.plans.logical.DataFrameDropColumns(fieldsToRemove, joinedQuery); }); - -// System.out.println(context); return context.getPlan(); } - private static void appendRelationClause(Node subSearch, String relationName) { + private static void appendRelationClause(Node subSearch, Relation relation) { - // Till traverse till the end then append. - Relation table = new Relation(of(new QualifiedName(relationName))); + Relation table = new Relation(relation.getTableNames()); // Replace it with a function to look up the search command and extract the index name. - - while (subSearch != null) { try { - System.out.println("Node: " + subSearch.getClass().getSimpleName()); subSearch = subSearch.getChild().get(0); -// subSearch = node1; } catch (NullPointerException ex) { System.out.println("Null when getting the child "); ((UnresolvedPlan) subSearch).attach(table); @@ -323,6 +316,22 @@ private static void appendRelationClause(Node subSearch, String relationName) { } } + private static Relation retrieveRelationClause(Node node) { + while (node != null) { + if (node instanceof Relation) { + return (Relation) node; + } else { + try { + node = node.getChild().get(0); + } catch (NullPointerException ex) { + // NPE will be thrown by some node.getChild() call. + break; + } + } + } + return null; + } + private org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias getRowNumStarProjection(CatalystPlanContext context, LogicalPlan lp, String alias) { final String DUMMY_SORT_FIELD = "1";