Refactor CatalystQueryPlanVisitor into distinct Plan & Expression visitors (opensearch-project#852)

* We would like to refactor the CatalystQueryPlanVisitor and separate it into two distinct visitors:

- a plan visitor, which extends AbstractNodeVisitor<LogicalPlan, CatalystPlanContext>
- an expression visitor, which extends AbstractNodeVisitor<Expression, CatalystPlanContext>

This would match the composition of the existing PPL AST visitors (a minimal sketch of the new composition follows below):

- AstBuilder, which extends OpenSearchPPLParserBaseVisitor
- AstExpressionBuilder, which also extends OpenSearchPPLParserBaseVisitor

In addition, unify the PPL utility classes so that each follows one of these naming conventions:

- *Transformer - transforms PPL (logical) expressions into Spark (logical) expressions
- *Utils - a utility class
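
For illustration only, here is a minimal sketch of the intended split (the names CatalystQueryPlanVisitor, CatalystExpressionVisitor, CatalystPlanContext and the analyze entry point all appear in this change; the constructor wiring and the bodies shown here are assumptions, not the actual implementation):

import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ppl.CatalystPlanContext;

// Expression visitor: translates PPL expressions into Spark Catalyst expressions.
// (Sketch only - each class would live in its own file.)
public class CatalystExpressionVisitor extends AbstractNodeVisitor<Expression, CatalystPlanContext> {
    // Entry point used by the plan visitor and by the *Transformer utilities.
    public Expression analyze(UnresolvedExpression unresolved, CatalystPlanContext context) {
        return unresolved.accept(this, context);
    }
}

// Plan visitor: translates PPL plan nodes into Spark Catalyst logical plans and
// delegates all expression-level work to the expression visitor above.
public class CatalystQueryPlanVisitor extends AbstractNodeVisitor<LogicalPlan, CatalystPlanContext> {
    private final CatalystExpressionVisitor expressionAnalyzer = new CatalystExpressionVisitor();
}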

Signed-off-by: YANGDB <[email protected]>

* update the AstBuilder ctor

Signed-off-by: YANGDB <[email protected]>

* resolve latest merge conflicts

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
YANG-DB authored Oct 31, 2024
1 parent 99aab5a commit 98962c3
Showing 11 changed files with 463 additions and 387 deletions.

Large diffs in 2 of the changed files are not rendered by default.

AstBuilder.java:
@@ -82,8 +82,8 @@ public class AstBuilder extends OpenSearchPPLParserBaseVisitor<UnresolvedPlan> {
      */
     private String query;
 
-    public AstBuilder(AstExpressionBuilder expressionBuilder, String query) {
-        this.expressionBuilder = expressionBuilder;
+    public AstBuilder(String query) {
+        this.expressionBuilder = new AstExpressionBuilder(this);
         this.query = query;
     }
 
AstExpressionBuilder.java:
@@ -67,7 +67,6 @@
  */
 public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<UnresolvedExpression> {
 
-    private static final int DEFAULT_TAKE_FUNCTION_SIZE_VALUE = 10;
     /**
      * The function name mapping between fronted and core engine.
      */
@@ -79,16 +78,10 @@ public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<UnresolvedExpression> {
             .build();
     private AstBuilder astBuilder;
 
-    public AstExpressionBuilder() {
-    }
-
-    /**
-     * Set AstBuilder back to AstExpressionBuilder for resolving the subquery plan in subquery expression
-     */
-    public void setAstBuilder(AstBuilder astBuilder) {
+    public AstExpressionBuilder(AstBuilder astBuilder) {
         this.astBuilder = astBuilder;
     }
 
     @Override
     public UnresolvedExpression visitMappingCompareExpr(OpenSearchPPLParser.MappingCompareExprContext ctx) {
         return new Compare(ctx.comparisonOperator().getText(), visit(ctx.left), visit(ctx.right));
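
Taken together, the AstBuilder and AstExpressionBuilder changes above invert the wiring between the two AST builders: AstBuilder now creates its own AstExpressionBuilder and passes itself in, so the expression builder can still reach back to the plan builder when resolving subquery plans. A rough before/after sketch of a call site (variable names and the sample PPL query are illustrative):

// Before this change: the caller had to wire the two builders together by hand.
String query = "source=t | fields a";
AstExpressionBuilder expressionBuilder = new AstExpressionBuilder();
AstBuilder oldStyle = new AstBuilder(expressionBuilder, query);
expressionBuilder.setAstBuilder(oldStyle);

// After this change: a single constructor call closes the cycle internally.
AstBuilder builder = new AstBuilder(query);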
AstStatementBuilder.java:
@@ -56,15 +56,15 @@ public StatementBuilderContext getContext() {
     }
 
     public static class StatementBuilderContext {
+        public static final int FETCH_SIZE = 1000;
         private int fetchSize;
 
         public StatementBuilderContext(int fetchSize) {
             this.fetchSize = fetchSize;
         }
 
         public static StatementBuilderContext builder() {
-            //todo set the default statement builder init params configurable
-            return new StatementBuilderContext(1000);
+            return new StatementBuilderContext(FETCH_SIZE);
         }
 
         public int getFetchSize() {
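
Based only on the members visible in the hunk above, the default builder's behavior is unchanged; the magic number simply gained a name. A small sanity-check sketch (assuming the nested class is referenced through its enclosing AstStatementBuilder, as in the Scala code further below):

// The default context now reads its fetch size from the named constant instead of a literal 1000.
AstStatementBuilder.StatementBuilderContext ctx = AstStatementBuilder.StatementBuilderContext.builder();
assert ctx.getFetchSize() == AstStatementBuilder.StatementBuilderContext.FETCH_SIZE;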
AggregatorTransformer.java:
@@ -12,12 +12,10 @@
 import org.opensearch.sql.ast.expression.AggregateFunction;
 import org.opensearch.sql.ast.expression.Argument;
 import org.opensearch.sql.ast.expression.DataType;
-import org.opensearch.sql.ast.expression.QualifiedName;
 import org.opensearch.sql.ast.expression.UnresolvedExpression;
 import org.opensearch.sql.expression.function.BuiltinFunctionName;
 
 import java.util.List;
-import java.util.Optional;
 
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
 import static scala.Option.empty;
@@ -27,7 +25,7 @@
  *
  * @return
  */
-public interface AggregatorTranslator {
+public interface AggregatorTransformer {
 
     static Expression aggregator(org.opensearch.sql.ast.expression.AggregateFunction aggregateFunction, Expression arg) {
         if (BuiltinFunctionName.ofAggregation(aggregateFunction.getFuncName()).isEmpty())
BuiltinFunctionTransformer.java:
@@ -65,7 +65,7 @@
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
 import static scala.Option.empty;
 
-public interface BuiltinFunctionTranslator {
+public interface BuiltinFunctionTransformer {
 
     /**
      * The name mapping between PPL builtin functions to Spark builtin functions.
DedupeTransformer.java:
@@ -16,8 +16,8 @@
 import org.apache.spark.sql.catalyst.plans.logical.Union;
 import org.apache.spark.sql.types.DataTypes;
 import org.opensearch.sql.ast.tree.Dedupe;
+import org.opensearch.sql.ppl.CatalystExpressionVisitor;
 import org.opensearch.sql.ppl.CatalystPlanContext;
-import org.opensearch.sql.ppl.CatalystQueryPlanVisitor.ExpressionAnalyzer;
 import scala.collection.Seq;
 
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
@@ -38,7 +38,7 @@ public interface DedupeTransformer {
     static LogicalPlan retainOneDuplicateEventAndKeepEmpty(
             Dedupe node,
             Seq<Attribute> dedupeFields,
-            ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         context.apply(p -> {
             Expression isNullExpr = buildIsNullFilterExpression(node, expressionAnalyzer, context);
@@ -63,7 +63,7 @@ static LogicalPlan retainOneDuplicateEventAndKeepEmpty(
     static LogicalPlan retainOneDuplicateEvent(
             Dedupe node,
             Seq<Attribute> dedupeFields,
-            ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         Expression isNotNullExpr = buildIsNotNullFilterExpression(node, expressionAnalyzer, context);
         context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
@@ -87,7 +87,7 @@ static LogicalPlan retainOneDuplicateEvent(
     static LogicalPlan retainMultipleDuplicateEventsAndKeepEmpty(
             Dedupe node,
             Integer allowedDuplication,
-            ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         context.apply(p -> {
             // Build isnull Filter for right
@@ -137,7 +137,7 @@ static LogicalPlan retainMultipleDuplicateEventsAndKeepEmpty(
     static LogicalPlan retainMultipleDuplicateEvents(
             Dedupe node,
             Integer allowedDuplication,
-            ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         // Build isnotnull Filter
         Expression isNotNullExpr = buildIsNotNullFilterExpression(node, expressionAnalyzer, context);
@@ -163,7 +163,7 @@ static LogicalPlan retainMultipleDuplicateEvents(
         return context.apply(p -> new DataFrameDropColumns(seq(rowNumber.toAttribute()), p));
     }
 
-    private static Expression buildIsNotNullFilterExpression(Dedupe node, ExpressionAnalyzer expressionAnalyzer, CatalystPlanContext context) {
+    private static Expression buildIsNotNullFilterExpression(Dedupe node, CatalystExpressionVisitor expressionAnalyzer, CatalystPlanContext context) {
         node.getFields().forEach(field -> expressionAnalyzer.analyze(field, context));
         Seq<Expression> isNotNullExpressions =
                 context.retainAllNamedParseExpressions(
@@ -180,7 +180,7 @@ private static Expression buildIsNotNullFilterExpression(Dedupe node, ExpressionAnalyzer expressionAnalyzer, CatalystPlanContext context) {
         return isNotNullExpr;
     }
 
-    private static Expression buildIsNullFilterExpression(Dedupe node, ExpressionAnalyzer expressionAnalyzer, CatalystPlanContext context) {
+    private static Expression buildIsNullFilterExpression(Dedupe node, CatalystExpressionVisitor expressionAnalyzer, CatalystPlanContext context) {
         node.getFields().forEach(field -> expressionAnalyzer.analyze(field, context));
         Seq<Expression> isNullExpressions =
                 context.retainAllNamedParseExpressions(
LookupTransformer.java:
@@ -15,6 +15,7 @@
 import org.opensearch.sql.ast.expression.Field;
 import org.opensearch.sql.ast.expression.QualifiedName;
 import org.opensearch.sql.ast.tree.Lookup;
+import org.opensearch.sql.ppl.CatalystExpressionVisitor;
 import org.opensearch.sql.ppl.CatalystPlanContext;
 import org.opensearch.sql.ppl.CatalystQueryPlanVisitor;
 import scala.Option;
@@ -32,7 +33,7 @@ public interface LookupTransformer {
     /** lookup mapping fields + input fields*/
     static List<NamedExpression> buildLookupRelationProjectList(
             Lookup node,
-            CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         List<Field> inputFields = new ArrayList<>(node.getInputFieldList());
         if (inputFields.isEmpty()) {
@@ -45,7 +46,7 @@ static List<NamedExpression> buildLookupRelationProjectList(
 
     static List<NamedExpression> buildProjectListFromFields(
             List<Field> fields,
-            CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         return fields.stream().map(field -> expressionAnalyzer.visitField(field, context))
                 .map(NamedExpression.class::cast)
@@ -54,7 +55,7 @@ static List<NamedExpression> buildProjectListFromFields(
 
     static Expression buildLookupMappingCondition(
             Lookup node,
-            CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         // only equi-join conditions are accepted in lookup command
         List<Expression> equiConditions = new ArrayList<>();
@@ -81,7 +82,7 @@ static Expression buildLookupMappingCondition(
     static List<NamedExpression> buildOutputProjectList(
             Lookup node,
             Lookup.OutputStrategy strategy,
-            CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
+            CatalystExpressionVisitor expressionAnalyzer,
             CatalystPlanContext context) {
         List<NamedExpression> outputProjectList = new ArrayList<>();
         for (Map.Entry<Alias, Field> entry : node.getOutputCandidateMap().entrySet()) {
ParseTransformer.java:
@@ -8,7 +8,6 @@
 import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
 import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.catalyst.expressions.NamedExpression;
-import org.apache.spark.sql.catalyst.expressions.RegExpExtract;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.opensearch.sql.ast.expression.AllFields;
 import org.opensearch.sql.ast.expression.Field;
@@ -27,7 +26,7 @@
 import static org.apache.spark.sql.types.DataTypes.StringType;
 import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
 
-public interface ParseStrategy {
+public interface ParseTransformer {
     /**
      * transform the parse/grok/patterns command into a standard catalyst RegExpExtract expression
     * Since spark's RegExpExtract cant accept actual regExp group name we need to translate the group's name into its corresponding index
PPLSyntaxParser.scala:
@@ -29,11 +29,8 @@ class PPLSyntaxParser extends Parser {
 
 object PlaneUtils {
   def plan(parser: PPLSyntaxParser, query: String): Statement = {
-    val astExpressionBuilder = new AstExpressionBuilder()
-    val astBuilder = new AstBuilder(astExpressionBuilder, query)
-    astExpressionBuilder.setAstBuilder(astBuilder)
-    val builder =
-      new AstStatementBuilder(astBuilder, AstStatementBuilder.StatementBuilderContext.builder())
-    builder.visit(parser.parse(query))
+    new AstStatementBuilder(
+      new AstBuilder(query),
+      AstStatementBuilder.StatementBuilderContext.builder()).visit(parser.parse(query))
   }
 }
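
With the builders wired internally, the parsing entry point collapses into a single expression. A hedged usage sketch in Java (it assumes PPLSyntaxParser still has a no-argument constructor and that the Scala object PlaneUtils is reachable from Java through its generated static forwarder; the sample query is illustrative):

// Parse a PPL query into the intermediate Statement AST through the simplified entry point.
PPLSyntaxParser parser = new PPLSyntaxParser();
Statement statement = PlaneUtils.plan(parser, "source=t | where a > 1 | fields a");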
