update correlation command
Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Oct 6, 2023
1 parent aee86d6 commit e9d1589
Showing 8 changed files with 200 additions and 32 deletions.
8 changes: 8 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLLexer.g4
@@ -36,6 +36,13 @@ KMEANS: 'KMEANS';
AD: 'AD';
ML: 'ML';

// CORRELATION KEYWORDS
CORRELATE: 'CORRELATE';
EXACT: 'EXACT';
APPROXIMATE: 'APPROXIMATE';
SCOPE: 'SCOPE';
MAPPING: 'MAPPING';

// COMMAND ASSIST KEYWORDS
AS: 'AS';
BY: 'BY';
@@ -262,6 +269,7 @@ DAYOFWEEK: 'DAYOFWEEK';
DAYOFYEAR: 'DAYOFYEAR';
DAY_OF_MONTH: 'DAY_OF_MONTH';
DAY_OF_WEEK: 'DAY_OF_WEEK';
DURATION: 'DURATION';
EXTRACT: 'EXTRACT';
FROM_DAYS: 'FROM_DAYS';
FROM_UNIXTIME: 'FROM_UNIXTIME';
23 changes: 23 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -33,6 +33,7 @@ pplCommands

commands
   : whereCommand
   | correlateCommand
   | fieldsCommand
   | renameCommand
   | statsCommand
@@ -68,6 +69,27 @@ whereCommand
   : WHERE logicalExpression
   ;

correlateCommand
   : CORRELATE correlationType FIELDS LT_PRTHS fieldList RT_PRTHS scopeClause (mappingList)?
   ;

correlationType
   : EXACT
   | APPROXIMATE
   ;

scopeClause
   : SCOPE LT_PRTHS fieldExpression COMMA value = literalValue (unit = timespanUnit)? RT_PRTHS
   ;

mappingList
   : MAPPING LT_PRTHS ( mappingClause (COMMA mappingClause)* ) RT_PRTHS
   ;

mappingClause
   : qualifiedName EQUAL qualifiedName
   ;

fieldsCommand
   : FIELDS (PLUS | MINUS)? fieldList
   ;
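For reference, the new rules accept a correlate clause like the one exercised by the correlation test suite added further down. The Scala sketch below only parses such a query and reuses the PPLSyntaxParser and PlaneUtils.plan helpers imported by the test files in this change; the variable names and the exact query text are illustrative.

import org.opensearch.flint.spark.ppl.PPLSyntaxParser
import org.opensearch.flint.spark.ppl.PlaneUtils.plan

val pplParser = new PPLSyntaxParser()
// Correlate two sources on ip/port within a 1-day scope, with explicit field mappings.
val query = "source = table1, table2 | correlate exact fields(ip, port) scope(@timestamp, 1d)" +
  " mapping( alb_logs.ip = traces.source_ip, alb_logs.port = metrics.target_port )"
// Parses the query text; the test suites pass this result on to CatalystQueryPlanVisitor.visit.
val statement = plan(pplParser, query, false)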
@@ -820,6 +842,7 @@ keywordsCanBeId
   | SHOW
   | FROM
   | WHERE
   | CORRELATE
   | FIELDS
   | RENAME
   | STATS
@@ -35,6 +35,7 @@
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
@@ -94,6 +95,10 @@ public T visitFilter(Filter node, C context) {
    return visitChildren(node, context);
  }

  public T visitCorrelation(Correlation node, C context) {
    return visitChildren(node, context);
  }

  public T visitProject(Project node, C context) {
    return visitChildren(node, context);
  }
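A concrete visitor can special-case the new node by overriding this hook. Below is a minimal, hypothetical Scala sketch, not part of this change; it assumes AbstractNodeVisitor can be subclassed with only this override (the callbacks shown above all have default bodies) and falls back to the default behaviour via super.

import org.opensearch.sql.ast.AbstractNodeVisitor
import org.opensearch.sql.ast.tree.Correlation

// Hypothetical visitor: reports correlation nodes, then keeps the default traversal.
class CorrelationReportingVisitor extends AbstractNodeVisitor[String, String] {
  override def visitCorrelation(node: Correlation, context: String): String = {
    println(s"encountered a correlation node (context: $context)")
    super.visitCorrelation(node, context) // delegate to the default implementation above
  }
}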
@@ -0,0 +1,39 @@
package org.opensearch.sql.ast.tree;

import org.opensearch.flint.spark.ppl.OpenSearchPPLParser;
import org.opensearch.sql.ast.AbstractNodeVisitor;

import java.util.List;

/** Logical plan node of the correlation command; holds the parsed correlation type, fields, scope, and field mappings. */

public class Correlation extends UnresolvedPlan {
  private final CorrelationType correlationTypeContext;
  private final List<OpenSearchPPLParser.FieldExpressionContext> fieldExpression;
  private final OpenSearchPPLParser.ScopeClauseContext contextParamContext;
  private final OpenSearchPPLParser.MappingListContext mappingListContext;
  private UnresolvedPlan child;

  public Correlation(
      OpenSearchPPLParser.CorrelationTypeContext correlationTypeContext,
      OpenSearchPPLParser.FieldListContext fieldListContext,
      OpenSearchPPLParser.ScopeClauseContext contextParamContext,
      OpenSearchPPLParser.MappingListContext mappingListContext) {
    this.correlationTypeContext = CorrelationType.valueOf(correlationTypeContext.getText());
    this.fieldExpression = fieldListContext.fieldExpression();
    this.contextParamContext = contextParamContext;
    this.mappingListContext = mappingListContext;
  }

  @Override
  public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
    return nodeVisitor.visitCorrelation(this, context);
  }

  @Override
  public Correlation attach(UnresolvedPlan child) {
    this.child = child;
    return this;
  }

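  // Lowercase values let CorrelationType.valueOf(...) accept the correlation keyword text
  // exactly as it appears in the tested queries (e.g. "correlate exact ...").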
  enum CorrelationType {
    exact,
    approximate
  }

}

This file was deleted.

@@ -23,6 +23,7 @@
import org.opensearch.sql.ast.expression.UnresolvedArgument;
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Correlation;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
@@ -99,6 +100,11 @@ public UnresolvedPlan visitWhereCommand(OpenSearchPPLParser.WhereCommandContext
    return new Filter(internalVisitExpression(ctx.logicalExpression()));
  }

  @Override
  public UnresolvedPlan visitCorrelateCommand(OpenSearchPPLParser.CorrelateCommandContext ctx) {
    return new Correlation(ctx.correlationType(), ctx.fieldList(), ctx.scopeClause(), ctx.mappingList());
  }

  /** Fields command. */
  @Override
  public UnresolvedPlan visitFieldsCommand(OpenSearchPPLParser.FieldsCommandContext ctx) {
@@ -9,10 +9,9 @@ import org.junit.Assert.assertEquals
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, EqualTo, Floor, Literal, Multiply, SortOrder, TimeWindow}
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Divide, EqualTo, Floor, GreaterThanOrEqual, Literal, Multiply, SortOrder, TimeWindow}
import org.apache.spark.sql.catalyst.plans.logical._

class PPLLogicalPlanAggregationQueriesTranslatorTestSuite
@@ -298,5 +297,77 @@ class PPLLogicalPlanAggregationQueriesTranslatorTestSuite
    // Compare the two plans
    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }

  test("create ppl query count status amount by day window and group by status test") {
    val context = new CatalystPlanContext
    val logPlan = planTrnasformer.visit(
      plan(
        pplParser,
        "source = table | stats sum(status) by span(@timestamp, 1d) as status_count_by_day, status | head 100",
        false),
      context)
    // Define the expected logical plan
    val star = Seq(UnresolvedStar(None))
    val status = Alias(UnresolvedAttribute("status"), "status")()
    val statusAmount = UnresolvedAttribute("status")
    val table = UnresolvedRelation(Seq("table"))

    val windowExpression = Alias(
      TimeWindow(
        UnresolvedAttribute("`@timestamp`"),
        TimeWindow.parseExpression(Literal("1 day")),
        TimeWindow.parseExpression(Literal("1 day")),
        0),
      "status_count_by_day")()

    val aggregateExpressions =
      Alias(
        UnresolvedFunction(Seq("SUM"), Seq(statusAmount), isDistinct = false),
        "sum(status)")()
    val aggregatePlan = Aggregate(
      Seq(status, windowExpression),
      Seq(aggregateExpressions, status, windowExpression),
      table)
    val planWithLimit = GlobalLimit(Literal(100), LocalLimit(Literal(100), aggregatePlan))
    val expectedPlan = Project(star, planWithLimit)
    // Compare the two plans
    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }
test("create ppl query count only error (status >= 400) status amount by day window and group by status test") {
val context = new CatalystPlanContext
val logPlan = planTrnasformer.visit(
plan(
pplParser,
"source = table | where status >= 400 | stats sum(status) by span(@timestamp, 1d) as status_count_by_day, status | head 100",
false),
context)
// Define the expected logical plan
val star = Seq(UnresolvedStar(None))
val statusAlias = Alias(UnresolvedAttribute("status"), "status")()
val statusField = UnresolvedAttribute("status")
val table = UnresolvedRelation(Seq("table"))

val filterExpr = GreaterThanOrEqual(statusField, Literal(400))
val filterPlan = Filter(filterExpr, table)

val windowExpression = Alias(
TimeWindow(
UnresolvedAttribute("`@timestamp`"),
TimeWindow.parseExpression(Literal("1 day")),
TimeWindow.parseExpression(Literal("1 day")),
0),
"status_count_by_day")()

val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("SUM"), Seq(statusField), isDistinct = false),
"sum(status)")()
val aggregatePlan = Aggregate(
Seq(statusAlias, windowExpression),
Seq(aggregateExpressions, statusAlias, windowExpression), filterPlan)
val planWithLimit = GlobalLimit(Literal(100), LocalLimit(Literal(100), aggregatePlan))
val expectedPlan = Project(star, planWithLimit)
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

}
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, Literal, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.junit.Assert.assertEquals
import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

class PPLLogicalPlanCorrelationQueriesTranslatorTestSuite
    extends SparkFunSuite
    with LogicalPlanTestUtils
    with Matchers {

  private val planTrnasformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  test("Search multiple tables with correlation - translated into join call with fields") {
    val context = new CatalystPlanContext
    val query = "source = table1, table2 | where @timestamp=`2018-07-02T22:23:00` AND ip=`10.0.0.1` AND cloud.provider=`aws` | correlate exact fields(ip, port) scope(@timestamp, 1d)" +
      " mapping( alb_logs.ip = traces.source_ip, alb_logs.port = metrics.target_port )"
    val logPlan = planTrnasformer.visit(plan(pplParser, query, false), context)

    val table1 = UnresolvedRelation(Seq("table1"))
    val table2 = UnresolvedRelation(Seq("table2"))

    val allFields1 = UnresolvedStar(None)
    val allFields2 = UnresolvedStar(None)

    val projectedTable1 = Project(Seq(allFields1), table1)
    val projectedTable2 = Project(Seq(allFields2), table2)

    val expectedPlan =
      Union(Seq(projectedTable1, projectedTable2), byName = true, allowMissingCol = true)

    assertEquals(expectedPlan, logPlan)
  }
}
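The optional parts of the grammar can be exercised the same way. Below is a parse-only sketch for the other correlation type, reusing the same helpers as the suite above; the query text is illustrative and no Catalyst plan is asserted.

import org.opensearch.flint.spark.ppl.PPLSyntaxParser
import org.opensearch.flint.spark.ppl.PlaneUtils.plan

// mappingList is optional in the correlateCommand rule, so no mapping(...) clause is given here.
val approxQuery = "source = table1, table2 | correlate approximate fields(ip, port) scope(@timestamp, 1d)"
val approxStatement = plan(new PPLSyntaxParser(), approxQuery, false)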
