From b183b4a297cb6cdb1cdc55b2b7047cad56b63a05 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 4 Nov 2024 09:54:30 -0800 Subject: [PATCH] `cidrmatch` ppl command add logical tests and docs (#865) * update logical tests and docs Signed-off-by: YANGDB * update scala fmt style Signed-off-by: YANGDB * fix type error Signed-off-by: YANGDB --------- Signed-off-by: YANGDB --- docs/ppl-lang/PPL-Example-Commands.md | 9 +- ...PLLogicalPlanParseCidrmatchTestSuite.scala | 155 ++++++++++++++++++ 2 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala diff --git a/docs/ppl-lang/PPL-Example-Commands.md b/docs/ppl-lang/PPL-Example-Commands.md index 409b128c9..e780f688d 100644 --- a/docs/ppl-lang/PPL-Example-Commands.md +++ b/docs/ppl-lang/PPL-Example-Commands.md @@ -58,10 +58,17 @@ _- **Limitation: new field added by eval command with a function cannot be dropp - `source = table | where a not in (1, 2, 3) | fields a,b,c` - `source = table | where a between 1 and 4` - Note: This returns a >= 1 and a <= 4, i.e. 
[1, 4] - `source = table | where b not between '2024-09-10' and '2025-09-10'` - Note: This returns b < '2024-09-10' or b > '2025-09-10' -- `source = table | where cidrmatch(ip, '192.169.1.0/24')` +- `source = table | where cidrmatch(ip, '192.169.1.0/24')` - `source = table | where cidrmatch(ipv6, '2003:db8::/32')` - `source = table | trendline sma(2, temperature) as temp_trend` +#### **IP related queries** +[See additional command details](functions/ppl-ip.md) + +- `source = table | where cidrmatch(ip, '192.169.1.0/24')` +- `source = table | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')` +- `source = table | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange` + ```sql source = table | eval status_category = case(a >= 200 AND a < 300, 'Success', diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala new file mode 100644 index 000000000..213f201cc --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseCidrmatchTestSuite.scala @@ -0,0 +1,155 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.expression.function.SerializableUdf +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, CaseWhen, Descending, 
EqualTo, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.DataTypes
+
+/**
+ * Logical-plan translation tests for the PPL `cidrmatch` function.
+ *
+ * Every test parses a PPL query, translates it with [[CatalystQueryPlanVisitor]] and compares
+ * the string form of the result (via `compareByString`) with a hand-built Catalyst plan in which
+ * `cidrmatch(ipAddress, '<cidr>')` appears as a boolean `ScalaUDF` wrapping
+ * `SerializableUdf.cidrFunction`.
+ */
+class PPLLogicalPlanParseCidrmatchTestSuite
+    extends SparkFunSuite
+    with PlanTest
+    with LogicalPlanTestUtils
+    with Matchers {
+
+  private val planTransformer = new CatalystQueryPlanVisitor()
+  private val pplParser = new PPLSyntaxParser()
+
+  /**
+   * Parses `query` and translates it into a Catalyst logical plan.
+   *
+   * A new [[CatalystPlanContext]] is created on every call so tests do not share visitor state.
+   *
+   * @param query
+   *   the PPL query text
+   * @return
+   *   the plan produced by the visitor
+   */
+  private def logicalPlanOf(query: String): LogicalPlan =
+    planTransformer.visit(plan(pplParser, query), new CatalystPlanContext)
+
+  /**
+   * Builds the expression the tests expect for `cidrmatch(ipAddress, '<cidrBlock>')`.
+   *
+   * @param cidrBlock
+   *   the CIDR range literal exactly as written in the query, e.g. `192.168.1.0/24`
+   * @return
+   *   a `ScalaUDF` over the `ipAddress` attribute and the CIDR literal
+   */
+  private def cidrMatch(cidrBlock: String): ScalaUDF = {
+    val ipAddress = UnresolvedAttribute("ipAddress")
+    val cidrExpression = Literal(cidrBlock)
+    ScalaUDF(
+      SerializableUdf.cidrFunction,
+      DataTypes.BooleanType,
+      seq(ipAddress, cidrExpression),
+      seq(),
+      Option.empty,
+      Option.apply("cidr"),
+      nullable = false,
+      udfDeterministic = true)
+  }
+
+  /** Builds the expected `field = expected` comparison for a boolean flag column. */
+  private def flagIs(field: String, expected: Boolean): EqualTo =
+    EqualTo(UnresolvedAttribute(field), Literal(expected))
+
+  test("test cidrmatch for ipv4 for 192.168.1.0/24") {
+    val logPlan = logicalPlanOf(
+      "source=t | where isV6 = false and isValid = true and cidrmatch(ipAddress, '192.168.1.0/24')")
+
+    // `a and b and c` is expected to nest left-associatively: And(And(a, b), c).
+    val condition =
+      And(And(flagIs("isV6", false), flagIs("isValid", true)), cidrMatch("192.168.1.0/24"))
+    val expectedPlan =
+      Project(Seq(UnresolvedStar(None)), Filter(condition, UnresolvedRelation(Seq("t"))))
+
+    assert(compareByString(expectedPlan) === compareByString(logPlan))
+  }
+
+  test("test cidrmatch for ipv6 for 2003:db8::/32") {
+    val logPlan = logicalPlanOf(
+      "source=t | where isV6 = true and isValid = false and cidrmatch(ipAddress, '2003:db8::/32')")
+
+    val condition =
+      And(And(flagIs("isV6", true), flagIs("isValid", false)), cidrMatch("2003:db8::/32"))
+    val expectedPlan =
+      Project(Seq(UnresolvedStar(None)), Filter(condition, UnresolvedRelation(Seq("t"))))
+
+    assert(compareByString(expectedPlan) === compareByString(logPlan))
+  }
+
+  test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field projected") {
+    val logPlan = logicalPlanOf(
+      "source=t | where isV6 = true and cidrmatch(ipAddress, '2003:db8::/32') | fields ip")
+
+    val condition = And(flagIs("isV6", true), cidrMatch("2003:db8::/32"))
+    // `fields ip` replaces the default star projection with the single named column.
+    val expectedPlan =
+      Project(Seq(UnresolvedAttribute("ip")), Filter(condition, UnresolvedRelation(Seq("t"))))
+
+    assert(compareByString(expectedPlan) === compareByString(logPlan))
+  }
+
+  test("test cidrmatch for ipv6 for 2003:db8::/32 with ip field bool respond for each ip") {
+    val logPlan = logicalPlanOf(
+      "source=t | where isV6 = true | eval inRange = case(cidrmatch(ipAddress, '2003:db8::/32'), 'in' else 'out') | fields ip, inRange")
+
+    val filterClause = Filter(flagIs("isV6", true), UnresolvedRelation(Seq("t")))
+
+    // The expected plan wraps the `case` condition as `true = <condition>`.
+    val equalTo = EqualTo(Literal(true), cidrMatch("2003:db8::/32"))
+    val caseFunction = CaseWhen(Seq((equalTo, Literal("in"))), Literal("out"))
+    val inRangeAlias = Alias(caseFunction, "inRange")()
+
+    // `eval` keeps every existing column (star) and appends the new aliased one.
+    val evalProject = Project(Seq(UnresolvedStar(None), inRangeAlias), filterClause)
+    val expectedPlan =
+      Project(Seq(UnresolvedAttribute("ip"), UnresolvedAttribute("inRange")), evalProject)
+
+    assert(compareByString(expectedPlan) === compareByString(logPlan))
+  }
+
+}