Skip to content

Commit

Permalink
cidrmatch ppl command add logical tests and docs (#865)
Browse files Browse the repository at this point in the history
* update logical tests and docs

Signed-off-by: YANGDB <[email protected]>

* update scala fmt style

Signed-off-by: YANGDB <[email protected]>

* fix type error

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored Nov 4, 2024
1 parent bdb4848 commit b183b4a
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 1 deletion.
9 changes: 8 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b >= '2024-09-10' and b <= '2025-09-10'
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | trendline sma(2, temperature) as temp_trend`

#### **IP related queries**
[See additional command details](functions/ppl-ip.md)

- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')`
- `source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange`

```sql
source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.expression.function.SerializableUdf
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.DataTypes

class PPLLogicalPlanParseCidrmatchTestSuite
extends SparkFunSuite
with PlanTest
with LogicalPlanTestUtils
with Matchers {

private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()

test("test cidrmatch for ipv4 for 192.168.1.0/24") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')"),
context)

val ipAddress = UnresolvedAttribute("ipAddress")
val cidrExpression = Literal("192.168.1.0/24")

val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(false))
val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(true))
val cidr = ScalaUDF(
SerializableUdf.cidrFunction,
DataTypes.BooleanType,
seq(ipAddress, cidrExpression),
seq(),
Option.empty,
Option.apply("cidr"),
false,
true)

val expectedPlan = Project(
Seq(UnresolvedStar(None)),
Filter(And(And(filterIpv6, filterIsValid), cidr), UnresolvedRelation(Seq("t"))))
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test cidrmatch for ipv6 for 2003:db8::/32") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | where isV6 = true and isValid = false and cidrmatch(ipAddress, '2003:db8::/32')"),
context)

val ipAddress = UnresolvedAttribute("ipAddress")
val cidrExpression = Literal("2003:db8::/32")

val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(false))
val cidr = ScalaUDF(
SerializableUdf.cidrFunction,
DataTypes.BooleanType,
seq(ipAddress, cidrExpression),
seq(),
Option.empty,
Option.apply("cidr"),
false,
true)

val expectedPlan = Project(
Seq(UnresolvedStar(None)),
Filter(And(And(filterIpv6, filterIsValid), cidr), UnresolvedRelation(Seq("t"))))
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field projected") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | where isV6 = true and cidrmatch(ipAddress, '2003:db8::/32') | fields ip"),
context)

val ipAddress = UnresolvedAttribute("ipAddress")
val cidrExpression = Literal("2003:db8::/32")

val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
val cidr = ScalaUDF(
SerializableUdf.cidrFunction,
DataTypes.BooleanType,
seq(ipAddress, cidrExpression),
seq(),
Option.empty,
Option.apply("cidr"),
false,
true)

val expectedPlan = Project(
Seq(UnresolvedAttribute("ip")),
Filter(And(filterIpv6, cidr), UnresolvedRelation(Seq("t"))))
assert(compareByString(expectedPlan) === compareByString(logPlan))
}

test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field bool respond for each ip") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
"source=t | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange"),
context)

val ipAddress = UnresolvedAttribute("ipAddress")
val cidrExpression = Literal("2003:db8::/32")

val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
val filterClause = Filter(filterIpv6, UnresolvedRelation(Seq("t")))
val cidr = ScalaUDF(
SerializableUdf.cidrFunction,
DataTypes.BooleanType,
seq(ipAddress, cidrExpression),
seq(),
Option.empty,
Option.apply("cidr"),
false,
true)

val equalTo = EqualTo(Literal(true), cidr)
val caseFunction = CaseWhen(Seq((equalTo, Literal("in"))), Literal("out"))
val aliasStatusCategory = Alias(caseFunction, "inRange")()
val evalProjectList = Seq(UnresolvedStar(None), aliasStatusCategory)
val evalProject = Project(evalProjectList, filterClause)

val expectedPlan =
Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("inRange")), evalProject)

assert(compareByString(expectedPlan) === compareByString(logPlan))
}

}

0 comments on commit b183b4a

Please sign in to comment.