
Commit 25857f2

Override option
Signed-off-by: Andy Kwok <[email protected]>
andy-k-improving committed Dec 13, 2024
1 parent 822ebd5 commit 25857f2
Showing 4 changed files with 321 additions and 292 deletions.
3 changes: 3 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -99,6 +99,9 @@ NULLS: 'NULLS';
 SMA: 'SMA';
 WMA: 'WMA';

+// APPENDCOL options
+OVERRIDE: 'OVERRIDE';
+
 // ARGUMENT KEYWORDS
 KEEPEMPTY: 'KEEPEMPTY';
 CONSECUTIVE: 'CONSECUTIVE';
2 changes: 1 addition & 1 deletion ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -278,7 +278,7 @@ trendlineType
 ;

 appendcolCommand
-   : APPENDCOL LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS
+   : APPENDCOL (OVERRIDE EQUAL booleanLiteral)? LT_SQR_PRTHS commands (PIPE commands)* RT_SQR_PRTHS
 ;

 kmeansCommand
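With this change, appendcol accepts an optional OVERRIDE flag ahead of the bracketed sub-search. A sketch of the resulting query shape (the index and field names below are illustrative, not taken from the commit):

    source=employees | appendcol override=true [ stats avg(salary) as avg_salary ]

Per the grammar, the (OVERRIDE EQUAL booleanLiteral)? group is optional, so the existing appendcol [ ... ] form continues to parse unchanged.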
28 changes: 27 additions & 1 deletion ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java
@@ -5,6 +5,7 @@

 package org.opensearch.sql.ppl;

+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.TableIdentifier;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
@@ -13,6 +14,7 @@
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar;
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
 import org.apache.spark.sql.catalyst.expressions.Ascending$;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.catalyst.expressions.Descending$;
 import org.apache.spark.sql.catalyst.expressions.EqualTo;
 import org.apache.spark.sql.catalyst.expressions.Equality$;
@@ -33,7 +35,9 @@
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$;
 import org.apache.spark.sql.catalyst.plans.logical.Project$;
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias$;
+import org.apache.spark.sql.execution.CommandExecutionMode;
 import org.apache.spark.sql.execution.ExplainMode;
+import org.apache.spark.sql.execution.QueryExecution;
 import org.apache.spark.sql.execution.command.DescribeTableCommand;
 import org.apache.spark.sql.execution.command.ExplainCommand;
 import org.apache.spark.sql.types.DataTypes;
@@ -100,6 +104,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;

 import static java.util.Collections.emptyList;
 import static java.util.List.of;
@@ -147,7 +152,11 @@ public LogicalPlan visitFirstChild(Node node, CatalystPlanContext context) {
     @Override
     public LogicalPlan visitExplain(Explain node, CatalystPlanContext context) {
         node.getStatement().accept(this, context);
-        return context.apply(p -> new ExplainCommand(p, ExplainMode.fromString(node.getExplainMode().name())));
+        context.apply(p -> new ExplainCommand(p, ExplainMode.fromString(node.getExplainMode().name())));
+        System.out.println(context.getPlan());
+        Seq<Attribute> output = context.getPlan().output();
+        System.out.println(output);
+        return context.getPlan();
     }

     @Override
@@ -296,6 +305,14 @@ public LogicalPlan visitAppendCol(AppendCol node, CatalystPlanContext context) {
         context.retainAllNamedParseExpressions(p -> p);
         context.retainAllPlans(p -> p);

+        SparkSession sparkSession = SparkSession.getActiveSession().get();
+
+        QueryExecution queryExecution = sparkSession.sessionState().executePlan(mainSearchWithRowNumber, CommandExecutionMode.ALL());
+        QueryExecution queryExecutionSub = sparkSession.sessionState().executePlan(subSearchWithRowNumber, CommandExecutionMode.ALL());
+
+        Seq<Attribute> outputMain = queryExecution.analyzed().output();
+        Seq<Attribute> outputSub = queryExecutionSub.analyzed().output();
+
         // Composite the join clause
         LogicalPlan joinedQuery = join(
                 mainSearchWithRowNumber, subSearchWithRowNumber,
@@ -306,6 +323,9 @@
             // Remove the APPEND_ID
             return new DataFrameDropColumns(fieldsToRemove, joinedQuery);
         });
+
+        System.out.println("Attributes: ");
+        System.out.println(context.getPlan().output());
         return context.getPlan();
     }

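For context, the additions above resolve each side of the appendcol join through Spark's analyzer before the join is composed. A minimal standalone sketch of that technique, not part of the commit and assuming an active SparkSession (the class and method names here are illustrative):

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.expressions.Attribute;
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
    import org.apache.spark.sql.execution.CommandExecutionMode;
    import org.apache.spark.sql.execution.QueryExecution;
    import scala.collection.Seq;

    final class PlanOutputInspector {
        // Feed an unresolved LogicalPlan to the active session's analyzer
        // and return the resolved output attributes (column references).
        static Seq<Attribute> resolvedOutput(LogicalPlan plan) {
            SparkSession spark = SparkSession.getActiveSession().get(); // assumes an active session exists
            QueryExecution qe = spark.sessionState().executePlan(plan, CommandExecutionMode.ALL());
            return qe.analyzed().output();
        }
    }

The resolved attribute lists (outputMain and outputSub above) identify the columns each side of the join produces, which is presumably what the OVERRIDE option needs in order to detect and replace colliding columns.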
@@ -335,6 +355,12 @@ private static Relation retrieveRelationClause(Node node) {
                 // NPE will be thrown by some node.getChild() call.
                 break;
             }
+            /*
+            if (node == null || node.getChild() == null || node.getChild().isEmpty()) {
+                break;
+            }
+            node = node.getChild().get(0);
+            */
         }
     }
     return null;
