diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala new file mode 100644 index 000000000..3e6e9bd29 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLGrokITSuite.scala @@ -0,0 +1,329 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.StreamTest + +class FlintSparkPPLGrokITSuite + extends QueryTest + with LogicalPlanTestUtils + with FlintPPLSuite + with StreamTest { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.flint_ppl_test" + + override def beforeAll(): Unit = { + super.beforeAll() + + // Create test table + createPartitionedGrokEmailTable(testTable) + } + + protected override def afterEach(): Unit = { + super.afterEach() + // Stop all streaming jobs if any + spark.streams.active.foreach { job => + job.stop() + job.awaitTermination() + } + } + + test("test grok email expressions parsing") { + val frame = sql(s""" + | source = $testTable| grok email '.+@%{HOSTNAME:host}' | fields email, host + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("charlie@domain.net", "domain.net"), + Row("david@anotherdomain.com", "anotherdomain.com"), + Row("hank@demonstration.com", "demonstration.com"), + Row("alice@example.com", "example.com"), + Row("frank@sample.org", "sample.org"), + Row("grace@demo.net", "demo.net"), + Row("jack@sample.net", "sample.net"), + Row("eve@examples.com", "examples.com"), + Row("ivy@examples.com", "examples.com"), + Row("bob@test.org", "test.org")) + + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal("1")), + "host")() + val expectedPlan = Project( + Seq(emailAttribute, hostAttribute), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("test parse email expressions parsing filter & sort by age") { + val frame = sql(s""" + | source = $testTable| grok email '.+@%{HOSTNAME:host}' | where age > 45 | sort - age | fields age, email, host ; + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row(76, "frank@sample.org", "sample.org"), + Row(65, "charlie@domain.net", "domain.net"), + Row(55, "bob@test.org", "test.org")) + + // Compare the results + assert(results.sameElements(expectedResults)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + // Define the expected logical plan + val emailAttribute = UnresolvedAttribute("email") + val ageAttribute = UnresolvedAttribute("age") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + // Define the corrected expected plan + val expectedPlan = Project( + Seq(ageAttribute, emailAttribute, UnresolvedAttribute("host")), + Sort( + Seq(SortOrder(ageAttribute, Descending, NullsLast, Seq.empty)), + global = true, + Filter( + GreaterThan(ageAttribute, Literal(45)), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))) + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test parse email expressions and group by count host ") { + val frame = sql(s""" + | source = $testTable| grok email '.+@%{HOSTNAME:host}' | stats count() by host + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row(1L, "demonstration.com"), + Row(1L, "example.com"), + Row(1L, "domain.net"), + Row(1L, "anotherdomain.com"), + Row(1L, "sample.org"), + Row(1L, "demo.net"), + Row(1L, "sample.net"), + Row(2L, "examples.com"), + Row(1L, "test.org")) + + // Sort both the results and the expected results + implicit val rowOrdering: Ordering[Row] = Ordering.by(r => (r.getLong(0), r.getString(1))) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + // Define the corrected expected plan + val expectedPlan = Project( + Seq(UnresolvedStar(None)), // Matches the '*' in the Project + Aggregate( + Seq(Alias(hostAttribute, "host")()), // Group by 'host' + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false), + "count()")(), + Alias(hostAttribute, "host")()), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))) + // Compare the logical plans + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + test("test parse email expressions and top count_host ") { + val frame = sql(s""" + | source = $testTable| grok email '.+@%{HOSTNAME:host}' | top 1 host + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array(Row(2L, "examples.com")) + + // Sort both the results and the expected results + implicit val rowOrdering: Ordering[Row] = Ordering.by(r => (r.getLong(0), r.getString(1))) + assert(results.sorted.sameElements(expectedResults.sorted)) + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + val sortedPlan = Sort( + Seq( + SortOrder( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(hostAttribute), isDistinct = false), + "count_host")(), + Descending, + NullsLast, + Seq.empty)), + global = true, + Aggregate( + Seq(hostAttribute), + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(hostAttribute), isDistinct = false), + "count_host")(), + hostAttribute), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))))) + // Define the corrected expected plan + val expectedPlan = Project( + Seq(UnresolvedStar(None)), // Matches the '*' in the Project + GlobalLimit(Literal(1), LocalLimit(Literal(1), sortedPlan))) + // Compare the logical plans + comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) + } + + ignore("test grok to grok raw logs.") { + val frame = sql(s""" + | source= $testTable | grok message '%{COMMONAPACHELOG}' | fields COMMONAPACHELOG, timestamp, response, bytes + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("charlie@domain.net", "domain.net"), + Row("david@anotherdomain.com", "anotherdomain.com"), + Row("hank@demonstration.com", "demonstration.com"), + Row("alice@example.com", "example.com"), + Row("frank@sample.org", "sample.org"), + Row("grace@demo.net", "demo.net"), + Row("jack@sample.net", "sample.net"), + Row("eve@examples.com", "examples.com"), + Row("ivy@examples.com", "examples.com"), + Row("bob@test.org", "test.org")) + + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val messageAttribute = UnresolvedAttribute("message") + val logAttribute = UnresolvedAttribute("COMMONAPACHELOG") + val timestampAttribute = UnresolvedAttribute("timestamp") + val responseAttribute = UnresolvedAttribute("response") + val bytesAttribute = UnresolvedAttribute("bytes") + // scalastyle:off + val expectedRegExp = + "(?(?(?:(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))|(?(?:(?((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:)))(%.+)?)|(?(?(?[a-zA-Z0-9._-]+)) (?(?[a-zA-Z0-9._-]+)) \\[(?(?(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]))/(?\\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b)/(?(?>\\d\\d){1,2}):(?(?!<[0-9])(?(?:2[0123]|[01]?[0-9])):(?(?:[0-5][0-9]))(?::(?(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)))(?![0-9])) (?(?:[+-]?(?:[0-9]+))))\\] \"(?:(?\\b\\w+\\b) (?\\S+)(?: HTTP/(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))))?|(?.*?))\" (?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?:(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+))))))|-))" + // scalastyle:on + + val COMMONAPACHELOG = Alias( + RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("1")), + "COMMONAPACHELOG")() + val timestamp = + Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("5")), "timestamp")() + val response = + Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("18")), "response")() + val bytes = + Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("19")), "bytes")() + val expectedPlan = Project( + Seq(logAttribute, timestampAttribute, responseAttribute, bytesAttribute), + Project( + Seq(messageAttribute, COMMONAPACHELOG, timestamp, response, bytes, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + + test("test grok address expressions with 2 fields identifies ") { + val frame = sql(s""" + | source= $testTable | grok street_address '%{NUMBER} %{GREEDYDATA:address}' | fields address + | """.stripMargin) + + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("Pine St, San Francisco"), + Row("Maple St, New York"), + Row("Spruce St, Miami"), + Row("Main St, Seattle"), + Row("Cedar St, Austin"), + Row("Birch St, Chicago"), + Row("Ash St, Seattle"), + Row("Oak St, Boston"), + Row("Fir St, Denver"), + Row("Elm St, Portland")) + // Compare the results + implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0)) + assert(results.sorted.sameElements(expectedResults.sorted)) + + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val street_addressAttribute = UnresolvedAttribute("street_address") + val addressAttribute = UnresolvedAttribute("address") + val addressExpression = Alias( + RegExpExtract( + street_addressAttribute, + Literal( + "(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?.*)"), + Literal("3")), + "address")() + val expectedPlan = Project( + Seq(addressAttribute), + Project( + Seq(street_addressAttribute, addressExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))) + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + +} diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParseITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParseITSuite.scala index 388de3d31..e69999a8e 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParseITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLParseITSuite.scala @@ -11,7 +11,7 @@ import org.opensearch.sql.ppl.utils.DataTypeTransformer.seq import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, LocalLimit, LogicalPlan, Project, Sort} import org.apache.spark.sql.streaming.StreamTest @@ -70,9 +70,8 @@ class FlintSparkPPLParseITSuite // Define the expected logical plan val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal("1")))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal("1")), "host")() val expectedPlan = Project( Seq(emailAttribute, hostAttribute), Project( @@ -102,9 +101,8 @@ class FlintSparkPPLParseITSuite // Define the expected logical plan val emailAttribute = UnresolvedAttribute("email") val ageAttribute = UnresolvedAttribute("age") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() // Define the corrected expected plan val expectedPlan = Project( @@ -147,9 +145,8 @@ class FlintSparkPPLParseITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() // Define the corrected expected plan val expectedPlan = Project( @@ -186,9 +183,8 @@ class FlintSparkPPLParseITSuite val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() val sortedPlan = Sort( Seq( @@ -217,4 +213,56 @@ class FlintSparkPPLParseITSuite // Compare the logical plans comparePlans(expectedPlan, logicalPlan, checkAnalysis = false) } + + test("test parse email & host expressions including cast and sort commands") { + val frame = sql(s""" + | source = $testTable| parse street_address '(?\\d+) (?.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street + | """.stripMargin) + // Retrieve the results + val results: Array[Row] = frame.collect() + // Define the expected results + val expectedResults: Array[Row] = Array( + Row("505", "Spruce St, Miami"), + Row("606", "Fir St, Denver"), + Row("707", "Ash St, Seattle"), + Row("789", "Pine St, San Francisco")) + + // Sort both the results and the expected results + implicit val rowOrdering: Ordering[Row] = Ordering.by(r => (r.getString(0), r.getString(1))) + assert(results.sorted.sameElements(expectedResults.sorted)) + // Retrieve the logical plan + val logicalPlan: LogicalPlan = frame.queryExecution.logical + + val addressAttribute = UnresolvedAttribute("street_address") + val streetNumberAttribute = UnresolvedAttribute("streetNumber") + val streetAttribute = UnresolvedAttribute("street") + + val streetNumberExpression = Alias( + RegExpExtract( + addressAttribute, + Literal("(?\\d+) (?.+)"), + Literal("1")), + "streetNumber")() + + val streetExpression = Alias( + RegExpExtract( + addressAttribute, + Literal("(?\\d+) (?.+)"), + Literal("2")), + "street")() + + val expectedPlan = Project( + Seq(streetNumberAttribute, streetAttribute), + Sort( + Seq(SortOrder(streetNumberAttribute, Ascending, NullsFirst, Seq.empty)), + global = true, + Filter( + GreaterThan(streetNumberAttribute, Literal(500)), + Project( + Seq(addressAttribute, streetNumberExpression, streetExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))))) + + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } + } diff --git a/ppl-spark-integration/README.md b/ppl-spark-integration/README.md index 24639e444..523a3cd79 100644 --- a/ppl-spark-integration/README.md +++ b/ppl-spark-integration/README.md @@ -314,18 +314,31 @@ Limitation: Overriding existing field is unsupported, following queries throw ex - `source=accounts | parse email '.+@(?.+)' | where age > 45 | sort - age | fields age, email, host` - `source=accounts | parse address '(?\d+) (?.+)' | where streetNumber > 500 | sort num(streetNumber) | fields streetNumber, street` +**Grok** +- `source=accounts | grok email '.+@%{HOSTNAME:host}' | top 1 host` +- `source=accounts | grok email '.+@%{HOSTNAME:host}' | stats count() by host` +- `source=accounts | grok email '.+@%{HOSTNAME:host}' | eval eval_result=1 | fields host, eval_result` +- `source=accounts | grok email '.+@%{HOSTNAME:host}' | eval eval_result=1 | fields host, eval_result` +- `source=accounts | grok street_address '%{NUMBER} %{GREEDYDATA:address}' | fields address ` +- `source=logs | grok message '%{COMMONAPACHELOG}' | fields COMMONAPACHELOG, timestamp, response, bytes` + +_- **Limitation: Overriding existing field is unsupported:**_ + - `source=accounts | grok address '%{NUMBER} %{GREEDYDATA:address}' | fields address` + > For additional details on PPL commands - view [PPL Commands Docs](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/index.rst) --- +#### Experimental Commands: +- `correlation` - [See details](../docs/PPL-Correlation-command.md) +> This is an experimental command - it may be removed in future versions -For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2) -For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408) +--- +### Documentations -#### Experimental Commands: - - `correlation` - [See details](../docs/PPL-Correlation-command.md) +For additional details on Spark PPL commands project, see [PPL Project](https://github.com/orgs/opensearch-project/projects/214/views/2) -> This is an experimental command - it may be removed in future versions +For additional details on Spark PPL commands support campaign, see [PPL Commands Campaign](https://github.com/opensearch-project/opensearch-spark/issues/408) \ No newline at end of file diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/common/grok/GrokUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/common/grok/GrokUtils.java index 4b145bbbe..f3951cd80 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/common/grok/GrokUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/common/grok/GrokUtils.java @@ -5,8 +5,10 @@ package org.opensearch.sql.common.grok; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; @@ -19,41 +21,60 @@ */ public class GrokUtils { - /** Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic. */ - public static final Pattern GROK_PATTERN = - Pattern.compile( - "%\\{" - + "(?" - + "(?[A-z0-9]+)" - + "(?::(?[A-z0-9_:;,\\-\\/\\s\\.']+))?" - + ")" - + "(?:=(?" - + "(?:" - + "(?:[^{}]+|\\.+)+" - + ")+" - + ")" - + ")?" - + "\\}"); - - public static final Pattern NAMED_REGEX = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"); - - /** getNameGroups. */ - public static Set getNameGroups(String regex) { - Set namedGroups = new LinkedHashSet<>(); - Matcher matcher = NAMED_REGEX.matcher(regex); - while (matcher.find()) { - namedGroups.add(matcher.group(1)); + /** + * Extract Grok patter like %{FOO} to FOO, Also Grok pattern with semantic. + */ + public static final Pattern GROK_PATTERN = + Pattern.compile( + "%\\{" + + "(?" + + "(?[A-z0-9]+)" + + "(?::(?[A-z0-9_:;,\\-\\/\\s\\.']+))?" + + ")" + + "(?:=(?" + + "(?:" + + "(?:[^{}]+|\\.+)+" + + ")+" + + ")" + + ")?" + + "\\}"); + + public static final Pattern NAMED_REGEX = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"); + + /** + * getNameGroups. + */ + public static Set getNameGroups(String regex) { + Set namedGroups = new LinkedHashSet<>(); + Matcher matcher = NAMED_REGEX.matcher(regex); + while (matcher.find()) { + namedGroups.add(matcher.group(1)); + } + return namedGroups; + } + + /** + * namedGroups. + */ + public static Map namedGroups(Matcher matcher, Set groupNames) { + Map namedGroups = new LinkedHashMap<>(); + for (String groupName : groupNames) { + String groupValue = matcher.group(groupName); + namedGroups.put(groupName, groupValue); + } + return namedGroups; + } + + public static String getGroupPatternName(Grok grok, String groupName) { + return getKeyByValue(grok.getNamedRegexCollection(),groupName); } - return namedGroups; - } - - /** namedGroups. */ - public static Map namedGroups(Matcher matcher, Set groupNames) { - Map namedGroups = new LinkedHashMap<>(); - for (String groupName : groupNames) { - String groupValue = matcher.group(groupName); - namedGroups.put(groupName, groupValue); + + public static K getKeyByValue(Map map, V value) { + for (Map.Entry entry : map.entrySet()) { + if (entry.getValue().equals(value)) { + return entry.getKey(); + } + } + return null; } - return namedGroups; - } } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java index e262acbde..42f666236 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystPlanContext.java @@ -12,6 +12,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; import org.apache.spark.sql.catalyst.plans.logical.Union; import org.apache.spark.sql.types.Metadata; +import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.data.type.ExprType; import scala.collection.Iterator; import scala.collection.Seq; @@ -37,7 +38,7 @@ public class CatalystPlanContext { /** * Catalyst relations list **/ - private List projectedFields = new ArrayList<>(); + private List projectedFields = new ArrayList<>(); /** * Catalyst relations list **/ @@ -69,11 +70,12 @@ public List getRelations() { return relations; } - public List getProjectedFields() { + public List getProjectedFields() { return projectedFields; } public LogicalPlan getPlan() { + if (this.planBranches.isEmpty()) return null; if (this.planBranches.size() == 1) { return planBranches.peek(); } @@ -101,9 +103,10 @@ public Optional popNamedParseExpressions() { public Stack getGroupingParseExpressions() { return groupingParseExpressions; } - + /** * define new field + * * @param symbol * @return */ @@ -111,6 +114,7 @@ public LogicalPlan define(Expression symbol) { namedParseExpressions.push(symbol); return getPlan(); } + /** * append relation to relations list * @@ -121,13 +125,14 @@ public LogicalPlan withRelation(UnresolvedRelation relation) { this.relations.add(relation); return with(relation); } + /** * append projected fields * * @param projectedFields * @return */ - public LogicalPlan withProjectedFields(List projectedFields) { + public LogicalPlan withProjectedFields(List projectedFields) { this.projectedFields.addAll(projectedFields); return getPlan(); } diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java index 6caaec839..8759ddcf7 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/CatalystQueryPlanVisitor.java @@ -9,26 +9,21 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute$; import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; -import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.Ascending$; -import org.apache.spark.sql.catalyst.expressions.Coalesce; import org.apache.spark.sql.catalyst.expressions.Descending$; import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.catalyst.expressions.NamedExpression; import org.apache.spark.sql.catalyst.expressions.Predicate; -import org.apache.spark.sql.catalyst.expressions.RegExpExtract; import org.apache.spark.sql.catalyst.expressions.SortDirection; import org.apache.spark.sql.catalyst.expressions.SortOrder; -import org.apache.spark.sql.catalyst.expressions.StringRegexExpression; import org.apache.spark.sql.catalyst.plans.logical.Aggregate; -import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; import org.apache.spark.sql.catalyst.plans.logical.Deduplicate; +import org.apache.spark.sql.catalyst.plans.logical.DescribeRelation$; import org.apache.spark.sql.catalyst.plans.logical.Limit; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; -import org.apache.spark.sql.execution.command.DescribeTableCommand; import org.apache.spark.sql.catalyst.plans.logical.Union; +import org.apache.spark.sql.execution.command.DescribeTableCommand; import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.opensearch.sql.ast.AbstractNodeVisitor; import org.opensearch.sql.ast.expression.AggregateFunction; @@ -75,7 +70,7 @@ import org.opensearch.sql.ppl.utils.AggregatorTranslator; import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator; import org.opensearch.sql.ppl.utils.ComparatorTransformer; -import org.opensearch.sql.ppl.utils.ParseUtils; +import org.opensearch.sql.ppl.utils.ParseStrategy; import org.opensearch.sql.ppl.utils.SortUtils; import scala.Option; import scala.Option$; @@ -83,7 +78,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.function.BiFunction; @@ -91,8 +85,6 @@ import static java.util.Collections.emptyList; import static java.util.List.of; -import static org.apache.spark.sql.types.DataTypes.IntegerType; -import static org.apache.spark.sql.types.DataTypes.StringType; import static org.opensearch.sql.ppl.CatalystPlanContext.findRelation; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; import static org.opensearch.sql.ppl.utils.DataTypeTransformer.translate; @@ -239,8 +231,9 @@ public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) { @Override public LogicalPlan visitProject(Project node, CatalystPlanContext context) { + context.withProjectedFields(node.getProjectList()); LogicalPlan child = node.getChild().get(0).accept(this, context); - context.withProjectedFields(visitExpressionList(node.getProjectList(), context)); + visitExpressionList(node.getProjectList(), context); // Create a projection list from the existing expressions Seq projectList = seq(context.getNamedParseExpressions()); @@ -293,36 +286,7 @@ public LogicalPlan visitParse(Parse node, CatalystPlanContext context) { ParseMethod parseMethod = node.getParseMethod(); java.util.Map arguments = node.getArguments(); String pattern = (String) node.getPattern().getValue(); - return visitParseCommand(node, sourceField, parseMethod, arguments, pattern, context); - } - - private LogicalPlan visitParseCommand(Parse node, Expression sourceField, ParseMethod parseMethod, Map arguments, String pattern, CatalystPlanContext context) { - List namedGroupCandidates = ParseUtils.getNamedGroupCandidates(parseMethod, pattern, arguments); - String cleanedPattern = ParseUtils.extractPatterns(parseMethod, pattern, namedGroupCandidates); - for (int i = 0; i < namedGroupCandidates.size(); i++) { - String group = namedGroupCandidates.get(i); - //first create the regExp - RegExpExtract regExpExtract = new RegExpExtract(sourceField, - org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType), - org.apache.spark.sql.catalyst.expressions.Literal.create(i+1, IntegerType)); - //next create Coalesce to handle potential null values - Coalesce coalesce = new Coalesce(seq(regExpExtract)); - //next Alias the extracted fields - context.getNamedParseExpressions().push( - org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(coalesce, - group, - NamedExpression.newExprId(), - seq(new java.util.ArrayList()), - Option.empty(), - seq(new java.util.ArrayList()))); - } - // Create an UnresolvedStar for all-fields projection (possible external wrapping projection that may include additional fields) - context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); - // extract all fields to project with - Seq projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); - // build the plan with the projection step - LogicalPlan child = context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); - return child; + return ParseStrategy.visitParseCommand(node, sourceField, parseMethod, arguments, pattern, context); } @Override diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseStrategy.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseStrategy.java new file mode 100644 index 000000000..45766e588 --- /dev/null +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseStrategy.java @@ -0,0 +1,84 @@ +package org.opensearch.sql.ppl.utils; + +import org.apache.spark.sql.catalyst.analysis.UnresolvedStar$; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.catalyst.expressions.NamedExpression; +import org.apache.spark.sql.catalyst.expressions.RegExpExtract; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.opensearch.sql.ast.expression.AllFields; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.ParseMethod; +import org.opensearch.sql.ast.tree.Parse; +import org.opensearch.sql.ppl.CatalystPlanContext; +import scala.Option; +import scala.collection.Seq; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.StringType; +import static org.opensearch.sql.ppl.utils.DataTypeTransformer.seq; + +public interface ParseStrategy { + /** + * transform the parse/grok/patterns command into a standard catalyst RegExpExtract expression + * Since spark's RegExpExtract cant accept actual regExp group name we need to translate the group's name into its corresponding index + * + * @param node + * @param sourceField + * @param parseMethod + * @param arguments + * @param pattern + * @param context + * @return + */ + static LogicalPlan visitParseCommand(Parse node, Expression sourceField, ParseMethod parseMethod, Map arguments, String pattern, CatalystPlanContext context) { + Map namedGroupNumbers = new LinkedHashMap<>(); + List namedGroupCandidates = ParseUtils.getNamedGroupCandidates(parseMethod, pattern, arguments); + String cleanedPattern = ParseUtils.extractPatterns(parseMethod, pattern, namedGroupCandidates); + //get projected fields names (ignore the rest) + context.getProjectedFields().forEach(field -> { + // in select * case - take all namedGroupCandidates + if(field instanceof AllFields) { + for (int i = 0; i < namedGroupCandidates.size(); i++) { + namedGroupNumbers.put(namedGroupCandidates.get(i), + ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i))); + } + // in specific field case - match to the namedGroupCandidates group + } else for (int i = 0; i < namedGroupCandidates.size(); i++) { + if (((Field)field).getField().toString().equals(namedGroupCandidates.get(i))) { + namedGroupNumbers.put(namedGroupCandidates.get(i), + ParseUtils.getNamedGroupIndex(parseMethod, pattern, namedGroupCandidates.get(i))); + } + } + }); + //list the group numbers of these projected fields + // match the regExpExtract group identifier with its number + namedGroupNumbers.forEach((group, index) -> { + //first create the regExp + RegExpExtract regExpExtract = new RegExpExtract(sourceField, + org.apache.spark.sql.catalyst.expressions.Literal.create(cleanedPattern, StringType), + org.apache.spark.sql.catalyst.expressions.Literal.create(index + 1, IntegerType)); + //next Alias the extracted fields + context.getNamedParseExpressions().push( + org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(regExpExtract, + group, + NamedExpression.newExprId(), + seq(new java.util.ArrayList()), + Option.empty(), + seq(new java.util.ArrayList()))); + }); + + // Create an UnresolvedStar for all-fields projection (possible external wrapping projection that may include additional fields) + context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.>empty())); + // extract all fields to project with + Seq projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p); + // build the plan with the projection step + LogicalPlan child = context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p)); + return child; + } + +} diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseUtils.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseUtils.java index 54b43db0e..128463df1 100644 --- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseUtils.java +++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/utils/ParseUtils.java @@ -12,87 +12,45 @@ import org.opensearch.sql.ast.expression.ParseMethod; import org.opensearch.sql.common.grok.Grok; import org.opensearch.sql.common.grok.GrokCompiler; +import org.opensearch.sql.common.grok.GrokUtils; import org.opensearch.sql.common.grok.Match; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -public class ParseUtils { - private static final String NEW_FIELD_KEY = "new_field"; - - /** - * Construct corresponding ParseExpression by {@link ParseMethod}. - * - * @param parseMethod method used to parse - * @param pattern pattern used for parsing - * @param identifier derived field - * @return {@link ParseExpression} - */ - public static ParseExpression createParseExpression( - ParseMethod parseMethod, String pattern, String identifier) { - switch (parseMethod) { - case GROK: return new GrokExpression(pattern, identifier); - case PATTERNS: return new PatternsExpression(pattern, identifier); - default: return new RegexExpression(pattern, identifier); - } - } - - /** - * Get list of derived fields based on parse pattern. - * - * @param pattern pattern used for parsing - * @return list of names of the derived fields - */ - public static List getNamedGroupCandidates( - ParseMethod parseMethod, String pattern, Map arguments) { - switch (parseMethod) { - case REGEX: - return RegexExpression.getNamedGroupCandidates(pattern); - case GROK: - return GrokExpression.getNamedGroupCandidates(pattern); - default: - return PatternsExpression.getNamedGroupCandidates( - arguments.containsKey(NEW_FIELD_KEY) - ? (String) arguments.get(NEW_FIELD_KEY).getValue() - : null); - } - } - - /** - * extract the cleaner pattern without the additional fields - * @param parseMethod - * @param pattern - * @param columns - * @return - */ - public static String extractPatterns( - ParseMethod parseMethod, String pattern, List columns) { - switch (parseMethod) { - case REGEX: - return RegexExpression.extractPattern(pattern, columns); - case GROK: - return GrokExpression.extractPattern(pattern, columns); - default: - return PatternsExpression.extractPattern(pattern, columns); - } - } +import static org.opensearch.sql.common.grok.GrokUtils.getGroupPatternName; - public static abstract class ParseExpression { - abstract String parseValue(String value); - } - - public static class RegexExpression extends ParseExpression{ +public class ParseUtils { private static final Pattern GROUP_PATTERN = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>"); - private final Pattern regexPattern; - protected final String identifier; + private static final String NEW_FIELD_KEY = "new_field"; - public RegexExpression(String patterns, String identifier) { - this.regexPattern = Pattern.compile(patterns); - this.identifier = identifier; + /** + * Construct corresponding ParseExpression by {@link ParseMethod}. + * + * @param parseMethod method used to parse + * @param pattern pattern used for parsing + * @param identifier derived field + * @return {@link ParseExpression} + */ + public static ParseExpression createParseExpression( + ParseMethod parseMethod, String pattern, String identifier) { + switch (parseMethod) { + case GROK: + return new GrokExpression(pattern, identifier); + case PATTERNS: + return new PatternsExpression(pattern, identifier); + default: + return new RegexExpression(pattern, identifier); + } } /** @@ -101,138 +59,221 @@ public RegexExpression(String patterns, String identifier) { * @param pattern pattern used for parsing * @return list of names of the derived fields */ - public static List getNamedGroupCandidates(String pattern) { - ImmutableList.Builder namedGroups = ImmutableList.builder(); - Matcher m = GROUP_PATTERN.matcher(pattern); - while (m.find()) { - namedGroups.add(m.group(1)); - } - return namedGroups.build(); - } - - @Override - public String parseValue(String value) { - Matcher matcher = regexPattern.matcher(value); - if (matcher.matches()) { - return matcher.group(identifier); - } - return ""; - } - - public static String extractPattern(String patterns, List columns) { - StringBuilder result = new StringBuilder(); - Matcher matcher = GROUP_PATTERN.matcher(patterns); - - int lastEnd = 0; - while (matcher.find()) { - String groupName = matcher.group(1); - if (columns.contains(groupName)) { - result.append(patterns, lastEnd, matcher.start()); - result.append("("); - lastEnd = matcher.end(); - } - } - result.append(patterns.substring(lastEnd)); - return result.toString(); - } - } - - public static class GrokExpression extends ParseExpression{ - private static final GrokCompiler grokCompiler = GrokCompiler.newInstance(); - private final Grok grok; - private final String identifier; - - public GrokExpression(String pattern, String identifier) { - this.grok = grokCompiler.compile(pattern); - this.identifier = identifier; - } - - @Override - public String parseValue(String value) { - Match grokMatch = grok.match(value); - Map capture = grokMatch.capture(); - Object match = capture.get(identifier); - if (match != null) { - return match.toString(); - } - return ""; + public static List getNamedGroupCandidates( + ParseMethod parseMethod, String pattern, Map arguments) { + switch (parseMethod) { + case REGEX: + return RegexExpression.getNamedGroupCandidates(pattern); + case GROK: + return GrokExpression.getNamedGroupCandidates(pattern); + default: + return GrokExpression.getNamedGroupCandidates( + arguments.containsKey(NEW_FIELD_KEY) + ? (String) arguments.get(NEW_FIELD_KEY).getValue() + : null); + } } - /** * Get list of derived fields based on parse pattern. * * @param pattern pattern used for parsing * @return list of names of the derived fields */ - public static List getNamedGroupCandidates(String pattern) { - Grok grok = grokCompiler.compile(pattern); - return grok.namedGroups.stream() - .map(grok::getNamedRegexCollectionById) - .filter(group -> !group.equals("UNWANTED")) - .collect(Collectors.toUnmodifiableList()); - } - - public static String extractPattern(String patterns, List columns) { - //todo implement - return patterns; + public static int getNamedGroupIndex( + ParseMethod parseMethod, String pattern, String namedGroup) { + switch (parseMethod) { + case REGEX: + return RegexExpression.getNamedGroupIndex(pattern, namedGroup); + case GROK: + return GrokExpression.getNamedGroupIndex(pattern, namedGroup); + default: + return PatternsExpression.getNamedGroupIndex(pattern, namedGroup); + } } - } - - public static class PatternsExpression extends ParseExpression{ - public static final String DEFAULT_NEW_FIELD = "patterns_field"; - - private static final ImmutableSet DEFAULT_IGNORED_CHARS = - ImmutableSet.copyOf( - "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - .chars() - .mapToObj(c -> (char) c) - .toArray(Character[]::new)); - private final boolean useCustomPattern; - private Pattern pattern; /** - * PatternsExpression. + * extract the cleaner pattern without the additional fields * - * @param pattern pattern used for parsing - * @param identifier derived field + * @param parseMethod + * @param pattern + * @param columns + * @return */ - public PatternsExpression(String pattern, String identifier) { - useCustomPattern = !pattern.isEmpty(); - if (useCustomPattern) { - this.pattern = Pattern.compile(pattern); - } + public static String extractPatterns( + ParseMethod parseMethod, String pattern, List columns) { + switch (parseMethod) { + case REGEX: + return RegexExpression.extractPattern(pattern, columns); + case GROK: + return GrokExpression.extractPattern(pattern, columns); + default: + return PatternsExpression.extractPattern(pattern, columns); + } + } + + public static abstract class ParseExpression { + abstract String parseValue(String value); } - @Override - public String parseValue(String value) { - if (useCustomPattern) { - return pattern.matcher(value).replaceAll(""); - } + public static class RegexExpression extends ParseExpression { + private final Pattern regexPattern; + protected final String identifier; + + public RegexExpression(String patterns, String identifier) { + this.regexPattern = Pattern.compile(patterns); + this.identifier = identifier; + } + + /** + * Get list of derived fields based on parse pattern. + * + * @param pattern pattern used for parsing + * @return list of names of the derived fields + */ + public static List getNamedGroupCandidates(String pattern) { + ImmutableList.Builder namedGroups = ImmutableList.builder(); + Matcher m = GROUP_PATTERN.matcher(pattern); + while (m.find()) { + namedGroups.add(m.group(1)); + } + return namedGroups.build(); + } + + public static int getNamedGroupIndex(String pattern,String groupName) { + List groupCandidates = getNamedGroupCandidates(pattern); + for (int i = 0; i < groupCandidates.size(); i++) { + if(groupCandidates.get(i).equals(groupName)) return i; + } + return -1; + } - char[] chars = value.toCharArray(); - int pos = 0; - for (int i = 0; i < chars.length; i++) { - if (!DEFAULT_IGNORED_CHARS.contains(chars[i])) { - chars[pos++] = chars[i]; + @Override + public String parseValue(String value) { + Matcher matcher = regexPattern.matcher(value); + if (matcher.matches()) { + return matcher.group(identifier); + } + return ""; + } + + public static String extractPattern(String patterns, List columns) { + return patterns; } - } - return new String(chars, 0, pos); } - /** - * Get list of derived fields. - * - * @param identifier identifier used to generate the field name - * @return list of names of the derived fields - */ - public static List getNamedGroupCandidates(String identifier) { - return ImmutableList.of(Objects.requireNonNullElse(identifier, DEFAULT_NEW_FIELD)); + public static class GrokExpression extends ParseExpression { + private static final GrokCompiler grokCompiler = GrokCompiler.newInstance(); + + static { + grokCompiler.registerDefaultPatterns(); + } + + private final Grok grok; + private final String identifier; + + public GrokExpression(String pattern, String identifier) { + this.grok = grokCompiler.compile(pattern); + this.identifier = identifier; + } + + @Override + public String parseValue(String value) { + Match grokMatch = grok.match(value); + Map capture = grokMatch.capture(); + Object match = capture.get(identifier); + if (match != null) { + return match.toString(); + } + return ""; + } + + /** + * Get list of derived fields based on parse pattern. + * + * @param pattern pattern used for parsing + * @return list of names of the derived fields + */ + public static List getNamedGroupCandidates(String pattern) { + Grok grok = grokCompiler.compile(pattern); + return grok.namedGroups.stream() + .map(grok::getNamedRegexCollectionById) + .filter(group -> !group.equals("UNWANTED")) + .collect(Collectors.toUnmodifiableList()); + } + + public static int getNamedGroupIndex(String pattern,String groupName) { + String name = getGroupPatternName(grokCompiler.compile(pattern), groupName); + List namedGroups = new ArrayList<>(grokCompiler.compile(pattern).namedGroups); + for (int i = 0; i < namedGroups.size(); i++) { + if(namedGroups.get(i).equals(name)) return i; + } + return -1; + } + + public static String extractPattern(final String patterns, List columns) { + Grok grok = grokCompiler.compile(patterns); + return grok.getNamedRegex(); + } } - - public static String extractPattern(String patterns, List columns) { - //todo implement - return patterns; + + public static class PatternsExpression extends ParseExpression { + public static final String DEFAULT_NEW_FIELD = "patterns_field"; + + private static final ImmutableSet DEFAULT_IGNORED_CHARS = + ImmutableSet.copyOf( + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + .chars() + .mapToObj(c -> (char) c) + .toArray(Character[]::new)); + private final boolean useCustomPattern; + private Pattern pattern; + + /** + * PatternsExpression. + * + * @param pattern pattern used for parsing + * @param identifier derived field + */ + public PatternsExpression(String pattern, String identifier) { + useCustomPattern = !pattern.isEmpty(); + if (useCustomPattern) { + this.pattern = Pattern.compile(pattern); + } + } + + public static int getNamedGroupIndex(String pattern, String namedGroup) { + return 0; + } + + @Override + public String parseValue(String value) { + if (useCustomPattern) { + return pattern.matcher(value).replaceAll(""); + } + + char[] chars = value.toCharArray(); + int pos = 0; + for (int i = 0; i < chars.length; i++) { + if (!DEFAULT_IGNORED_CHARS.contains(chars[i])) { + chars[pos++] = chars[i]; + } + } + return new String(chars, 0, pos); + } + + /** + * Get list of derived fields. + * + * @param identifier identifier used to generate the field name + * @return list of names of the derived fields + */ + public static List getNamedGroupCandidates(String identifier) { + return ImmutableList.of(Objects.requireNonNullElse(identifier, DEFAULT_NEW_FIELD)); + } + + public static String extractPattern(String patterns, List columns) { + return patterns; + } } - } - + } diff --git a/ppl-spark-integration/src/main/resources/patterns/firewalls b/ppl-spark-integration/src/main/resources/patterns/firewalls new file mode 100644 index 000000000..8f16bea32 --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/firewalls @@ -0,0 +1,3 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +# NetScreen firewall logs +NETSCREENSESSIONLOG %{SYSLOGTIMESTAMP:date} %{IPORHOST:device} %{IPORHOST}: NetScreen device_id=%{WORD:device_id}%{DATA}: start_time=%{QUOTEDSTRING:start_time} duration=%{INT:duration} policy_id=%{INT:policy_id} service=%{DATA:service} proto=%{INT:proto} src zone=%{WORD:src_zone} dst zone=%{WORD:dst_zone} action=%{WORD:action} sent=%{INT:sent} rcvd=%{INT:rcvd} src=%{IPORHOST:src_ip} dst=%{IPORHOST:dst_ip} src_port=%{INT:src_port} dst_port=%{INT:dst_port} src-xlated ip=%{IPORHOST:src_xlated_ip} port=%{INT:src_xlated_port} dst-xlated ip=%{IPORHOST:dst_xlated_ip} port=%{INT:dst_xlated_port} session_id=%{INT:session_id} reason=%{GREEDYDATA:reason} diff --git a/ppl-spark-integration/src/main/resources/patterns/haproxy b/ppl-spark-integration/src/main/resources/patterns/haproxy new file mode 100644 index 000000000..99fff302f --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/haproxy @@ -0,0 +1,38 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +## These patterns were tested w/ haproxy-1.4.15 + +## Documentation of the haproxy log formats can be found at the following links: +## http://code.google.com/p/haproxy-docs/wiki/HTTPLogFormat +## http://code.google.com/p/haproxy-docs/wiki/TCPLogFormat + +HAPROXYTIME (?!<[0-9])%{HOUR:haproxy_hour}:%{MINUTE:haproxy_minute}(?::%{SECOND:haproxy_second})(?![0-9]) +HAPROXYDATE %{MONTHDAY:haproxy_monthday}/%{MONTH:haproxy_month}/%{YEAR:haproxy_year}:%{HAPROXYTIME:haproxy_time}.%{INT:haproxy_milliseconds} + +# Override these default patterns to parse out what is captured in your haproxy.cfg +HAPROXYCAPTUREDREQUESTHEADERS %{DATA:captured_request_headers} +HAPROXYCAPTUREDRESPONSEHEADERS %{DATA:captured_response_headers} + +# Example: +# These haproxy config lines will add data to the logs that are captured +# by the patterns below. Place them in your custom patterns directory to +# override the defaults. +# +# capture request header Host len 40 +# capture request header X-Forwarded-For len 50 +# capture request header Accept-Language len 50 +# capture request header Referer len 200 +# capture request header User-Agent len 200 +# +# capture response header Content-Type len 30 +# capture response header Content-Encoding len 10 +# capture response header Cache-Control len 200 +# capture response header Last-Modified len 200 +# +# HAPROXYCAPTUREDREQUESTHEADERS %{DATA:request_header_host}\|%{DATA:request_header_x_forwarded_for}\|%{DATA:request_header_accept_language}\|%{DATA:request_header_referer}\|%{DATA:request_header_user_agent} +# HAPROXYCAPTUREDRESPONSEHEADERS %{DATA:response_header_content_type}\|%{DATA:response_header_content_encoding}\|%{DATA:response_header_cache_control}\|%{DATA:response_header_last_modified} + +# parse a haproxy 'httplog' line +HAPROXYHTTP %{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_request}/%{INT:time_queue}/%{INT:time_backend_connect}/%{INT:time_backend_response}/%{NOTSPACE:time_duration} %{INT:http_status_code} %{NOTSPACE:bytes_read} %{DATA:captured_request_cookie} %{DATA:captured_response_cookie} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue} (\{%{HAPROXYCAPTUREDREQUESTHEADERS}\})?( )?(\{%{HAPROXYCAPTUREDRESPONSEHEADERS}\})?( )?"%{WORD:http_verb} %{URIPATHPARAM:http_request}( HTTP/%{NUMBER:http_version}")? + +# parse a haproxy 'tcplog' line +HAPROXYTCP %{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_queue}/%{INT:time_backend_connect}/%{NOTSPACE:time_duration} %{NOTSPACE:bytes_read} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue} diff --git a/ppl-spark-integration/src/main/resources/patterns/java b/ppl-spark-integration/src/main/resources/patterns/java new file mode 100644 index 000000000..611952cd3 --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/java @@ -0,0 +1,4 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +JAVACLASS (?:[a-zA-Z0-9-]+\.)+[A-Za-z0-9$]+ +JAVAFILE (?:[A-Za-z0-9_.-]+) +JAVASTACKTRACEPART at %{JAVACLASS:class}\.%{WORD:method}\(%{JAVAFILE:file}:%{NUMBER:line}\) diff --git a/ppl-spark-integration/src/main/resources/patterns/linux-syslog b/ppl-spark-integration/src/main/resources/patterns/linux-syslog new file mode 100644 index 000000000..a888999ec --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/linux-syslog @@ -0,0 +1,8 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: +SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})? + +CRON_ACTION [A-Z ]+ +CRONLOG %{SYSLOGBASE} \(%{USER:user}\) %{CRON_ACTION:action} \(%{DATA:message}\) + +SYSLOGLINE %{SYSLOGBASE2} %{GREEDYDATA:message} diff --git a/ppl-spark-integration/src/main/resources/patterns/nagios b/ppl-spark-integration/src/main/resources/patterns/nagios new file mode 100644 index 000000000..771cb0a78 --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/nagios @@ -0,0 +1,109 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +################################################################################## +################################################################################## +# Chop Nagios log files to smithereens! +# +# A set of GROK filters to process logfiles generated by Nagios. +# While it does not, this set intends to cover all possible Nagios logs. +# +# Some more work needs to be done to cover all External Commands: +# http://old.nagios.org/developerinfo/externalcommands/commandlist.php +# +# If you need some support on these rules please contact: +# Jelle Smet http://smetj.net +# +################################################################################# +################################################################################# + +NAGIOSTIME \[%{NUMBER:nagios_epoch}\] + +############################################### +######## Begin nagios log types +############################################### +NAGIOS_TYPE_CURRENT_SERVICE_STATE CURRENT SERVICE STATE +NAGIOS_TYPE_CURRENT_HOST_STATE CURRENT HOST STATE + +NAGIOS_TYPE_SERVICE_NOTIFICATION SERVICE NOTIFICATION +NAGIOS_TYPE_HOST_NOTIFICATION HOST NOTIFICATION + +NAGIOS_TYPE_SERVICE_ALERT SERVICE ALERT +NAGIOS_TYPE_HOST_ALERT HOST ALERT + +NAGIOS_TYPE_SERVICE_FLAPPING_ALERT SERVICE FLAPPING ALERT +NAGIOS_TYPE_HOST_FLAPPING_ALERT HOST FLAPPING ALERT + +NAGIOS_TYPE_SERVICE_DOWNTIME_ALERT SERVICE DOWNTIME ALERT +NAGIOS_TYPE_HOST_DOWNTIME_ALERT HOST DOWNTIME ALERT + +NAGIOS_TYPE_PASSIVE_SERVICE_CHECK PASSIVE SERVICE CHECK +NAGIOS_TYPE_PASSIVE_HOST_CHECK PASSIVE HOST CHECK + +NAGIOS_TYPE_SERVICE_EVENT_HANDLER SERVICE EVENT HANDLER +NAGIOS_TYPE_HOST_EVENT_HANDLER HOST EVENT HANDLER + +NAGIOS_TYPE_EXTERNAL_COMMAND EXTERNAL COMMAND +NAGIOS_TYPE_TIMEPERIOD_TRANSITION TIMEPERIOD TRANSITION +############################################### +######## End nagios log types +############################################### + +############################################### +######## Begin external check types +############################################### +NAGIOS_EC_DISABLE_SVC_CHECK DISABLE_SVC_CHECK +NAGIOS_EC_ENABLE_SVC_CHECK ENABLE_SVC_CHECK +NAGIOS_EC_DISABLE_HOST_CHECK DISABLE_HOST_CHECK +NAGIOS_EC_ENABLE_HOST_CHECK ENABLE_HOST_CHECK +NAGIOS_EC_PROCESS_SERVICE_CHECK_RESULT PROCESS_SERVICE_CHECK_RESULT +NAGIOS_EC_PROCESS_HOST_CHECK_RESULT PROCESS_HOST_CHECK_RESULT +NAGIOS_EC_SCHEDULE_SERVICE_DOWNTIME SCHEDULE_SERVICE_DOWNTIME +NAGIOS_EC_SCHEDULE_HOST_DOWNTIME SCHEDULE_HOST_DOWNTIME +############################################### +######## End external check types +############################################### +NAGIOS_WARNING Warning:%{SPACE}%{GREEDYDATA:nagios_message} + +NAGIOS_CURRENT_SERVICE_STATE %{NAGIOS_TYPE_CURRENT_SERVICE_STATE:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{DATA:nagios_statetype};%{DATA:nagios_statecode};%{GREEDYDATA:nagios_message} +NAGIOS_CURRENT_HOST_STATE %{NAGIOS_TYPE_CURRENT_HOST_STATE:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{DATA:nagios_statetype};%{DATA:nagios_statecode};%{GREEDYDATA:nagios_message} + +NAGIOS_SERVICE_NOTIFICATION %{NAGIOS_TYPE_SERVICE_NOTIFICATION:nagios_type}: %{DATA:nagios_notifyname};%{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{DATA:nagios_contact};%{GREEDYDATA:nagios_message} +NAGIOS_HOST_NOTIFICATION %{NAGIOS_TYPE_HOST_NOTIFICATION}: %{DATA:nagios_notifyname};%{DATA:nagios_hostname};%{DATA:nagios_state};%{DATA:nagios_contact};%{GREEDYDATA:nagios_message} + +NAGIOS_SERVICE_ALERT %{NAGIOS_TYPE_SERVICE_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{DATA:nagios_statelevel};%{NUMBER:nagios_attempt};%{GREEDYDATA:nagios_message} +NAGIOS_HOST_ALERT %{NAGIOS_TYPE_HOST_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{DATA:nagios_statelevel};%{NUMBER:nagios_attempt};%{GREEDYDATA:nagios_message} + +NAGIOS_SERVICE_FLAPPING_ALERT %{NAGIOS_TYPE_SERVICE_FLAPPING_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{GREEDYDATA:nagios_message} +NAGIOS_HOST_FLAPPING_ALERT %{NAGIOS_TYPE_HOST_FLAPPING_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{GREEDYDATA:nagios_message} + +NAGIOS_SERVICE_DOWNTIME_ALERT %{NAGIOS_TYPE_SERVICE_DOWNTIME_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{GREEDYDATA:nagios_comment} +NAGIOS_HOST_DOWNTIME_ALERT %{NAGIOS_TYPE_HOST_DOWNTIME_ALERT:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{GREEDYDATA:nagios_comment} + +NAGIOS_PASSIVE_SERVICE_CHECK %{NAGIOS_TYPE_PASSIVE_SERVICE_CHECK:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{GREEDYDATA:nagios_comment} +NAGIOS_PASSIVE_HOST_CHECK %{NAGIOS_TYPE_PASSIVE_HOST_CHECK:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{GREEDYDATA:nagios_comment} + +NAGIOS_SERVICE_EVENT_HANDLER %{NAGIOS_TYPE_SERVICE_EVENT_HANDLER:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{DATA:nagios_statelevel};%{DATA:nagios_event_handler_name} +NAGIOS_HOST_EVENT_HANDLER %{NAGIOS_TYPE_HOST_EVENT_HANDLER:nagios_type}: %{DATA:nagios_hostname};%{DATA:nagios_state};%{DATA:nagios_statelevel};%{DATA:nagios_event_handler_name} + +NAGIOS_TIMEPERIOD_TRANSITION %{NAGIOS_TYPE_TIMEPERIOD_TRANSITION:nagios_type}: %{DATA:nagios_service};%{DATA:nagios_unknown1};%{DATA:nagios_unknown2}; + +#################### +#### External checks +#################### + +#Disable host & service check +NAGIOS_EC_LINE_DISABLE_SVC_CHECK %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_DISABLE_SVC_CHECK:nagios_command};%{DATA:nagios_hostname};%{DATA:nagios_service} +NAGIOS_EC_LINE_DISABLE_HOST_CHECK %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_DISABLE_HOST_CHECK:nagios_command};%{DATA:nagios_hostname} + +#Enable host & service check +NAGIOS_EC_LINE_ENABLE_SVC_CHECK %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_ENABLE_SVC_CHECK:nagios_command};%{DATA:nagios_hostname};%{DATA:nagios_service} +NAGIOS_EC_LINE_ENABLE_HOST_CHECK %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_ENABLE_HOST_CHECK:nagios_command};%{DATA:nagios_hostname} + +#Process host & service check +NAGIOS_EC_LINE_PROCESS_SERVICE_CHECK_RESULT %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_PROCESS_SERVICE_CHECK_RESULT:nagios_command};%{DATA:nagios_hostname};%{DATA:nagios_service};%{DATA:nagios_state};%{GREEDYDATA:nagios_check_result} +NAGIOS_EC_LINE_PROCESS_HOST_CHECK_RESULT %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_PROCESS_HOST_CHECK_RESULT:nagios_command};%{DATA:nagios_hostname};%{DATA:nagios_state};%{GREEDYDATA:nagios_check_result} + +#Schedule host & service downtime +NAGIOS_EC_LINE_SCHEDULE_HOST_DOWNTIME %{NAGIOS_TYPE_EXTERNAL_COMMAND:nagios_type}: %{NAGIOS_EC_SCHEDULE_HOST_DOWNTIME:nagios_command};%{DATA:nagios_hostname};%{NUMBER:nagios_start_time};%{NUMBER:nagios_end_time};%{NUMBER:nagios_fixed};%{NUMBER:nagios_trigger_id};%{NUMBER:nagios_duration};%{DATA:author};%{DATA:comment} + +#End matching line +NAGIOSLOGLINE %{NAGIOSTIME} (?:%{NAGIOS_WARNING}|%{NAGIOS_CURRENT_SERVICE_STATE}|%{NAGIOS_CURRENT_HOST_STATE}|%{NAGIOS_SERVICE_NOTIFICATION}|%{NAGIOS_HOST_NOTIFICATION}|%{NAGIOS_SERVICE_ALERT}|%{NAGIOS_HOST_ALERT}|%{NAGIOS_SERVICE_FLAPPING_ALERT}|%{NAGIOS_HOST_FLAPPING_ALERT}|%{NAGIOS_SERVICE_DOWNTIME_ALERT}|%{NAGIOS_HOST_DOWNTIME_ALERT}|%{NAGIOS_PASSIVE_SERVICE_CHECK}|%{NAGIOS_PASSIVE_HOST_CHECK}|%{NAGIOS_SERVICE_EVENT_HANDLER}|%{NAGIOS_HOST_EVENT_HANDLER}|%{NAGIOS_TIMEPERIOD_TRANSITION}|%{NAGIOS_EC_LINE_DISABLE_SVC_CHECK}|%{NAGIOS_EC_LINE_ENABLE_SVC_CHECK}|%{NAGIOS_EC_LINE_DISABLE_HOST_CHECK|%{NAGIOS_EC_LINE_ENABLE_HOST_CHECK}|%{NAGIOS_EC_LINE_PROCESS_HOST_CHECK_RESULT}|%{NAGIOS_EC_LINE_PROCESS_SERVICE_CHECK_RESULT}|%{NAGIOS_EC_LINE_SCHEDULE_HOST_DOWNTIME}) diff --git a/ppl-spark-integration/src/main/resources/patterns/patterns b/ppl-spark-integration/src/main/resources/patterns/patterns new file mode 100644 index 000000000..52235d8c7 --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/patterns @@ -0,0 +1,108 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns + +USERNAME [a-zA-Z0-9._-]+ +USER %{USERNAME:UNWANTED} +INT (?:[+-]?(?:[0-9]+)) +BASE10NUM (?[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) +NUMBER (?:%{BASE10NUM:UNWANTED}) +BASE16NUM (?(?"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``)) +UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12} + +# Networking +MAC (?:%{CISCOMAC:UNWANTED}|%{WINDOWSMAC:UNWANTED}|%{COMMONMAC:UNWANTED}) +CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) +WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) +COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2}) +IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)? +IPV4 (?/(?>[\w_%!$@:.,~-]+|\\.)*)+ +#UNIXPATH (?[A-Za-z]+:|\\)(?:\\[^\\?*]*)+ +URIPROTO [A-Za-z]+(\+[A-Za-z+]+)? +URIHOST %{IPORHOST}(?::%{POSINT:port})? +# uripath comes loosely from RFC1738, but mostly from what Firefox +# doesn't turn into %XX +URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+ +#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)? +URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]* +URIPATHPARAM %{URIPATH}(?:%{URIPARAM})? +URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})? + +# Months: January, Feb, 3, 03, 12, December +MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b +MONTHNUM (?:0?[1-9]|1[0-2]) +MONTHNUM2 (?:0[1-9]|1[0-2]) +MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) + +# Days: Monday, Tue, Thu, etc... +DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?) + +# Years? +YEAR (?>\d\d){1,2} +# Time: HH:MM:SS +#TIME \d{2}:\d{2}(?::\d{2}(?:\.\d+)?)? +# I'm still on the fence about using grok to perform the time match, +# since it's probably slower. +# TIME %{POSINT<24}:%{POSINT<60}(?::%{POSINT<60}(?:\.%{POSINT})?)? +HOUR (?:2[0123]|[01]?[0-9]) +MINUTE (?:[0-5][0-9]) +# '60' is a leap second in most time standards and thus is valid. +SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?) +TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9]) +# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it) +DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR} +DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR} +ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE})) +ISO8601_SECOND (?:%{SECOND}|60) +TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}? +DATE %{DATE_US}|%{DATE_EU} +DATESTAMP %{DATE}[- ]%{TIME} +TZ (?:[PMCE][SD]T|UTC) +DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ} +DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE} +DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR} +DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND} + +# Syslog Dates: Month Day HH:MM:SS +SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME} +PROG (?:[\w._/%-]+) +SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])? +SYSLOGHOST %{IPORHOST} +SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}> +HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT} + +# Shortcuts +QS %{QUOTEDSTRING:UNWANTED} + +# Log formats +SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: + +MESSAGESLOG %{SYSLOGBASE} %{DATA} + +COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) +COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} +COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-) + + +# Log Levels +LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) diff --git a/ppl-spark-integration/src/main/resources/patterns/postfix b/ppl-spark-integration/src/main/resources/patterns/postfix new file mode 100644 index 000000000..3adcb699d --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/postfix @@ -0,0 +1,125 @@ +# common postfix patterns +POSTFIX_QUEUEID ([0-9A-F]{6,}|[0-9a-zA-Z]{15,}) +POSTFIX_CLIENT_INFO %{HOSTNAME:postfix_client_hostname}?\[%{IP:postfix_client_ip}\](:%{INT:postfix_client_port})? +POSTFIX_RELAY_INFO %{HOSTNAME:postfix_relay_hostname}?\[(%{IP:postfix_relay_ip}|%{DATA:postfix_relay_service})\](:%{INT:postfix_relay_port})?|%{WORD:postfix_relay_service} +POSTFIX_SMTP_STAGE (CONNECT|HELO|EHLO|STARTTLS|AUTH|MAIL( FROM)?|RCPT( TO)?|(end of )?DATA|RSET|UNKNOWN|END-OF-MESSAGE|VRFY|\.) +POSTFIX_ACTION (accept|defer|discard|filter|header-redirect|reject) +POSTFIX_STATUS_CODE \d{3} +POSTFIX_STATUS_CODE_ENHANCED \d\.\d\.\d +POSTFIX_DNSBL_MESSAGE Service unavailable; .* \[%{GREEDYDATA:postfix_status_data}\] %{GREEDYDATA:postfix_status_message}; +POSTFIX_PS_ACCESS_ACTION (DISCONNECT|BLACKLISTED|WHITELISTED|WHITELIST VETO|PASS NEW|PASS OLD) +POSTFIX_PS_VIOLATION (BARE NEWLINE|COMMAND (TIME|COUNT|LENGTH) LIMIT|COMMAND PIPELINING|DNSBL|HANGUP|NON-SMTP COMMAND|PREGREET) +POSTFIX_TIME_UNIT %{NUMBER}[smhd] +POSTFIX_KEYVALUE_DATA [\w-]+=[^;]* +POSTFIX_KEYVALUE %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} +POSTFIX_WARNING_LEVEL (warning|fatal|info) + +POSTFIX_TLSCONN (Anonymous|Trusted|Untrusted|Verified) TLS connection established (to %{POSTFIX_RELAY_INFO}|from %{POSTFIX_CLIENT_INFO}): %{DATA:postfix_tls_version} with cipher %{DATA:postfix_tls_cipher} \(%{DATA:postfix_tls_cipher_size} bits\) +POSTFIX_DELAYS %{NUMBER:postfix_delay_before_qmgr}/%{NUMBER:postfix_delay_in_qmgr}/%{NUMBER:postfix_delay_conn_setup}/%{NUMBER:postfix_delay_transmission} +POSTFIX_LOSTCONN (lost connection|timeout|SSL_accept error) +POSTFIX_LOSTCONN_REASONS (receiving the initial server greeting|sending message body|sending end of data -- message may be sent more than once) +POSTFIX_PROXY_MESSAGE (%{POSTFIX_STATUS_CODE:postfix_proxy_status_code} )?(%{POSTFIX_STATUS_CODE_ENHANCED:postfix_proxy_status_code_enhanced})?.* + +# helper patterns +GREEDYDATA_NO_COLON [^:]* +GREEDYDATA_NO_SEMICOLON [^;]* + +# warning patterns +POSTFIX_WARNING_WITH_KV (%{POSTFIX_QUEUEID:postfix_queueid}: )?%{POSTFIX_WARNING_LEVEL:postfix_message_level}: %{GREEDYDATA:postfix_message}; %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} +POSTFIX_WARNING_WITHOUT_KV (%{POSTFIX_QUEUEID:postfix_queueid}: )?%{POSTFIX_WARNING_LEVEL:postfix_message_level}: %{GREEDYDATA:postfix_message} +POSTFIX_WARNING %{POSTFIX_WARNING_WITH_KV}|%{POSTFIX_WARNING_WITHOUT_KV} + +# smtpd patterns +POSTFIX_SMTPD_CONNECT connect from %{POSTFIX_CLIENT_INFO} +POSTFIX_SMTPD_DISCONNECT disconnect from %{POSTFIX_CLIENT_INFO} +POSTFIX_SMTPD_LOSTCONN %{POSTFIX_LOSTCONN:postfix_smtpd_lostconn_data}( after %{POSTFIX_SMTP_STAGE:postfix_smtp_stage}( \(%{INT} bytes\))?)? from %{POSTFIX_CLIENT_INFO}(: %{GREEDYDATA:postfix_smtpd_lostconn_reason})? +POSTFIX_SMTPD_NOQUEUE NOQUEUE: %{POSTFIX_ACTION:postfix_action}: %{POSTFIX_SMTP_STAGE:postfix_smtp_stage} from %{POSTFIX_CLIENT_INFO}:( %{POSTFIX_STATUS_CODE:postfix_status_code} %{POSTFIX_STATUS_CODE_ENHANCED:postfix_status_code_enhanced})?( <%{DATA:postfix_status_data}>:)? (%{POSTFIX_DNSBL_MESSAGE}|%{GREEDYDATA:postfix_status_message};) %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} +POSTFIX_SMTPD_PIPELINING improper command pipelining after %{POSTFIX_SMTP_STAGE:postfix_smtp_stage} from %{POSTFIX_CLIENT_INFO}: %{GREEDYDATA:postfix_improper_pipelining_data} +POSTFIX_SMTPD_PROXY proxy-%{POSTFIX_ACTION:postfix_proxy_result}: (%{POSTFIX_SMTP_STAGE:postfix_proxy_smtp_stage}): %{POSTFIX_PROXY_MESSAGE:postfix_proxy_message}; %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} + +# cleanup patterns +POSTFIX_CLEANUP_MILTER %{POSTFIX_QUEUEID:postfix_queueid}: milter-%{POSTFIX_ACTION:postfix_milter_result}: %{GREEDYDATA:postfix_milter_message}; %{GREEDYDATA_NO_COLON:postfix_keyvalue_data}(: %{GREEDYDATA:postfix_milter_data})? + +# qmgr patterns +POSTFIX_QMGR_REMOVED %{POSTFIX_QUEUEID:postfix_queueid}: removed +POSTFIX_QMGR_ACTIVE %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} \(queue active\) +POSTFIX_QMGR_EXPIRED %{POSTFIX_QUEUEID:postfix_queueid}: from=<%{DATA:postfix_from}>, status=%{WORD:postfix_status}, returned to sender + +# pipe patterns +POSTFIX_PIPE_ANY %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data}, status=%{WORD:postfix_status} \(%{GREEDYDATA:postfix_pipe_response}\) + +# error patterns +POSTFIX_ERROR_ANY %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data}, status=%{WORD:postfix_status} \(%{GREEDYDATA:postfix_error_response}\) + +# discard patterns +POSTFIX_DISCARD_ANY %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_KEYVALUE_DATA:postfix_keyvalue_data} status=%{WORD:postfix_status} %{GREEDYDATA} + +# postsuper patterns +POSTFIX_POSTSUPER_ACTIONS (removed|requeued|placed on hold|released from hold) +POSTFIX_POSTSUPER_ACTION %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_POSTSUPER_ACTIONS:postfix_postsuper_action} +POSTFIX_POSTSUPER_SUMMARY_ACTIONS (Deleted|Requeued|Placed on hold|Released from hold) +POSTFIX_POSTSUPER_SUMMARY %{POSTFIX_POSTSUPER_SUMMARY_ACTIONS:postfix_postsuper_summary_action}: %{NUMBER:postfix_postsuper_summary_count} messages? + +# postscreen patterns +POSTFIX_PS_CONNECT CONNECT from %{POSTFIX_CLIENT_INFO} to \[%{IP:postfix_server_ip}\]:%{INT:postfix_server_port} +POSTFIX_PS_ACCESS %{POSTFIX_PS_ACCESS_ACTION:postfix_postscreen_access} %{POSTFIX_CLIENT_INFO} +POSTFIX_PS_NOQUEUE %{POSTFIX_SMTPD_NOQUEUE} +POSTFIX_PS_TOOBUSY NOQUEUE: reject: CONNECT from %{POSTFIX_CLIENT_INFO}: %{GREEDYDATA:postfix_postscreen_toobusy_data} +POSTFIX_PS_DNSBL %{POSTFIX_PS_VIOLATION:postfix_postscreen_violation} rank %{INT:postfix_postscreen_dnsbl_rank} for %{POSTFIX_CLIENT_INFO} +POSTFIX_PS_CACHE cache %{DATA} full cleanup: retained=%{NUMBER:postfix_postscreen_cache_retained} dropped=%{NUMBER:postfix_postscreen_cache_dropped} entries +POSTFIX_PS_VIOLATIONS %{POSTFIX_PS_VIOLATION:postfix_postscreen_violation}( %{INT})?( after %{NUMBER:postfix_postscreen_violation_time})? from %{POSTFIX_CLIENT_INFO}(( after %{POSTFIX_SMTP_STAGE:postfix_smtp_stage})?(: %{GREEDYDATA:postfix_postscreen_data})?| in tests (after|before) SMTP handshake) + +# dnsblog patterns +POSTFIX_DNSBLOG_LISTING addr %{IP:postfix_client_ip} listed by domain %{HOSTNAME:postfix_dnsbl_domain} as %{IP:postfix_dnsbl_result} + +# tlsproxy patterns +POSTFIX_TLSPROXY_CONN (DIS)?CONNECT( from)? %{POSTFIX_CLIENT_INFO} + +# anvil patterns +POSTFIX_ANVIL_CONN_RATE statistics: max connection rate %{NUMBER:postfix_anvil_conn_rate}/%{POSTFIX_TIME_UNIT:postfix_anvil_conn_period} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp} +POSTFIX_ANVIL_CONN_CACHE statistics: max cache size %{NUMBER:postfix_anvil_cache_size} at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp} +POSTFIX_ANVIL_CONN_COUNT statistics: max connection count %{NUMBER:postfix_anvil_conn_count} for \(%{DATA:postfix_service}:%{IP:postfix_client_ip}\) at %{SYSLOGTIMESTAMP:postfix_anvil_timestamp} + +# smtp patterns +POSTFIX_SMTP_DELIVERY %{POSTFIX_KEYVALUE} status=%{WORD:postfix_status}( \(%{GREEDYDATA:postfix_smtp_response}\))? +POSTFIX_SMTP_CONNERR connect to %{POSTFIX_RELAY_INFO}: (Connection timed out|No route to host|Connection refused|Network is unreachable) +POSTFIX_SMTP_LOSTCONN %{POSTFIX_QUEUEID:postfix_queueid}: %{POSTFIX_LOSTCONN:postfix_smtp_lostconn_data} with %{POSTFIX_RELAY_INFO}( while %{POSTFIX_LOSTCONN_REASONS:postfix_smtp_lostconn_reason})? +POSTFIX_SMTP_TIMEOUT %{POSTFIX_QUEUEID:postfix_queueid}: conversation with %{POSTFIX_RELAY_INFO} timed out( while %{POSTFIX_LOSTCONN_REASONS:postfix_smtp_lostconn_reason})? +POSTFIX_SMTP_RELAYERR %{POSTFIX_QUEUEID:postfix_queueid}: host %{POSTFIX_RELAY_INFO} said: %{GREEDYDATA:postfix_smtp_response} \(in reply to %{POSTFIX_SMTP_STAGE:postfix_smtp_stage} command\) + +# master patterns +POSTFIX_MASTER_START (daemon started|reload) -- version %{DATA:postfix_version}, configuration %{PATH:postfix_config_path} +POSTFIX_MASTER_EXIT terminating on signal %{INT:postfix_termination_signal} + +# bounce patterns +POSTFIX_BOUNCE_NOTIFICATION %{POSTFIX_QUEUEID:postfix_queueid}: sender (non-delivery|delivery status|delay) notification: %{POSTFIX_QUEUEID:postfix_bounce_queueid} + +# scache patterns +POSTFIX_SCACHE_LOOKUPS statistics: (address|domain) lookup hits=%{INT:postfix_scache_hits} miss=%{INT:postfix_scache_miss} success=%{INT:postfix_scache_success}% +POSTFIX_SCACHE_SIMULTANEOUS statistics: max simultaneous domains=%{INT:postfix_scache_domains} addresses=%{INT:postfix_scache_addresses} connection=%{INT:postfix_scache_connection} +POSTFIX_SCACHE_TIMESTAMP statistics: start interval %{SYSLOGTIMESTAMP:postfix_scache_timestamp} + +# aggregate all patterns +POSTFIX_SMTPD %{POSTFIX_SMTPD_CONNECT}|%{POSTFIX_SMTPD_DISCONNECT}|%{POSTFIX_SMTPD_LOSTCONN}|%{POSTFIX_SMTPD_NOQUEUE}|%{POSTFIX_SMTPD_PIPELINING}|%{POSTFIX_TLSCONN}|%{POSTFIX_WARNING}|%{POSTFIX_SMTPD_PROXY}|%{POSTFIX_KEYVALUE} +POSTFIX_CLEANUP %{POSTFIX_CLEANUP_MILTER}|%{POSTFIX_WARNING}|%{POSTFIX_KEYVALUE} +POSTFIX_QMGR %{POSTFIX_QMGR_REMOVED}|%{POSTFIX_QMGR_ACTIVE}|%{POSTFIX_QMGR_EXPIRED}|%{POSTFIX_WARNING} +POSTFIX_PIPE %{POSTFIX_PIPE_ANY} +POSTFIX_POSTSCREEN %{POSTFIX_PS_CONNECT}|%{POSTFIX_PS_ACCESS}|%{POSTFIX_PS_NOQUEUE}|%{POSTFIX_PS_TOOBUSY}|%{POSTFIX_PS_CACHE}|%{POSTFIX_PS_DNSBL}|%{POSTFIX_PS_VIOLATIONS}|%{POSTFIX_WARNING} +POSTFIX_DNSBLOG %{POSTFIX_DNSBLOG_LISTING}|%{POSTFIX_WARNING} +POSTFIX_ANVIL %{POSTFIX_ANVIL_CONN_RATE}|%{POSTFIX_ANVIL_CONN_CACHE}|%{POSTFIX_ANVIL_CONN_COUNT} +POSTFIX_SMTP %{POSTFIX_SMTP_DELIVERY}|%{POSTFIX_SMTP_CONNERR}|%{POSTFIX_SMTP_LOSTCONN}|%{POSTFIX_SMTP_TIMEOUT}|%{POSTFIX_SMTP_RELAYERR}|%{POSTFIX_TLSCONN}|%{POSTFIX_WARNING} +POSTFIX_DISCARD %{POSTFIX_DISCARD_ANY}|%{POSTFIX_WARNING} +POSTFIX_LMTP %{POSTFIX_SMTP} +POSTFIX_PICKUP %{POSTFIX_KEYVALUE} +POSTFIX_TLSPROXY %{POSTFIX_TLSPROXY_CONN}|%{POSTFIX_WARNING} +POSTFIX_MASTER %{POSTFIX_MASTER_START}|%{POSTFIX_MASTER_EXIT}|%{POSTFIX_WARNING} +POSTFIX_BOUNCE %{POSTFIX_BOUNCE_NOTIFICATION} +POSTFIX_SENDMAIL %{POSTFIX_WARNING} +POSTFIX_POSTDROP %{POSTFIX_WARNING} +POSTFIX_SCACHE %{POSTFIX_SCACHE_LOOKUPS}|%{POSTFIX_SCACHE_SIMULTANEOUS}|%{POSTFIX_SCACHE_TIMESTAMP} +POSTFIX_TRIVIAL_REWRITE %{POSTFIX_WARNING} +POSTFIX_TLSMGR %{POSTFIX_WARNING} +POSTFIX_LOCAL %{POSTFIX_KEYVALUE} +POSTFIX_VIRTUAL %{POSTFIX_SMTP_DELIVERY} +POSTFIX_ERROR %{POSTFIX_ERROR_ANY} +POSTFIX_POSTSUPER %{POSTFIX_POSTSUPER_ACTION}|%{POSTFIX_POSTSUPER_SUMMARY} diff --git a/ppl-spark-integration/src/main/resources/patterns/ruby b/ppl-spark-integration/src/main/resources/patterns/ruby new file mode 100644 index 000000000..9e5aa5b3c --- /dev/null +++ b/ppl-spark-integration/src/main/resources/patterns/ruby @@ -0,0 +1,3 @@ +# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns +RUBY_LOGLEVEL (?:DEBUG|FATAL|ERROR|WARN|INFO) +RUBY_LOGGER [DFEWI], \[%{TIMESTAMP_ISO8601:timestamp} #%{POSINT:pid}\] *%{RUBY_LOGLEVEL:loglevel} -- +%{DATA:progname}: %{GREEDYDATA:message} diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGrokTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGrokTranslatorTestSuite.scala new file mode 100644 index 000000000..5094b0035 --- /dev/null +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanGrokTranslatorTestSuite.scala @@ -0,0 +1,286 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.ppl + +import java.util +import java.util.Map + +import org.opensearch.flint.spark.ppl.PlaneUtils.plan +import org.opensearch.sql.common.grok.{Grok, GrokCompiler, Match} +import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Coalesce, Descending, GreaterThan, Literal, NullsFirst, NullsLast, RegExpExtract, SortOrder} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ + +class PPLLogicalPlanGrokTranslatorTestSuite + extends SparkFunSuite + with PlanTest + with LogicalPlanTestUtils + with Matchers { + + private val planTransformer = new CatalystQueryPlanVisitor() + private val pplParser = new PPLSyntaxParser() + + test("test grok email & host expressions") { + val grokCompiler = GrokCompiler.newInstance + grokCompiler.registerDefaultPatterns() + + /* Grok pattern to compile, here httpd logs */ /* Grok pattern to compile, here httpd logs */ + val grok = grokCompiler.compile(".+@%{HOSTNAME:host}") + + /* Line of log to match */ /* Line of log to match */ + val log = "iii@gmail.com" + + val gm = grok.`match`(log) + val capture: util.Map[String, AnyRef] = gm.capture + + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=accounts | grok email '.+@%{HOSTNAME:host}' | fields email, host", + isExplain = false), + context) + + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal("1")), + "host")() + val expectedPlan = Project( + Seq(emailAttribute, hostAttribute), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("accounts")))) + assert(compareByString(expectedPlan) === compareByString(logPlan)) + } + + test("test grok to grok raw logs.") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=t | grok message '%{COMMONAPACHELOG}' | fields COMMONAPACHELOG, timestamp, response, bytes", + false), + context) + + val messageAttribute = UnresolvedAttribute("message") + val logAttribute = UnresolvedAttribute("COMMONAPACHELOG") + val timestampAttribute = UnresolvedAttribute("timestamp") + val responseAttribute = UnresolvedAttribute("response") + val bytesAttribute = UnresolvedAttribute("bytes") + // scalastyle:off + val expectedRegExp = + "(?(?(?:(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))|(?(?:(?((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)(\\.(25[0-5]|2[0-4]\\d|1\\d\\d|[1-9]?\\d)){3}))|:)))(%.+)?)|(?(?(?[a-zA-Z0-9._-]+)) (?(?[a-zA-Z0-9._-]+)) \\[(?(?(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]))/(?\\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\\b)/(?(?>\\d\\d){1,2}):(?(?!<[0-9])(?(?:2[0123]|[01]?[0-9])):(?(?:[0-5][0-9]))(?::(?(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)))(?![0-9])) (?(?:[+-]?(?:[0-9]+))))\\] \"(?:(?\\b\\w+\\b) (?\\S+)(?: HTTP/(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))))?|(?.*?))\" (?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?:(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+))))))|-))" + // scalastyle:on + + val COMMONAPACHELOG = Alias( + RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("1")), + "COMMONAPACHELOG")() + val timestamp = Alias( + RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("11")), + "timestamp")() + val response = + Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("25")), "response")() + val bytes = + Alias(RegExpExtract(messageAttribute, Literal(expectedRegExp), Literal("27")), "bytes")() + val expectedPlan = Project( + Seq(logAttribute, timestampAttribute, responseAttribute, bytesAttribute), + Project( + Seq(messageAttribute, COMMONAPACHELOG, timestamp, response, bytes, UnresolvedStar(None)), + UnresolvedRelation(Seq("t")))) + assert(compareByString(expectedPlan) === compareByString(logPlan)) + } + + test("test grok email expression with filter by age and sort by age field") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=accounts | grok email '.+@%{HOSTNAME:host}' | where age > 45 | sort - age | fields age, email, host", + isExplain = false), + context) + + // Define the expected logical plan + val emailAttribute = UnresolvedAttribute("email") + val ageAttribute = UnresolvedAttribute("age") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + // Define the corrected expected plan + val expectedPlan = Project( + Seq(ageAttribute, emailAttribute, UnresolvedAttribute("host")), + Sort( + Seq(SortOrder(ageAttribute, Descending, NullsLast, Seq.empty)), + global = true, + Filter( + GreaterThan(ageAttribute, Literal(45)), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("accounts")))))) + assert(compareByString(expectedPlan) === compareByString(logPlan)) + } + + test("test grok email expression, generate new host field and eval result") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=accounts | grok email '.+@%{HOSTNAME:host}' | eval eval_result=1 | fields host, eval_result", + false), + context) + + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val evalResultAttribute = UnresolvedAttribute("eval_result") + + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal("1")), + "host")() + + val evalResultExpression = Alias(Literal(1), "eval_result")() + + val expectedPlan = Project( + Seq(hostAttribute, evalResultAttribute), + Project( + Seq(UnresolvedStar(None), evalResultExpression), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("accounts"))))) + assert(compareByString(expectedPlan) === compareByString(logPlan)) + } + + test("test grok email expressions and group by count host ") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=t | grok email '.+@%{HOSTNAME:host}' | stats count() by host", + false), + context) + + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + // Define the corrected expected plan + val expectedPlan = Project( + Seq(UnresolvedStar(None)), // Matches the '*' in the Project + Aggregate( + Seq(Alias(hostAttribute, "host")()), // Group by 'host' + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(UnresolvedStar(None)), isDistinct = false), + "count()")(), + Alias(hostAttribute, "host")()), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("t"))))) + + // Compare the logical plans + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test grok email expressions and top count_host ") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan(pplParser, "source=t | grok email '.+@%{HOSTNAME:host}' | top 1 host", false), + context) + + val emailAttribute = UnresolvedAttribute("email") + val hostAttribute = UnresolvedAttribute("host") + val hostExpression = Alias( + RegExpExtract( + emailAttribute, + Literal( + ".+@(?\\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\\.?|\\b))"), + Literal(1)), + "host")() + + val sortedPlan = Sort( + Seq( + SortOrder( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(hostAttribute), isDistinct = false), + "count_host")(), + Descending, + NullsLast, + Seq.empty)), + global = true, + Aggregate( + Seq(hostAttribute), + Seq( + Alias( + UnresolvedFunction(Seq("COUNT"), Seq(hostAttribute), isDistinct = false), + "count_host")(), + hostAttribute), + Project( + Seq(emailAttribute, hostExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("t"))))) + // Define the corrected expected plan + val expectedPlan = Project( + Seq(UnresolvedStar(None)), // Matches the '*' in the Project + GlobalLimit(Literal(1), LocalLimit(Literal(1), sortedPlan))) + // Compare the logical plans + comparePlans(expectedPlan, logPlan, checkAnalysis = false) + } + + test("test grok address expressions with 2 fields identifies ") { + val context = new CatalystPlanContext + val logPlan = + planTransformer.visit( + plan( + pplParser, + "source=accounts | grok street_address '%{NUMBER} %{GREEDYDATA:address}' | fields address ", + false), + context) + + val street_addressAttribute = UnresolvedAttribute("street_address") + val addressAttribute = UnresolvedAttribute("address") + val addressExpression = Alias( + RegExpExtract( + street_addressAttribute, + Literal( + "(?(?:(?(?[+-]?(?:(?:[0-9]+(?:\\.[0-9]+)?)|(?:\\.[0-9]+)))))) (?.*)"), + Literal("3")), + "address")() + val expectedPlan = Project( + Seq(addressAttribute), + Project( + Seq(street_addressAttribute, addressExpression, UnresolvedStar(None)), + UnresolvedRelation(Seq("accounts")))) + assert(compareByString(expectedPlan) === compareByString(logPlan)) + } +} diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseTranslatorTestSuite.scala index cfc3d9725..36fea03f9 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanParseTranslatorTestSuite.scala @@ -38,9 +38,8 @@ class PPLLogicalPlanParseTranslatorTestSuite val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal("1")))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal("1")), "host")() val expectedPlan = Project( Seq(emailAttribute, hostAttribute), Project( @@ -57,9 +56,8 @@ class PPLLogicalPlanParseTranslatorTestSuite context) val emailAttribute = UnresolvedAttribute("email") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal("1")))), - "email")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal("1")), "email")() val expectedPlan = Project( Seq(emailAttribute), Project( @@ -81,9 +79,8 @@ class PPLLogicalPlanParseTranslatorTestSuite // Define the expected logical plan val emailAttribute = UnresolvedAttribute("email") val ageAttribute = UnresolvedAttribute("age") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() // Define the corrected expected plan val expectedPlan = Project( @@ -113,9 +110,8 @@ class PPLLogicalPlanParseTranslatorTestSuite val hostAttribute = UnresolvedAttribute("host") val evalResultAttribute = UnresolvedAttribute("eval_result") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal("1")))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal("1")), "host")() val evalResultExpression = Alias(Literal(1), "eval_result")() @@ -144,11 +140,17 @@ class PPLLogicalPlanParseTranslatorTestSuite val streetAttribute = UnresolvedAttribute("street") val streetNumberExpression = Alias( - Coalesce(Seq(RegExpExtract(addressAttribute, Literal("(\\d+) (.+)"), Literal("1")))), + RegExpExtract( + addressAttribute, + Literal("(?\\d+) (?.+)"), + Literal("1")), "streetNumber")() val streetExpression = Alias( - Coalesce(Seq(RegExpExtract(addressAttribute, Literal("(\\d+) (.+)"), Literal("2")))), + RegExpExtract( + addressAttribute, + Literal("(?\\d+) (?.+)"), + Literal("2")), "street")() val expectedPlan = Project( @@ -174,9 +176,8 @@ class PPLLogicalPlanParseTranslatorTestSuite val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() // Define the corrected expected plan val expectedPlan = Project( @@ -205,9 +206,8 @@ class PPLLogicalPlanParseTranslatorTestSuite val emailAttribute = UnresolvedAttribute("email") val hostAttribute = UnresolvedAttribute("host") - val hostExpression = Alias( - Coalesce(Seq(RegExpExtract(emailAttribute, Literal(".+@(.+)"), Literal(1)))), - "host")() + val hostExpression = + Alias(RegExpExtract(emailAttribute, Literal(".+@(?.+)"), Literal(1)), "host")() val sortedPlan = Sort( Seq(