Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Kwok <[email protected]>
  • Loading branch information
andy-k-improving committed Nov 13, 2024
1 parent 1071658 commit 648e567
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
4 changes: 2 additions & 2 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ sbt test
```
To run a specific unit test in SBT, use the `testOnly` command with the fully qualified name of the test class:
```
sbt test:testOnly org.opensearch.flint.spark.ppl.PPLLogicalPlanTrendlineCommandTranslatorTestSuite
sbt "; project pplSparkIntegration; test:testOnly org.opensearch.flint.spark.ppl.PPLLogicalPlanTrendlineCommandTranslatorTestSuite"
```


Expand All @@ -30,7 +30,7 @@ If you get integration test failures with error message "Previous attempts to fi

To run only a selected set of integration test suites, use the following command:
```
sbt "project integtest" it:testOnly org.opensearch.flint.spark.ppl.FlintSparkPPLTrendlineITSuite
sbt "; project integtest; it:testOnly org.opensearch.flint.spark.ppl.FlintSparkPPLTrendlineITSuite"
```
This command runs only the specified test suite within the `integtest` submodule.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,6 @@ public enum BuiltinFunctionName {
MULTIMATCHQUERY(FunctionName.of("multimatchquery")),
WILDCARDQUERY(FunctionName.of("wildcardquery")),
WILDCARD_QUERY(FunctionName.of("wildcard_query")),

NTH_VALUE(FunctionName.of("nth_value")),
COALESCE(FunctionName.of("coalesce"));

private FunctionName name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.analysis.UnresolvedFunction;
import org.apache.spark.sql.catalyst.expressions.*;
import org.opensearch.sql.ast.expression.*;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.ppl.CatalystExpressionVisitor;
import org.opensearch.sql.ppl.CatalystPlanContext;
import scala.collection.mutable.Seq;
import scala.Option;
import scala.Tuple2;

Expand All @@ -22,6 +24,8 @@
import java.util.stream.Collectors;

import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq;
import static scala.Option.empty;
import static scala.collection.JavaConverters.asScalaBufferConverter;

public interface TrendlineCatalystUtils {

Expand Down Expand Up @@ -208,13 +212,14 @@ private static List<Expression> getNthValueAggregations(CatalystExpressionVisito
for (int i = 1; i <= dataPoints; i++) {
// Get the offset parameter
Expression offSetExpression = parseIntToExpression(visitor, context, i);

// Composite the nth_value expression.
Function func = new Function(BuiltinFunctionName.NTH_VALUE.name(),
List.of(node.getDataField(), new Literal(i, DataType.INTEGER)));

visitor.visitFunction(func, context);
Expression nthValueExp = context.popNamedParseExpressions().get();
// Get the dataField in Expression
visitor.analyze(node.getDataField(), context);
Expression dataField = context.popNamedParseExpressions().get();
// nth_value Expression
UnresolvedFunction nthValueExp = new UnresolvedFunction(
asScalaBufferConverter(List.of("nth_value")).asScala().seq(),
asScalaBufferConverter(List.of(dataField, offSetExpression)).asScala().seq(),
false, empty(), false);

expressions.add(new Multiply(
new WindowExpression(nthValueExp, windowDefinition), offSetExpression));
Expand Down

0 comments on commit 648e567

Please sign in to comment.