Fillnull command introduced.
Signed-off-by: Lukasz Soszynski <[email protected]>
lukasz-soszynski-eliatra committed Oct 7, 2024
1 parent 7649f9b commit 735eb61
Showing 10 changed files with 512 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
@@ -27,6 +27,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).
- [`dedup command`](ppl-dedup-command.md)

- [`describe command`](PPL-Example-Commands.md/#describe)

- [`fillnull command`](ppl-fillnull-command.md)

- [`eval command`](ppl-eval-command.md)

@@ -619,4 +619,27 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| (6, 403, '/home', '2023-10-01 10:25:00')
| """.stripMargin)
}

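// Creates an HTTP-log test table whose status_code, request_path and
// timestamp columns each contain nulls in some rows.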
protected def createNullableTableHttpLog(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
|(
| id INT,
| status_code INT,
| request_path STRING,
| timestamp STRING
|)
| USING $tableType $tableOptions
|""".stripMargin)

sql(s"""
| INSERT INTO $testTable
| VALUES (1, 200, '/home', null),
| (2, null, '/about', '2023-10-01 10:05:00'),
| (3, null, '/contact', '2023-10-01 10:10:00'),
| (4, 301, null, '2023-10-01 10:15:00'),
| (5, 200, null, '2023-10-01 10:20:00'),
| (6, 403, '/home', null)
| """.stripMargin)
}
}
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLFillnullITSuite
extends QueryTest
with LogicalPlanTestUtils
with FlintPPLSuite
with StreamTest {

private val testTable = "spark_catalog.default.flint_ppl_test"

override def beforeAll(): Unit = {
super.beforeAll()

// Create test table
createNullableTableHttpLog(testTable)
}

protected override def afterEach(): Unit = {
super.afterEach()
// Stop all streaming jobs if any
spark.streams.active.foreach { job =>
job.stop()
job.awaitTermination()
}
}

test("test fillnull with one null replacement value and one column") {
val frame = sql(s"""
| source = $testTable | fillnull value = 0 status_code
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 0),
Row(3, "/contact", "2023-10-01 10:10:00", 0),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test fillnull with various null replacement values and one column") {
val frame = sql(s"""
| source = $testTable | fillnull fields status_code=101
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", null, 200),
Row(2, "/about", "2023-10-01 10:05:00", 101),
Row(3, "/contact", "2023-10-01 10:10:00", 101),
Row(4, null, "2023-10-01 10:15:00", 301),
Row(5, null, "2023-10-01 10:20:00", 200),
Row(6, "/home", null, 403))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test fillnull with one null replacement value and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull value = '???' request_path, timestamp | fields id, request_path, timestamp
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", "???"),
Row(2, "/about", "2023-10-01 10:05:00"),
Row(3, "/contact", "2023-10-01 10:10:00"),
Row(4, "???", "2023-10-01 10:15:00"),
Row(5, "???", "2023-10-01 10:20:00"),
Row(6, "/home", "???"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}

test("test fillnull with various null replacement values and two columns") {
val frame = sql(s"""
| source = $testTable | fillnull fields request_path='/not_found', timestamp='*' | fields id, request_path, timestamp
| """.stripMargin)

val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(
Row(1, "/home", "*"),
Row(2, "/about", "2023-10-01 10:05:00"),
Row(3, "/contact", "2023-10-01 10:10:00"),
Row(4, "/not_found", "2023-10-01 10:15:00"),
Row(5, "/not_found", "2023-10-01 10:20:00"),
Row(6, "/home", "*"))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
}
}
2 changes: 2 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -35,6 +35,7 @@ NEW_FIELD: 'NEW_FIELD';
KMEANS: 'KMEANS';
AD: 'AD';
ML: 'ML';
FILLNULL: 'FILLNULL';

//Native JOIN KEYWORDS
JOIN: 'JOIN';
@@ -72,6 +73,7 @@ INDEX: 'INDEX';
D: 'D';
DESC: 'DESC';
DATASOURCES: 'DATASOURCES';
VALUE: 'VALUE';

// CLAUSE KEYWORDS
SORTBY: 'SORTBY';
24 changes: 24 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -51,6 +51,7 @@ commands
| patternsCommand
| lookupCommand
| renameCommand
| fillnullCommand
;

searchCommand
@@ -184,6 +185,29 @@ lookupPair
: inputField = fieldExpression (AS outputField = fieldExpression)?
;

fillnullCommand
: FILLNULL (fillNullWithTheSameValue
| fillNullWithFieldVariousValues)
;

fillNullWithTheSameValue
: VALUE EQUAL nullReplacement nullableField (COMMA nullableField)*
;

fillNullWithFieldVariousValues
: FIELDS nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)*
;

nullableField
: fieldExpression
;

nullReplacement
: literalValue
;

kmeansCommand
: KMEANS (kmeansParameter)*
;
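Taken together, the two parser alternatives above admit the following query shapes. A minimal sketch in the style of the new integration suite, reusing its $testTable; the queries themselves are the ones that suite runs:

// fillNullWithTheSameValue: one replacement shared by one or more columns
sql(s"source = $testTable | fillnull value = 0 status_code")
// fillNullWithFieldVariousValues: an individual replacement per column
sql(s"source = $testTable | fillnull fields request_path='/not_found', timestamp='*'")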
@@ -56,6 +56,7 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.*;

/** AST nodes visitor Defines the traverse path. */
public abstract class AbstractNodeVisitor<T, C> {
@@ -293,4 +294,7 @@ public T visitExplain(Explain node, C context) {
public T visitInSubquery(InSubquery node, C context) {
return visitChildren(node, context);
}

public T visitFillNull(FillNull fillNull, C context) {
return visitChildren(fillNull, context);
}
}
@@ -0,0 +1,99 @@
package org.opensearch.sql.ast.tree;

import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Literal;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FillNull extends UnresolvedPlan {

public static class NullableFieldFill {
private final Field nullableFieldReference;
private final Literal replaceNullWithMe;

public NullableFieldFill(Field nullableFieldReference, Literal replaceNullWithMe) {
this.nullableFieldReference = Objects.requireNonNull(nullableFieldReference, "Field to replace is required");
this.replaceNullWithMe = Objects.requireNonNull(replaceNullWithMe, "Null replacement is required");
}

public Field getNullableFieldReference() {
return nullableFieldReference;
}

public Literal getReplaceNullWithMe() {
return replaceNullWithMe;
}
}

private interface ContainNullableFieldFill {
Stream<NullableFieldFill> getNullFieldFill();
}

public static class SameValueNullFill implements ContainNullableFieldFill {
private final List<NullableFieldFill> replacements;

public SameValueNullFill(Literal replaceNullWithMe, List<Field> nullableFieldReferences) {
Objects.requireNonNull(replaceNullWithMe, "Null replacement is required");
this.replacements = Objects.requireNonNull(nullableFieldReferences, "Nullable field reference is required")
.stream()
.map(nullableReference -> new NullableFieldFill(nullableReference, replaceNullWithMe))
.collect(Collectors.toList());
}

@Override
public Stream<NullableFieldFill> getNullFieldFill() {
return replacements.stream();
}
}

public static class VariousValueNullFill implements ContainNullableFieldFill {
private final List<NullableFieldFill> replacements;

public VariousValueNullFill(List<NullableFieldFill> replacements) {
this.replacements = replacements;
}

@Override
public Stream<NullableFieldFill> getNullFieldFill() {
return replacements.stream();
}
}

private UnresolvedPlan child;
private final SameValueNullFill sameValueNullFill;
private final VariousValueNullFill variousValueNullFill;

public FillNull(SameValueNullFill sameValueNullFill, VariousValueNullFill variousValueNullFill) {
this.sameValueNullFill = sameValueNullFill;
this.variousValueNullFill = variousValueNullFill;
}

public List<NullableFieldFill> getNullableFieldFills() {
return Stream.of(sameValueNullFill, variousValueNullFill)
.filter(Objects::nonNull)
.flatMap(ContainNullableFieldFill::getNullFieldFill)
.collect(Collectors.toList());
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return child == null ? List.of() : List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitFillNull(this, context);
}
}
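The parser-to-AST wiring is not visible in this commit view (the remaining changed files were not expanded). As a hedged sketch of how the two grammar alternatives could map onto this node, assuming hypothetical statusCode: Field and zero: Literal values; the unused alternative is passed as null, which getNullableFieldFills() filters out:

// `fillnull value = 0 status_code` (sketch): one shared replacement
val sameValue = new FillNull(
  new FillNull.SameValueNullFill(zero, java.util.Arrays.asList(statusCode)),
  null)

// `fillnull fields status_code=0` (sketch): per-field replacements
val various = new FillNull(
  null,
  new FillNull.VariousValueNullFill(java.util.Arrays.asList(
    new FillNull.NullableFieldFill(statusCode, zero))))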
@@ -7,6 +7,7 @@

import org.apache.spark.sql.catalyst.TableIdentifier;
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$;
import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction;
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$;
import org.apache.spark.sql.catalyst.expressions.Ascending$;
@@ -60,6 +61,7 @@
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.DescribeRelation;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
@@ -84,6 +86,7 @@
import scala.Option;
import scala.Option$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq;

import java.util.*;
@@ -388,6 +391,37 @@ public LogicalPlan visitHead(Head node, CatalystPlanContext context) {
node.getSize(), DataTypes.IntegerType), p));
}

@Override
public LogicalPlan visitFillNull(FillNull fillNull, CatalystPlanContext context) {
fillNull.getChild().get(0).accept(this, context);
List<UnresolvedExpression> aliases = new ArrayList<>();
for(FillNull.NullableFieldFill nullableFieldFill : fillNull.getNullableFieldFills()) {
Field field = nullableFieldFill.getNullableFieldReference();
Literal replaceNullWithMe = nullableFieldFill.getReplaceNullWithMe();
Function coalesce = new Function("coalesce", of(field, replaceNullWithMe));
String fieldName = field.getField().toString();
Alias alias = new Alias(fieldName, coalesce);
aliases.add(alias);
}
if (context.getNamedParseExpressions().isEmpty()) {
// Create an UnresolvedStar for all-fields projection
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
// For each alias, extract the source field (the first coalesce argument)
// so the original, non-coalesced column can be dropped afterwards.
List<Expression> toDrop = visitExpressionList(aliases, context).stream()
.map(org.apache.spark.sql.catalyst.expressions.Alias.class::cast)
.map(org.apache.spark.sql.catalyst.expressions.Alias::child) // the coalesce call
.map(UnresolvedFunction.class::cast)
.map(UnresolvedFunction::children) // coalesce arguments
.map(IterableLike::head) // first argument, i.e. the source field
.collect(Collectors.toList());
Seq<NamedExpression> projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
// build the plan with the projection step
context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p));
LogicalPlan resultWithoutDuplicatedColumns = context.apply(logicalPlan -> DataFrameDropColumns$.MODULE$.apply(seq(toDrop), logicalPlan));
return Objects.requireNonNull(resultWithoutDuplicatedColumns, "FillNull operation failed");
}

private void visitFieldList(List<Field> fieldList, CatalystPlanContext context) {
fieldList.forEach(field -> visitExpression(field, context));
}
@@ -694,7 +728,10 @@ public Expression visitIsEmpty(IsEmpty node, CatalystPlanContext context) {
return expression;
}

@Override
public Expression visitFillNull(FillNull fillNull, CatalystPlanContext context) {
throw new IllegalStateException("Unsupported operation: FillNull");
}

@Override
public Expression visitInterval(Interval node, CatalystPlanContext context) {
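In DataFrame terms, visitFillNull projects every source column plus one coalesce alias per filled field, then applies DataFrameDropColumns to remove the original attributes; this appended-then-dropped projection is also why the IT suite above expects the filled column in the last position. A minimal sketch of the equivalent transformation, assuming a hypothetical DataFrame df (an illustration of the semantics, not the actual code path):

import org.apache.spark.sql.functions.{coalesce, lit}

// Equivalent of `... | fillnull value = 0 status_code`:
// nulls in status_code become 0; all other columns pass through unchanged.
val filled = df.withColumn("status_code", coalesce(df("status_code"), lit(0)))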