Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
Signed-off-by: currantw <[email protected]>
  • Loading branch information
currantw committed Dec 16, 2024
1 parent 5052ffe commit 0e03dd8
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ source = table | where ispresent(a) |
- `source=accounts | parse email '.+@(?<host>.+)' | stats count() by host`
- `source=accounts | parse email '.+@(?<host>.+)' | eval eval_result=1 | fields host, eval_result`
- `source=accounts | parse email '.+@(?<host>.+)' | where age > 45 | sort - age | fields age, email, host`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street`
- `source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort streetNumber | fields streetNumber, street`
- Limitation: [see limitations](ppl-parse-command.md#limitations)

#### **Grok**
Expand Down
2 changes: 1 addition & 1 deletion docs/ppl-lang/ppl-parse-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ The example shows how to sort street numbers that are higher than 500 in ``addre

PPL query:

os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort num(streetNumber) | fields streetNumber, street ;
os> source=accounts | parse address '(?<streetNumber>\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort streetNumber | fields streetNumber, street ;
fetched rows / total rows = 3/3
+----------------+----------------+
| streetNumber | street |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class FlintSparkPPLParseITSuite

test("test parse email & host expressions including cast and sort commands") {
val frame = sql(s"""
| source = $testTable| parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street
| source = $testTable| parse street_address '(?<streetNumber>\\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort streetNumber | fields streetNumber, street
| """.stripMargin)
// Retrieve the results
val results: Array[Row] = frame.collect()
Expand Down
7 changes: 0 additions & 7 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,6 @@ DATASOURCES: 'DATASOURCES';
USING: 'USING';
WITH: 'WITH';

// FIELD KEYWORDS
AUTO: 'AUTO';
STR: 'STR';
IP: 'IP';
NUM: 'NUM';


// FIELDSUMMARY keywords
FIELDSUMMARY: 'FIELDSUMMARY';
INCLUDEFIELDS: 'INCLUDEFIELDS';
Expand Down
14 changes: 1 addition & 13 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,7 @@ wcFieldList
;

sortField
: (PLUS | MINUS)? sortFieldExpression
;

sortFieldExpression
: fieldExpression
| AUTO LT_PRTHS fieldExpression RT_PRTHS
| STR LT_PRTHS fieldExpression RT_PRTHS
| IP LT_PRTHS fieldExpression RT_PRTHS
| NUM LT_PRTHS fieldExpression RT_PRTHS
: (PLUS | MINUS)? fieldExpression
;

fieldExpression
Expand Down Expand Up @@ -1095,10 +1087,6 @@ keywordsCanBeId
| INDEX
| DESC
| DATASOURCES
| AUTO
| STR
| IP
| NUM
| FROM
| PATTERN
| NEW_FIELD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
import org.opensearch.sql.ast.expression.subquery.InSubquery;
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.ppl.utils.ArgumentFactory;

Expand All @@ -58,7 +56,6 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.opensearch.sql.expression.function.BuiltinFunctionName.EQUAL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NOT_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IS_NULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.LENGTH;
Expand All @@ -80,7 +77,7 @@ public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<Unresol
.put("isnotnull", IS_NOT_NULL.getName().getFunctionName())
.put("ispresent", IS_NOT_NULL.getName().getFunctionName())
.build();
private AstBuilder astBuilder;
private final AstBuilder astBuilder;

public AstExpressionBuilder(AstBuilder astBuilder) {
this.astBuilder = astBuilder;
Expand Down Expand Up @@ -184,7 +181,7 @@ public UnresolvedExpression visitWcFieldExpression(OpenSearchPPLParser.WcFieldEx
@Override
public UnresolvedExpression visitSortField(OpenSearchPPLParser.SortFieldContext ctx) {
return new Field((QualifiedName)
visit(ctx.sortFieldExpression().fieldExpression().qualifiedName()),
visit(ctx.fieldExpression().qualifiedName()),
ArgumentFactory.getArgumentList(ctx));
}

Expand Down Expand Up @@ -263,7 +260,6 @@ public UnresolvedExpression visitCaseExpr(OpenSearchPPLParser.CaseExprContext ct
public UnresolvedExpression visitIsEmptyExpression(OpenSearchPPLParser.IsEmptyExpressionContext ctx) {
Function trimFunction = new Function(TRIM.getName().getFunctionName(), Collections.singletonList(this.visitFunctionArg(ctx.functionArg())));
Function lengthFunction = new Function(LENGTH.getName().getFunctionName(), Collections.singletonList(trimFunction));
Compare lengthEqualsZero = new Compare(EQUAL.getName().getFunctionName(), lengthFunction, new Literal(0, DataType.INTEGER));
Literal whenCompareValue = new Literal(0, DataType.INTEGER);
Literal isEmptyFalse = new Literal(false, DataType.BOOLEAN);
Literal isEmptyTrue = new Literal(true, DataType.BOOLEAN);
Expand Down Expand Up @@ -452,12 +448,10 @@ public UnresolvedExpression visitLambda(OpenSearchPPLParser.LambdaContext ctx) {

private List<UnresolvedExpression> timestampFunctionArguments(
OpenSearchPPLParser.TimestampFunctionCallContext ctx) {
List<UnresolvedExpression> args =
Arrays.asList(
return Arrays.asList(
new Literal(ctx.timestampFunction().simpleDateTimePart().getText(), DataType.STRING),
visitFunctionArg(ctx.timestampFunction().firstArg),
visitFunctionArg(ctx.timestampFunction().secondArg));
return args;
}

private QualifiedName visitIdentifiers(List<? extends ParserRuleContext> ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,7 @@ public static List<Argument> getArgumentList(OpenSearchPPLParser.SortFieldContex
return Arrays.asList(
ctx.MINUS() != null
? new Argument("asc", new Literal(false, DataType.BOOLEAN))
: new Argument("asc", new Literal(true, DataType.BOOLEAN)),
ctx.sortFieldExpression().AUTO() != null
? new Argument("type", new Literal("auto", DataType.STRING))
: ctx.sortFieldExpression().IP() != null
? new Argument("type", new Literal("ip", DataType.STRING))
: ctx.sortFieldExpression().NUM() != null
? new Argument("type", new Literal("num", DataType.STRING))
: ctx.sortFieldExpression().STR() != null
? new Argument("type", new Literal("str", DataType.STRING))
: new Argument("type", new Literal(null, DataType.NULL)));
: new Argument("asc", new Literal(true, DataType.BOOLEAN)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import org.scalatest.matchers.should.Matchers
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.ScalaReflection.universe.Star
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Cast, Coalesce, Descending, GreaterThan, Literal, NamedExpression, NullsFirst, NullsLast, RegExpExtract, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, Project, Sort}
import org.apache.spark.sql.types.{DataType, IntegerType}

class PPLLogicalPlanParseTranslatorTestSuite
extends SparkFunSuite
Expand Down Expand Up @@ -120,13 +121,13 @@ class PPLLogicalPlanParseTranslatorTestSuite
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test parse email & host expressions including cast and sort commands") {
test("test parse street number & address expressions including cast and sort commands") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | parse address '(?<streetNumber>\\d+) (?<street>.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street"),
"source=t | parse address '(?<streetNumber>\\d+) (?<street>.+)' | where cast(streetNumber as int) > 500 | sort streetNumber | fields streetNumber, street"),
context)

val addressAttribute = UnresolvedAttribute("address")
Expand All @@ -147,13 +148,15 @@ class PPLLogicalPlanParseTranslatorTestSuite
Literal("2")),
"street")()

val castExpression = Cast(streetNumberAttribute, IntegerType)

val expectedPlan = Project(
Seq(streetNumberAttribute, streetAttribute),
Sort(
Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)),
global = true,
Filter(
GreaterThan(streetNumberAttribute, Literal(500)),
GreaterThan(castExpression, Literal(500)),
Project(
Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)),
UnresolvedRelation(Seq("t"))))))
Expand Down

0 comments on commit 0e03dd8

Please sign in to comment.