Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cidrmatch ppl command add logical tests and docs #865

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ _- **Limitation: new field added by eval command with a function cannot be dropp
- `source = table | where a not in (1, 2, 3) | fields a,b,c`
- `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. [1, 4]
- `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10', i.e. values outside ['2024-09-10', '2025-09-10']
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where cidrmatch(ipv6, '2003:db8::/32')`
- `source = table | trendline sma(2, temperature) as temp_trend`

#### **IP related queries**
[See additional command details](functions/ppl-ip.md)

- `source = table | where cidrmatch(ip, '192.169.1.0/24')`
- `source = table | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')`
- `source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange`

```sql
source = table | eval status_category =
case(a >= 200 AND a < 300, 'Success',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.ppl

import org.opensearch.flint.spark.ppl.PlaneUtils.plan
import org.opensearch.sql.expression.function.SerializableUdf
import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, ScalaUDF, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types.DataTypes

/**
 * Logical-plan tests for the PPL `cidrmatch` function.
 *
 * Every test parses a PPL query, translates it with `CatalystQueryPlanVisitor`, and compares the
 * result (through `compareByString`) with a hand-built expected Catalyst plan in which
 * `cidrmatch(field, cidr)` appears as a `ScalaUDF` wrapping `SerializableUdf.cidrFunction`.
 */
class PPLLogicalPlanParseCidrmatchTestSuite
    extends SparkFunSuite
    with PlanTest
    with LogicalPlanTestUtils
    with Matchers {

  private val planTransformer = new CatalystQueryPlanVisitor()
  private val pplParser = new PPLSyntaxParser()

  /**
   * Parses `query` as PPL and translates it into a logical plan.
   *
   * A new `CatalystPlanContext` is created on every call so that no state is shared between
   * tests.
   *
   * @param query
   *   the PPL query text
   * @return
   *   whatever `planTransformer.visit` produces for the parsed query
   */
  private def logicalPlanOf(query: String) = {
    val context = new CatalystPlanContext
    planTransformer.visit(plan(pplParser, query), context)
  }

  /**
   * Builds the expression the tests expect for `cidrmatch(<field>, '<cidr>')`.
   *
   * @param field
   *   name of the attribute holding the IP address
   * @param cidr
   *   CIDR block literal, e.g. `192.168.1.0/24`
   * @return
   *   a boolean-typed `ScalaUDF` named "cidr" over `(field, cidr)`
   */
  private def cidrMatch(field: String, cidr: String) = {
    val ipAddress = UnresolvedAttribute(field)
    val cidrExpression = Literal(cidr)
    // Trailing positional arguments follow Spark's ScalaUDF constructor; presumably
    // (inputEncoders, outputEncoder, udfName, nullable, udfDeterministic) - confirm against the
    // Spark version in use before switching to named arguments.
    ScalaUDF(
      SerializableUdf.cidrFunction,
      DataTypes.BooleanType,
      seq(ipAddress, cidrExpression),
      seq(),
      Option.empty,
      Option.apply("cidr"),
      false,
      true)
  }

  test("test cidrmatch for ipv4 for 192.168.1.0/24") {
    val logPlan = logicalPlanOf(
      "source=t | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')")

    val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(false))
    val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(true))
    val cidr = cidrMatch("ipAddress", "192.168.1.0/24")

    // The three `and`-ed conditions are expected to nest left-associatively.
    val expectedPlan = Project(
      Seq(UnresolvedStar(None)),
      Filter(And(And(filterIpv6, filterIsValid), cidr), UnresolvedRelation(Seq("t"))))
    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }

  test("test cidrmatch for ipv6 for 2003:db8::/32") {
    val logPlan = logicalPlanOf(
      "source=t | where isV6 = true and isValid = false and cidrmatch(ipAddress, '2003:db8::/32')")

    val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
    val filterIsValid = EqualTo(UnresolvedAttribute("isValid"), Literal(false))
    val cidr = cidrMatch("ipAddress", "2003:db8::/32")

    val expectedPlan = Project(
      Seq(UnresolvedStar(None)),
      Filter(And(And(filterIpv6, filterIsValid), cidr), UnresolvedRelation(Seq("t"))))
    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }

  test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field projected") {
    val logPlan = logicalPlanOf(
      "source=t | where isV6 = true and cidrmatch(ipAddress, '2003:db8::/32') | fields ip")

    val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
    val cidr = cidrMatch("ipAddress", "2003:db8::/32")

    // `fields ip` replaces the default star projection with the single named attribute.
    val expectedPlan = Project(
      Seq(UnresolvedAttribute("ip")),
      Filter(And(filterIpv6, cidr), UnresolvedRelation(Seq("t"))))
    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }

  test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field bool respond for each ip") {
    val logPlan = logicalPlanOf(
      "source=t | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange")

    val filterIpv6 = EqualTo(UnresolvedAttribute("isV6"), Literal(true))
    val filterClause = Filter(filterIpv6, UnresolvedRelation(Seq("t")))
    val cidr = cidrMatch("ipAddress", "2003:db8::/32")

    // Inside `case(...)` the boolean UDF is expected to be wrapped as `true = cidr(...)`.
    val equalTo = EqualTo(Literal(true), cidr)
    val caseFunction = CaseWhen(Seq((equalTo, Literal("in"))), Literal("out"))
    val aliasStatusCategory = Alias(caseFunction, "inRange")()

    // `eval` is expected to project every existing column plus the new alias; the trailing
    // `fields` command then narrows the output to `ip` and `inRange`.
    val evalProjectList = Seq(UnresolvedStar(None), aliasStatusCategory)
    val evalProject = Project(evalProjectList, filterClause)

    val expectedPlan =
      Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("inRange")), evalProject)

    assert(compareByString(expectedPlan) === compareByString(logPlan))
  }

}
Loading