Skip to content

Commit

Permalink
New syntax applied to the fillnull command
Browse files Browse the repository at this point in the history
Signed-off-by: Lukasz Soszynski <[email protected]>
  • Loading branch information
lukasz-soszynski-eliatra committed Oct 8, 2024
1 parent 8151931 commit fed7466
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 34 deletions.
8 changes: 8 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ Assumptions: `a`, `b`, `c` are existing fields in `table`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else 'unknown')`
- `source = table | eval f = case(a = 0, 'zero', a = 1, 'one' else concat(a, ' is an incorrect binary digit'))`

#### Fillnull
Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
- `source = table | fillnull with 0 in a`
- `source = table | fillnull with 'N/A' in a, b, c`
- `source = table | fillnull with concat(a, b) in c, d`
- `source = table | fillnull using a = 101`
- `source = table | fillnull using a = 101, b = 102`
- `source = table | fillnull using a = concat(b, c), d = 2 * pi() * e`

```sql
source = table | eval e = eval status_category =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq

import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, LogicalPlan, Project, Sort, UnaryNode}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Expression, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DataFrameDropColumns, LogicalPlan, Project, Sort}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLFillnullITSuite
Expand Down Expand Up @@ -38,7 +38,7 @@ class FlintSparkPPLFillnullITSuite

test("test fillnull with one null replacement value and one column") {
val frame = sql(s"""
| source = $testTable | fillnull value = 0 status_code
| source = $testTable | fillnull with 0 in status_code
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
Expand All @@ -57,13 +57,13 @@ class FlintSparkPPLFillnullITSuite

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", 0)))
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(0))))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with various null replacement values and one column") {
val frame = sql(s"""
| source = $testTable | fillnull fields status_code=101
| source = $testTable | fillnull using status_code=101
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp", "status_code")))
Expand All @@ -82,13 +82,13 @@ class FlintSparkPPLFillnullITSuite

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", 101)))
val expectedPlan = fillNullExpectedPlan(Seq(("status_code", Literal(101))))
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull value = '???' request_path, timestamp | fields id, request_path, timestamp
| source = $testTable | fillnull with concat('??', '?') in request_path, timestamp | fields id, request_path, timestamp
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp")))
Expand All @@ -108,7 +108,13 @@ class FlintSparkPPLFillnullITSuite
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(("request_path", "???"), ("timestamp", "???")),
Seq(
(
"request_path",
UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false)),
(
"timestamp",
UnresolvedFunction("concat", Seq(Literal("??"), Literal("?")), isDistinct = false))),
addDefaultProject = false)
val expectedPlan = Project(
Seq(
Expand All @@ -121,7 +127,7 @@ class FlintSparkPPLFillnullITSuite

test("test fillnull with various null replacement values and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp
| source = $testTable | fillnull using request_path=upper('/not_found'), timestamp='*' | fields id, request_path, timestamp
| """.stripMargin)

assert(frame.columns.sameElements(Array("id", "request_path", "timestamp")))
Expand All @@ -131,8 +137,8 @@ class FlintSparkPPLFillnullITSuite
Row(1, "/home", "*"),
Row(2, "/about", "2023-10-01 10:05:00"),
Row(3, "/contact", "2023-10-01 10:10:00"),
Row(4, "/not_found", "2023-10-01 10:15:00"),
Row(5, "/not_found", "2023-10-01 10:20:00"),
Row(4, "/NOT_FOUND", "2023-10-01 10:15:00"),
Row(5, "/NOT_FOUND", "2023-10-01 10:20:00"),
Row(6, "/home", "*"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
Expand All @@ -141,7 +147,11 @@ class FlintSparkPPLFillnullITSuite
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(("request_path", "/not_found"), ("timestamp", "*")),
Seq(
(
"request_path",
UnresolvedFunction("upper", Seq(Literal("/not_found")), isDistinct = false)),
("timestamp", Literal("*"))),
addDefaultProject = false)
val expectedPlan = Project(
Seq(
Expand All @@ -154,7 +164,7 @@ class FlintSparkPPLFillnullITSuite

test("test fillnull with one null replacement value and stats and sort command") {
val frame = sql(s"""
| source = $testTable | fillnull value = 500 status_code
| source = $testTable | fillnull with 500 in status_code
| | stats count(status_code) by status_code, request_path
| | sort request_path, status_code
| """.stripMargin)
Expand All @@ -174,7 +184,8 @@ class FlintSparkPPLFillnullITSuite

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(Seq(("status_code", 500)), addDefaultProject = false)
val fillNullPlan =
fillNullExpectedPlan(Seq(("status_code", Literal(500))), addDefaultProject = false)
val aggregateExpressions =
Seq(
Alias(
Expand Down Expand Up @@ -203,7 +214,7 @@ class FlintSparkPPLFillnullITSuite

test("test fillnull with various null replacement value and stats and sort command") {
val frame = sql(s"""
| source = $testTable | fillnull fields status_code = 500, request_path = '/home'
| source = $testTable | fillnull using status_code = 500, request_path = '/home'
| | stats count(status_code) by status_code, request_path
| | sort request_path, status_code
| """.stripMargin)
Expand All @@ -222,7 +233,7 @@ class FlintSparkPPLFillnullITSuite

val logicalPlan: LogicalPlan = frame.queryExecution.logical
val fillNullPlan = fillNullExpectedPlan(
Seq(("status_code", 500), ("request_path", "/home")),
Seq(("status_code", Literal(500)), ("request_path", Literal("/home"))),
addDefaultProject = false)
val aggregateExpressions =
Seq(
Expand Down Expand Up @@ -252,30 +263,30 @@ class FlintSparkPPLFillnullITSuite

test("test fillnull with one null replacement value and missing columns") {
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | fillnull value = '!!!'
| source = $testTable | fillnull with '!!!' in
| """.stripMargin))

assert(ex.getMessage().contains("Syntax error "))
}

test("test fillnull with various null replacement values and missing columns") {
val ex = intercept[AnalysisException](sql(s"""
| source = $testTable | fillnull fields
| source = $testTable | fillnull using
| """.stripMargin))

assert(ex.getMessage().contains("Syntax error "))
}

private def fillNullExpectedPlan(
nullReplacements: Seq[(String, Any)],
nullReplacements: Seq[(String, Expression)],
addDefaultProject: Boolean = true): LogicalPlan = {
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val renameProjectList = UnresolvedStar(None) +: nullReplacements.map {
case (nullableColumn, nullReplacement) =>
Alias(
UnresolvedFunction(
"coalesce",
Seq(UnresolvedAttribute(nullableColumn), Literal(nullReplacement)),
Seq(UnresolvedAttribute(nullableColumn), nullReplacement),
isDistinct = false),
nullableColumn)()
}
Expand Down
2 changes: 2 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ D: 'D';
DESC: 'DESC';
DATASOURCES: 'DATASOURCES';
VALUE: 'VALUE';
USING: 'USING';
WITH: 'WITH';

// CLAUSE KEYWORDS
SORTBY: 'SORTBY';
Expand Down
6 changes: 3 additions & 3 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ fillnullCommand
;

fillNullWithTheSameValue
: VALUE EQUAL nullReplacement nullableField (COMMA nullableField)*
: WITH nullReplacement IN nullableField (COMMA nullableField)*
;

fillNullWithFieldVariousValues
: FIELDS nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)*
: USING nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)*
;


Expand All @@ -204,7 +204,7 @@ fillnullCommand
;

nullReplacement
: literalValue
: expression
;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;
import java.util.Objects;
Expand All @@ -21,7 +21,7 @@ public static class NullableFieldFill {
@NonNull
private final Field nullableFieldReference;
@NonNull
private final Literal replaceNullWithMe;
private final UnresolvedExpression replaceNullWithMe;
}

public interface ContainNullableFieldFill {
Expand All @@ -31,7 +31,7 @@ static ContainNullableFieldFill ofVariousValue(List<NullableFieldFill> replaceme
return new VariousValueNullFill(replacements);
}

static ContainNullableFieldFill ofSameValue(Literal replaceNullWithMe, List<Field> nullableFieldReferences) {
static ContainNullableFieldFill ofSameValue(UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
return new SameValueNullFill(replaceNullWithMe, nullableFieldReferences);
}
}
Expand All @@ -40,7 +40,7 @@ private static class SameValueNullFill implements ContainNullableFieldFill {
@Getter(onMethod_ = @Override)
private final List<NullableFieldFill> nullFieldFill;

public SameValueNullFill(Literal replaceNullWithMe, List<Field> nullableFieldReferences) {
public SameValueNullFill(UnresolvedExpression replaceNullWithMe, List<Field> nullableFieldReferences) {
Objects.requireNonNull(replaceNullWithMe, "Null replacement is required");
this.nullFieldFill = Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required")
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context)
List<UnresolvedExpression> aliases = new ArrayList<>();
for(FillNull.NullableFieldFill nullableFieldFill : fillNull.getNullableFieldFills()) {
Field field = nullableFieldFill.getNullableFieldReference();
Literal replaceNullWithMe = nullableFieldFill.getReplaceNullWithMe();
UnresolvedExpression replaceNullWithMe = nullableFieldFill.getReplaceNullWithMe();
Function coalesce = new Function("coalesce", of(field, replaceNullWithMe));
String fieldName = field.getField().toString();
Alias alias = new Alias(fieldName, coalesce);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
FillNullWithFieldVariousValuesContext variousValuesContext = ctx.fillNullWithFieldVariousValues();
if (sameValueContext != null) {
// the null replacement is now parsed as a full expression, not just a literal
Literal replaceNullWithMe = (Literal) internalVisitExpression(sameValueContext.nullReplacement().literalValue());
UnresolvedExpression replaceNullWithMe = internalVisitExpression(sameValueContext.nullReplacement().expression());
List<Field> fieldsToReplace = sameValueContext.nullableField()
.stream()
.map(this::internalVisitExpression)
Expand All @@ -524,7 +524,7 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
List<NullableFieldFill> nullableFieldFills = IntStream.range(0, variousValuesContext.nullableField().size())
.mapToObj(index -> {
variousValuesContext.nullableField(index);
Literal replaceNullWithMe = (Literal) internalVisitExpression(variousValuesContext.nullReplacement(index).literalValue());
UnresolvedExpression replaceNullWithMe = internalVisitExpression(variousValuesContext.nullReplacement(index).expression());
Field nullableFieldReference = (Field) internalVisitExpression(variousValuesContext.nullableField(index));
return new NullableFieldFill(nullableFieldReference, replaceNullWithMe);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite
planTransformer.visit(
plan(
pplParser,
"source=relation | fillnull value = 'null replacement value' column_name"),
"source=relation | fillnull with 'null replacement value' in column_name"),
context)

val relation = UnresolvedRelation(Seq("relation"))
Expand All @@ -54,13 +54,45 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value, one column and function invocation") {
  // Translate the PPL query into an (unresolved) Catalyst logical plan.
  val planContext = new CatalystPlanContext
  val actualPlan =
    planTransformer.visit(
      plan(pplParser, "source=relation | fillnull with upper(another_field) in column_name"),
      planContext)

  // Expected shape: project `*` plus coalesce(column_name, upper(another_field))
  // aliased back to the original column name, then drop the source column and
  // re-project everything.
  val source = UnresolvedRelation(Seq("relation"))
  val replacement =
    UnresolvedFunction("upper", Seq(UnresolvedAttribute("another_field")), isDistinct = false)
  val coalesced =
    UnresolvedFunction(
      "coalesce",
      Seq(UnresolvedAttribute("column_name"), replacement),
      isDistinct = false)
  val projectWithAlias =
    Project(Seq(UnresolvedStar(None), Alias(coalesced, "column_name")()), source)
  val withoutSourceColumn =
    DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), projectWithAlias)
  val expectedPlan = Project(seq(UnresolvedStar(None)), withoutSourceColumn)

  comparePlans(expectedPlan, actualPlan, checkAnalysis = false)
}

test("test fillnull with one null replacement value and multiple column") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=relation | fillnull value = 'another null replacement value' column_name_one, column_name_two, column_name_three"),
"source=relation | fillnull with 'another null replacement value' in column_name_one, column_name_two, column_name_three"),
context)

val relation = UnresolvedRelation(Seq("relation"))
Expand Down Expand Up @@ -104,7 +136,7 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(pplParser, "source=relation | fillnull fields column_name='null replacement value'"),
plan(pplParser, "source=relation | fillnull using column_name='null replacement value'"),
context)

val relation = UnresolvedRelation(Seq("relation"))
Expand All @@ -127,13 +159,48 @@ class PPLLogicalPlanFillnullCommandTranslatorTestSuite
comparePlans(expectedPlan, logPlan, checkAnalysis = false)
}

test(
  "test fillnull with possibly various null replacement value, one column and function invocation") {
  // Translate the PPL query into an (unresolved) Catalyst logical plan.
  val planContext = new CatalystPlanContext
  val actualPlan =
    planTransformer.visit(
      plan(
        pplParser,
        "source=relation | fillnull using column_name=concat('missing value for', id)"),
      planContext)

  // Expected shape: project `*` plus coalesce(column_name, concat(...)) aliased
  // back to the original column name, then drop the source column and
  // re-project everything.
  val source = UnresolvedRelation(Seq("relation"))
  val replacement =
    UnresolvedFunction(
      "concat",
      Seq(Literal("missing value for"), UnresolvedAttribute("id")),
      isDistinct = false)
  val coalesced =
    UnresolvedFunction(
      "coalesce",
      Seq(UnresolvedAttribute("column_name"), replacement),
      isDistinct = false)
  val projectWithAlias =
    Project(Seq(UnresolvedStar(None), Alias(coalesced, "column_name")()), source)
  val withoutSourceColumn =
    DataFrameDropColumns(Seq(UnresolvedAttribute("column_name")), projectWithAlias)
  val expectedPlan = Project(seq(UnresolvedStar(None)), withoutSourceColumn)

  comparePlans(expectedPlan, actualPlan, checkAnalysis = false)
}

test("test fillnull with possibly various null replacement value and three columns") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=relation | fillnull fields column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"),
"source=relation | fillnull using column_name_1='null replacement value 1', column_name_2='null replacement value 2', column_name_3='null replacement value 3'"),
context)

val relation = UnresolvedRelation(Seq("relation"))
Expand Down

0 comments on commit fed7466

Please sign in to comment.