Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor CatalystQueryPlanVisitor into distinct Plan & Expression visitors #852

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class AstBuilder extends OpenSearchPPLParserBaseVisitor<UnresolvedPlan> {
*/
private String query;

public AstBuilder(AstExpressionBuilder expressionBuilder, String query) {
this.expressionBuilder = expressionBuilder;
public AstBuilder(String query) {
this.expressionBuilder = new AstExpressionBuilder(this);
this.query = query;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
*/
public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<UnresolvedExpression> {

private static final int DEFAULT_TAKE_FUNCTION_SIZE_VALUE = 10;
/**
* The function name mapping between frontend and core engine.
*/
Expand All @@ -79,16 +78,10 @@ public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<Unresol
.build();
private AstBuilder astBuilder;

public AstExpressionBuilder() {
}

/**
* Set AstBuilder back to AstExpressionBuilder for resolving the subquery plan in subquery expression
*/
public void setAstBuilder(AstBuilder astBuilder) {
public AstExpressionBuilder(AstBuilder astBuilder) {
this.astBuilder = astBuilder;
}

@Override
public UnresolvedExpression visitMappingCompareExpr(OpenSearchPPLParser.MappingCompareExprContext ctx) {
return new Compare(ctx.comparisonOperator().getText(), visit(ctx.left), visit(ctx.right));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ public StatementBuilderContext getContext() {
}

public static class StatementBuilderContext {
public static final int FETCH_SIZE = 1000;
private int fetchSize;

public StatementBuilderContext(int fetchSize) {
this.fetchSize = fetchSize;
}

public static StatementBuilderContext builder() {
//todo set the default statement builder init params configurable
return new StatementBuilderContext(1000);
return new StatementBuilderContext(FETCH_SIZE);
}

public int getFetchSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.DataType;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

import java.util.List;
import java.util.Optional;

import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static scala.Option.empty;
Expand All @@ -27,7 +25,7 @@
*
* @return
*/
public interface AggregatorTranslator {
public interface AggregatorTransformer {

static Expression aggregator(org.opensearch.sql.ast.expression.AggregateFunction aggregateFunction, Expression arg) {
if (BuiltinFunctionName.ofAggregation(aggregateFunction.getFuncName()).isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static scala.Option.empty;

public interface BuiltinFunctionTranslator {
public interface BuiltinFunctionTransformer {

/**
* The name mapping between PPL builtin functions to Spark builtin functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.apache.spark.sql.catalyst.plans.logical.Union;
import org.apache.spark.sql.types.DataTypes;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ppl.CatalystExpressionVisitor;
import org.opensearch.sql.ppl.CatalystPlanContext;
import org.opensearch.sql.ppl.CatalystQueryPlanVisitor.ExpressionAnalyzer;
import scala.collection.Seq;

import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
Expand All @@ -38,7 +38,7 @@ public interface DedupeTransformer {
static LogicalPlan retainOneDuplicateEventAndKeepEmpty(
Dedupe node,
Seq<Attribute> dedupeFields,
ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
context.apply(p -> {
Expression isNullExpr = buildIsNullFilterExpression(node, expressionAnalyzer, context);
Expand All @@ -63,7 +63,7 @@ static LogicalPlan retainOneDuplicateEventAndKeepEmpty(
static LogicalPlan retainOneDuplicateEvent(
Dedupe node,
Seq<Attribute> dedupeFields,
ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
Expression isNotNullExpr = buildIsNotNullFilterExpression(node, expressionAnalyzer, context);
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Filter(isNotNullExpr, p));
Expand All @@ -87,7 +87,7 @@ static LogicalPlan retainOneDuplicateEvent(
static LogicalPlan retainMultipleDuplicateEventsAndKeepEmpty(
Dedupe node,
Integer allowedDuplication,
ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
context.apply(p -> {
// Build isnull Filter for right
Expand Down Expand Up @@ -137,7 +137,7 @@ static LogicalPlan retainMultipleDuplicateEventsAndKeepEmpty(
static LogicalPlan retainMultipleDuplicateEvents(
Dedupe node,
Integer allowedDuplication,
ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
// Build isnotnull Filter
Expression isNotNullExpr = buildIsNotNullFilterExpression(node, expressionAnalyzer, context);
Expand All @@ -163,7 +163,7 @@ static LogicalPlan retainMultipleDuplicateEvents(
return context.apply(p -> new DataFrameDropColumns(seq(rowNumber.toAttribute()), p));
}

private static Expression buildIsNotNullFilterExpression(Dedupe node, ExpressionAnalyzer expressionAnalyzer, CatalystPlanContext context) {
private static Expression buildIsNotNullFilterExpression(Dedupe node, CatalystExpressionVisitor expressionAnalyzer, CatalystPlanContext context) {
node.getFields().forEach(field -> expressionAnalyzer.analyze(field, context));
Seq<Expression> isNotNullExpressions =
context.retainAllNamedParseExpressions(
Expand All @@ -180,7 +180,7 @@ private static Expression buildIsNotNullFilterExpression(Dedupe node, Expression
return isNotNullExpr;
}

private static Expression buildIsNullFilterExpression(Dedupe node, ExpressionAnalyzer expressionAnalyzer, CatalystPlanContext context) {
private static Expression buildIsNullFilterExpression(Dedupe node, CatalystExpressionVisitor expressionAnalyzer, CatalystPlanContext context) {
node.getFields().forEach(field -> expressionAnalyzer.analyze(field, context));
Seq<Expression> isNullExpressions =
context.retainAllNamedParseExpressions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ppl.CatalystExpressionVisitor;
import org.opensearch.sql.ppl.CatalystPlanContext;
import org.opensearch.sql.ppl.CatalystQueryPlanVisitor;
import scala.Option;
Expand All @@ -32,7 +33,7 @@ public interface LookupTransformer {
/** lookup mapping fields + input fields*/
static List<NamedExpression> buildLookupRelationProjectList(
Lookup node,
CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
List<Field> inputFields = new ArrayList<>(node.getInputFieldList());
if (inputFields.isEmpty()) {
Expand All @@ -45,7 +46,7 @@ static List<NamedExpression> buildLookupRelationProjectList(

static List<NamedExpression> buildProjectListFromFields(
List<Field> fields,
CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
return fields.stream().map(field -> expressionAnalyzer.visitField(field, context))
.map(NamedExpression.class::cast)
Expand All @@ -54,7 +55,7 @@ static List<NamedExpression> buildProjectListFromFields(

static Expression buildLookupMappingCondition(
Lookup node,
CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
// only equi-join conditions are accepted in lookup command
List<Expression> equiConditions = new ArrayList<>();
Expand All @@ -81,7 +82,7 @@ static Expression buildLookupMappingCondition(
static List<NamedExpression> buildOutputProjectList(
Lookup node,
Lookup.OutputStrategy strategy,
CatalystQueryPlanVisitor.ExpressionAnalyzer expressionAnalyzer,
CatalystExpressionVisitor expressionAnalyzer,
CatalystPlanContext context) {
List<NamedExpression> outputProjectList = new ArrayList<>();
for (Map.Entry<Alias, Field> entry : node.getOutputCandidateMap().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.RegExpExtract;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.Field;
Expand All @@ -27,7 +26,7 @@
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;

public interface ParseStrategy {
public interface ParseTransformer {
/**
* Transform the parse/grok/patterns command into a standard Catalyst RegExpExtract expression.
* Since Spark's RegExpExtract can't accept an actual regex group name, we need to translate the group's name into its corresponding index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ class PPLSyntaxParser extends Parser {

object PlaneUtils {
def plan(parser: PPLSyntaxParser, query: String): Statement = {
val astExpressionBuilder = new AstExpressionBuilder()
val astBuilder = new AstBuilder(astExpressionBuilder, query)
astExpressionBuilder.setAstBuilder(astBuilder)
val builder =
new AstStatementBuilder(astBuilder, AstStatementBuilder.StatementBuilderContext.builder())
builder.visit(parser.parse(query))
new AstStatementBuilder(
new AstBuilder(query),
AstStatementBuilder.StatementBuilderContext.builder()).visit(parser.parse(query))
}
}
Loading