Skip to content

Commit

Permalink
minor refactoring & package movement
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Sep 26, 2023
1 parent f665fc5 commit 07529ea
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, EqualTo, LessThan, Literal, Not, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, LessThanOrEqual, Literal, Multiply, Not, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.streaming.StreamTest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark
package org.opensearch.flint.spark.ppl

import java.sql.Timestamp

import org.opensearch.flint.spark.FlintPPLSuite

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, Floor, GenericRowWithSchema, Literal, Multiply, SortOrder, TimeWindow}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,20 @@ public LogicalPlan visitAggregation(Aggregation node, CatalystPlanContext contex

if (!groupExpList.isEmpty()) {
//add group by fields to context
extractedGroupBy(node.getGroupExprList().size(), context);
context.getGroupingParseExpressions().addAll(groupExpList);
}

UnresolvedExpression span = node.getSpan();
if (!Objects.isNull(span)) {
span.accept(this, context);
//add span's group by field to context
extractedGroupBy(1, context);
//add span's group alias field (most recent added expression)
context.getGroupingParseExpressions().add(context.getNamedParseExpressions().peek());
}
// build the aggregation logical step
extractedAggregation(context);
return child;
}

// Copies the last {groupByElementsCount} entries of the context's named parse
// expression list — the group-by aliases most recently pushed by the caller —
// into the grouping parse expression list. The loop walks backwards from the
// tail (size - 1, size - 2, ...), so aliases are appended newest-first.
// NOTE(review): assumes the named list is not mutated by getGroupingParseExpressions()
// (i.e. the two lists are distinct) — cannot be confirmed from this view.
private static void extractedGroupBy(int groupByElementsCount, CatalystPlanContext context) {
//copy the group by aliases from the namedExpressionList to the groupByExpressionList
for (int i = 1; i <= groupByElementsCount; i++) {
context.getGroupingParseExpressions().add(context.getNamedParseExpressions().get(context.getNamedParseExpressions().size() - i));
}
}


private static void extractedAggregation(CatalystPlanContext context) {
Seq<Expression> groupingExpression = context.retainAllGroupingNamedParseExpressions(p -> p);
Seq<NamedExpression> aggregateExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
Expand All @@ -160,7 +153,7 @@ public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) {
@Override
public LogicalPlan visitProject(Project node, CatalystPlanContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
visitExpressionList(node.getProjectList(), context);
List<Expression> expressionList = visitExpressionList(node.getProjectList(), context);

// Create a projection list from the existing expressions
Seq<?> projectList = seq(context.getNamedParseExpressions());
Expand All @@ -171,7 +164,7 @@ public LogicalPlan visitProject(Project node, CatalystPlanContext context) {
}
if (node.hasArgument()) {
Argument argument = node.getArgExprList().get(0);
//todo exclude the argument from the projected aruments list
//todo exclude the argument from the projected arguments list
}
return child;
}
Expand Down

0 comments on commit 07529ea

Please sign in to comment.