Merge remote-tracking branch 'upstream/main' into issues/887
LantaoJin committed Nov 13, 2024
2 parents c8a9c63 + dd9c0cf commit 9b0ebac
Showing 32 changed files with 1,858 additions and 96 deletions.
31 changes: 31 additions & 0 deletions build.sbt
@@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
import Dependencies._
import sbtassembly.AssemblyPlugin.autoImport.ShadeRule

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.5.1"
@@ -43,7 +44,35 @@ lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
// Run as part of test task.
lazy val testScalastyle = taskKey[Unit]("testScalastyle")

// Explanation:
// - ThisBuild / assemblyShadeRules sets the shading rules for the entire build
// - ShadeRule.rename(...) creates a rule to rename multiple package patterns
// - "shaded.flint.@0" prepends "shaded.flint." to the original package name
//   ("@0" expands to the full matched name)
// - .inAll applies the rule to all dependencies, not just direct dependencies
val packagesToShade = Seq(
"com.amazonaws.cloudwatch.**",
"com.fasterxml.jackson.core.**",
"com.fasterxml.jackson.dataformat.**",
"com.fasterxml.jackson.databind.**",
"com.sun.jna.**",
"com.thoughtworks.paranamer.**",
"javax.annotation.**",
"org.apache.commons.codec.**",
"org.apache.commons.logging.**",
"org.apache.hc.**",
"org.apache.http.**",
"org.glassfish.json.**",
"org.joda.time.**",
"org.reactivestreams.**",
"org.yaml.**",
"software.amazon.**"
)

ThisBuild / assemblyShadeRules := Seq(
ShadeRule.rename(
packagesToShade.map(_ -> "shaded.flint.@0"): _*
).inAll
)
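
After `sbt assembly`, the effect of these rules can be spot-checked by listing the jar's entries: every shaded package should now live under `shaded/flint/`. A minimal sketch of such a check, assuming a hypothetical assembly jar path (the `VerifyShading` object and the path are illustrative, not part of this commit):

```scala
// Illustrative sketch, not part of this commit: list entries in the assembly
// jar to confirm the shade rules took effect. The jar path is an assumption;
// substitute the actual sbt-assembly output for your module.
import java.util.jar.JarFile

import scala.collection.JavaConverters._

object VerifyShading extends App {
  val jarPath = "target/scala-2.12/flint-assembly.jar" // hypothetical path
  val jar = new JarFile(jarPath)
  try {
    jar
      .entries()
      .asScala
      .map(_.getName)
      .filter(_.startsWith("shaded/flint/"))
      .take(10)
      .foreach(println) // e.g. shaded/flint/org/joda/time/DateTime.class
  } finally jar.close()
}
```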

lazy val commonSettings = Seq(
javacOptions ++= Seq("-source", "11"),
@@ -89,6 +118,8 @@ lazy val flintCore = (project in file("flint-core"))
"com.amazonaws" % "aws-java-sdk-cloudwatch" % "1.12.593"
exclude("com.fasterxml.jackson.core", "jackson-databind"),
"software.amazon.awssdk" % "auth-crt" % "2.28.10",
"com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"org.projectlombok" % "lombok" % "1.18.30" % "provided",
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
Expand Down
5 changes: 5 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -181,6 +181,7 @@ source = table | where ispresent(a) |
- `source = table | stats max(c) by b`
- `source = table | stats count(c) by b | head 5`
- `source = table | stats distinct_count(c)`
- `source = table | stats distinct_count_approx(c)`
- `source = table | stats stddev_samp(c)`
- `source = table | stats stddev_pop(c)`
- `source = table | stats percentile(c, 90)`
@@ -206,6 +207,7 @@ source = table | where ispresent(a) |
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
@@ -250,12 +252,15 @@ source = table | where ispresent(a) |

- `source=accounts | rare gender`
- `source=accounts | rare age by gender`
- `source=accounts | rare 5 age by gender`
- `source=accounts | rare_approx age by gender`

#### **Top**
[See additional command details](ppl-top-command.md)

- `source=accounts | top gender`
- `source=accounts | top 1 gender`
- `source=accounts | top_approx 5 gender`
- `source=accounts | top 1 age by gender`

#### **Parse**
10 changes: 8 additions & 2 deletions docs/ppl-lang/ppl-rare-command.md
@@ -6,10 +6,13 @@ Using ``rare`` command to find the least common tuple of values of all fields in the field list.
**Note**: A maximum of 10 results is returned for each distinct tuple of values of the group-by fields.

**Syntax**
`rare <field-list> [by-clause]`
`rare [N] <field-list> [by-clause]`
`rare_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. Comma-delimited list of field names.
* by-clause: optional. One or more fields to group the results by.
* rare_approx: approximate count of the rare (N) fields, using cardinality estimated by the [HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html) (see the sketch below).
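
The linked Spark built-in that implements HyperLogLog++ estimation is `approx_count_distinct`. A minimal sketch of that estimator at work, assuming a local SparkSession and a toy `accounts` DataFrame (both illustrative, not from this commit):

```scala
// Minimal sketch of the HyperLogLog++ estimator behind the approximate
// commands. Assumptions: a local SparkSession and a toy accounts DataFrame.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.approx_count_distinct

object ApproxCardinalitySketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("hllpp").getOrCreate()
  import spark.implicits._

  val accounts = Seq(("M", 32), ("M", 33), ("F", 28), ("F", 28))
    .toDF("gender", "age")

  // Approximate distinct ages per gender, rather than an exact count.
  accounts.groupBy($"gender").agg(approx_count_distinct($"age")).show()

  spark.stop()
}
```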


### Example 1: Find the least common values in a field
@@ -19,6 +22,8 @@ The example finds least common gender of all the accounts.
PPL query:

os> source=accounts | rare gender;
os> source=accounts | rare_approx 10 gender;
os> source=accounts | rare_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
@@ -34,7 +39,8 @@ The example finds least common age of all the accounts group by gender.

PPL query:

os> source=accounts | rare age by gender;
os> source=accounts | rare 5 age by gender;
os> source=accounts | rare_approx 5 age by gender;
fetched rows / total rows = 4/4
+----------+-------+
| gender | age |
7 changes: 5 additions & 2 deletions docs/ppl-lang/ppl-top-command.md
@@ -6,11 +6,12 @@ Using ``top`` command to find the most common tuple of values of all fields in the field list.

### Syntax
`top [N] <field-list> [by-clause]`
`top_approx [N] <field-list> [by-clause]`

* N: number of results to return. **Default**: 10
* field-list: mandatory. Comma-delimited list of field names.
* by-clause: optional. One or more fields to group the results by.
* top_approx: approximate count of the top (N) fields, using cardinality estimated by the [HyperLogLog++ algorithm](https://spark.apache.org/docs/3.5.2/sql-ref-functions-builtin.html) (see the accuracy note below).
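
One accuracy note on the approximate variants: Spark's `approx_count_distinct` also accepts an optional second argument, the maximum estimation error allowed (relative standard deviation, default 0.05), so precision can be traded against memory. A minimal sketch, assuming a local SparkSession and a toy `accounts` view (illustrative only, not part of this commit):

```scala
// Minimal sketch: tightening the HyperLogLog++ error bound. Assumptions:
// a local SparkSession and a toy accounts view; 0.01 is the maximum
// relative standard deviation (Spark's default is 0.05).
import org.apache.spark.sql.SparkSession

object ApproxAccuracySketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("rsd").getOrCreate()
  import spark.implicits._

  Seq(("M", 32), ("M", 33), ("F", 28))
    .toDF("gender", "age")
    .createOrReplaceTempView("accounts")

  spark
    .sql("SELECT gender, approx_count_distinct(age, 0.01) FROM accounts GROUP BY gender")
    .show()

  spark.stop()
}
```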

### Example 1: Find the most common values in a field

@@ -19,6 +20,7 @@ The example finds most common gender of all the accounts.
PPL query:

os> source=accounts | top gender;
os> source=accounts | top_approx gender;
fetched rows / total rows = 2/2
+----------+
| gender |
@@ -33,7 +35,7 @@ The example finds most common gender of all the accounts.

PPL query:

os> source=accounts | top 1 gender;
os> source=accounts | top_approx 1 gender;
fetched rows / total rows = 1/1
+----------+
| gender |
@@ -48,6 +50,7 @@ The example finds most common age of all the accounts group by gender.
PPL query:

os> source=accounts | top 1 age by gender;
os> source=accounts | top_approx 1 age by gender;
fetched rows / total rows = 2/2
+----------+-------+
| gender | age |
FlintSparkIndexFactory.scala
@@ -14,7 +14,7 @@ import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
@@ -141,9 +141,9 @@ object FlintSparkIndexFactory extends Logging {
}

private def getMvSourceTables(spark: SparkSession, metadata: FlintMetadata): Array[String] = {
val sourceTables = getArrayString(metadata.properties, "sourceTables")
val sourceTables = getSourceTablesFromMetadata(metadata)
if (sourceTables.isEmpty) {
FlintSparkMaterializedView.extractSourceTableNames(spark, metadata.source)
FlintSparkMaterializedView.extractSourceTablesFromQuery(spark, metadata.source)
} else {
sourceTables
}
@@ -161,12 +161,4 @@
Some(value.asInstanceOf[String])
}
}

private def getArrayString(map: java.util.Map[String, AnyRef], key: String): Array[String] = {
map.get(key) match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
}
}
FlintMetadataCache.scala
Expand Up @@ -10,7 +10,7 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
import org.opensearch.flint.common.metadata.FlintMetadata
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getSourceTablesFromMetadata, MV_INDEX_TYPE}
import org.opensearch.flint.spark.scheduler.util.IntervalSchedulerParser

/**
@@ -61,12 +61,7 @@ object FlintMetadataCache {
None
}
val sourceTables = metadata.kind match {
case MV_INDEX_TYPE =>
metadata.properties.get("sourceTables") match {
case list: java.util.ArrayList[_] =>
list.toArray.map(_.toString)
case _ => Array.empty[String]
}
case MV_INDEX_TYPE => getSourceTablesFromMetadata(metadata)
case _ => Array(metadata.source)
}
val lastRefreshTime: Option[Long] = metadata.latestLogEntry.flatMap { entry =>
FlintOpenSearchMetadataCacheWriter.scala
@@ -38,13 +38,15 @@ class FlintOpenSearchMetadataCacheWriter(options: FlintOptions)
.isInstanceOf[FlintOpenSearchIndexMetadataService]

override def updateMetadataCache(indexName: String, metadata: FlintMetadata): Unit = {
logInfo(s"Updating metadata cache for $indexName");
logInfo(s"Updating metadata cache for $indexName with $metadata");
val osIndexName = OpenSearchClientUtils.sanitizeIndexName(indexName)
var client: IRestHighLevelClient = null
try {
client = OpenSearchClientUtils.createClient(options)
val request = new PutMappingRequest(osIndexName)
request.source(serialize(metadata), XContentType.JSON)
val serialized = serialize(metadata)
logInfo(s"Serialized: $serialized")
request.source(serialized, XContentType.JSON)
client.updateIndexMapping(request, RequestOptions.DEFAULT)
} catch {
case e: Exception =>
FlintSparkMaterializedView.scala
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.mv

import java.util.Locale

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.common.metadata.FlintMetadata
@@ -18,6 +18,7 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.function.TumbleFunction
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation}
@@ -64,10 +65,14 @@ case class FlintSparkMaterializedView(
}.toArray
val schema = generateSchema(outputSchema).asJava

// Convert Scala Array to Java ArrayList for consistency with OpenSearch JSON parsing.
// OpenSearch uses Jackson, which deserializes JSON arrays into ArrayLists.
val sourceTablesProperty = new java.util.ArrayList[String](sourceTables.toSeq.asJava)

metadataBuilder(this)
.name(mvName)
.source(query)
.addProperty("sourceTables", sourceTables)
.addProperty("sourceTables", sourceTablesProperty)
.indexedColumns(indexColumnMaps)
.schema(schema)
.build()
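
The comment above rests on a round-trip property: OpenSearch parses the stored JSON with Jackson, and Jackson's untyped binding materializes JSON arrays as `java.util.ArrayList`, so writing the property as an `ArrayList` up front keeps the runtime type stable across serialization. A minimal sketch of that behavior, assuming jackson-databind on the classpath (the table name is hypothetical; this is not part of the change):

```scala
// Minimal sketch (not part of this change): Jackson's untyped binding turns
// a JSON array back into a java.util.ArrayList, so storing sourceTables as
// an ArrayList keeps the property's runtime type stable across a round trip.
import com.fasterxml.jackson.databind.ObjectMapper

object SourceTablesRoundTrip extends App {
  val mapper = new ObjectMapper()
  val tables = new java.util.ArrayList[String]()
  tables.add("glue.default.http_logs") // hypothetical table name

  val json = mapper.writeValueAsString(tables)
  val parsed = mapper.readValue(json, classOf[Object])

  println(parsed.getClass) // class java.util.ArrayList
}
```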
@@ -153,7 +158,7 @@
}
}

object FlintSparkMaterializedView {
object FlintSparkMaterializedView extends Logging {

/** MV index type name */
val MV_INDEX_TYPE = "mv"
@@ -185,13 +190,40 @@
* @return
* source table names
*/
def extractSourceTableNames(spark: SparkSession, query: String): Array[String] = {
spark.sessionState.sqlParser
def extractSourceTablesFromQuery(spark: SparkSession, query: String): Array[String] = {
logInfo(s"Extracting source tables from query $query")
val sourceTables = spark.sessionState.sqlParser
.parsePlan(query)
.collect { case relation: UnresolvedRelation =>
qualifyTableName(spark, relation.tableName)
}
.toArray
logInfo(s"Extracted tables: [${sourceTables.mkString(", ")}]")
sourceTables
}

/**
* Get source tables from Flint metadata properties field.
*
* @param metadata
* Flint metadata
* @return
* source table names
*/
def getSourceTablesFromMetadata(metadata: FlintMetadata): Array[String] = {
logInfo(s"Getting source tables from metadata $metadata")
val sourceTables = metadata.properties.get("sourceTables")
sourceTables match {
case list: java.util.ArrayList[_] =>
logInfo(s"sourceTables is [${list.asScala.mkString(", ")}]")
list.toArray.map(_.toString)
case null =>
logInfo("sourceTables property does not exist")
Array.empty[String]
case _ =>
logInfo(s"sourceTables has unexpected type: ${sourceTables.getClass.getName}")
Array.empty[String]
}
}

/** Builder class for MV build */
@@ -223,7 +255,7 @@
*/
def query(query: String): Builder = {
this.query = query
this.sourceTables = extractSourceTableNames(flint.spark, query)
this.sourceTables = extractSourceTablesFromQuery(flint.spark, query)
this
}

FlintSparkMaterializedViewSuite.scala
@@ -64,7 +64,9 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
metadata.kind shouldBe MV_INDEX_TYPE
metadata.source shouldBe "SELECT 1"
metadata.properties should contain key "sourceTables"
metadata.properties.get("sourceTables").asInstanceOf[Array[String]] should have size 0
metadata.properties
.get("sourceTables")
.asInstanceOf[java.util.ArrayList[String]] should have size 0
metadata.indexedColumns shouldBe Array(
Map("columnName" -> "test_col", "columnType" -> "integer").asJava)
metadata.schema shouldBe Map("test_col" -> Map("type" -> "integer").asJava).asJava