Fix IT and UT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 11, 2023
1 parent c7f599d commit b9100f3
Showing 9 changed files with 199 additions and 174 deletions.
@@ -86,6 +86,8 @@ object FlintMetadata {
builder.schema(parser.map())
}
}

builder.indexSettings(settings)
builder.build()
} catch {
case e: Exception =>
@@ -114,6 +116,12 @@ object FlintMetadata {
}
}

def builder(): FlintMetadata.Builder = new Builder

/**
* Flint index metadata builder that can be extended by subclasses to provide a custom build
* method.
*/
class Builder {
private var version: FlintVersion = FlintVersion.current()
private var name: String = ""
@@ -125,68 +133,67 @@ object FlintMetadata {
private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
private var indexSettings: String = null

// Setters for each field
def version(version: FlintVersion): Builder = {
def version(version: FlintVersion): this.type = {
this.version = version
this
}

def name(name: String): Builder = {
def name(name: String): this.type = {
this.name = name
this
}

def kind(kind: String): Builder = {
def kind(kind: String): this.type = {
this.kind = kind
this
}

def source(source: String): Builder = {
def source(source: String): this.type = {
this.source = source
this
}

def options(options: util.Map[String, AnyRef]): Builder = {
def options(options: util.Map[String, AnyRef]): this.type = {
this.options = options
this
}

def addOption(key: String, value: AnyRef): Builder = {
def addOption(key: String, value: AnyRef): this.type = {
this.options.put(key, value)
this
}

def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): Builder = {
def indexedColumns(indexedColumns: Array[util.Map[String, AnyRef]]): this.type = {
this.indexedColumns = indexedColumns
this
}

def addIndexedColumn(indexCol: util.Map[String, AnyRef]): Builder = {
def addIndexedColumn(indexCol: util.Map[String, AnyRef]): this.type = {
indexedColumns = indexedColumns :+ indexCol
this
}

def properties(properties: util.Map[String, AnyRef]): Builder = {
def properties(properties: util.Map[String, AnyRef]): this.type = {
this.properties = properties
this
}

def addProperty(key: String, value: AnyRef): Builder = {
def addProperty(key: String, value: AnyRef): this.type = {
properties.put(key, value)
this
}

def schema(schema: util.Map[String, AnyRef]): Builder = {
def schema(schema: util.Map[String, AnyRef]): this.type = {
this.schema = schema
this
}

def addSchemaField(key: String, value: AnyRef): Builder = {
def addSchemaField(key: String, value: AnyRef): this.type = {
schema.put(key, value)
this
}

def indexSettings(indexSettings: String): Builder = {
def indexSettings(indexSettings: String): this.type = {
this.indexSettings = indexSettings
this
}
@@ -7,9 +7,7 @@ package org.opensearch.flint.spark
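The setter changes above also switch the return type from Builder to this.type, so a subclass builder (such as the FlintSparkIndexMetadataBuilder added elsewhere in this commit) keeps its own type across chained calls. A rough sketch of the resulting fluent API, assuming only the methods shown in this diff (every literal value below is invented for illustration):

import org.opensearch.flint.core.metadata.FlintMetadata

// Hypothetical values; builder(), the setters, and build() are the API shown above
val metadata: FlintMetadata = FlintMetadata
  .builder()
  .name("example_index")
  .kind("covering")
  .source("spark_catalog.default.example_table")
  .addOption("auto_refresh", "false")
  .indexSettings("""{"number_of_shards": 1}""")
  .build()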

import scala.collection.JavaConverters._

import org.json4s.{Formats, JArray, NoTypeHints}
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.native.JsonMethods.parse
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
@@ -87,7 +85,6 @@ class FlintSpark(val spark: SparkSession) {
}
} else {
val metadata = index.metadata()
index.options.indexSettings().foreach(metadata.setIndexSettings)
flintClient.createIndex(indexName, metadata)
}
}
@@ -105,7 +102,7 @@ class FlintSpark(val spark: SparkSession) {
def refreshIndex(indexName: String, mode: RefreshMode): Option[String] = {
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val tableName = getSourceTableName(index)
val tableName = index.metadata().source

// Write Flint index data to Flint data source (shared by both refresh modes for now)
def writeFlintIndex(df: DataFrame): Unit = {
@@ -224,39 +221,16 @@ class FlintSpark(val spark: SparkSession) {
}
}

// TODO: Remove all parsing logic below once Flint spec finalized and FlintMetadata strong typed
private def getSourceTableName(index: FlintSparkIndex): String = {
val json = parse(index.metadata().getContent)
(json \ "_meta" \ "source").extract[String]
}

/*
* For now, deserialize skipping strategies out of Flint metadata json
* ex. extract Seq(Partition("year", "int"), ValueList("name")) from
* { "_meta": { "indexedColumns": [ {...partition...}, {...value list...} ] } }
*
*/
private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
val meta = parse(metadata.getContent) \ "_meta"
val indexName = (meta \ "name").extract[String]
val tableName = (meta \ "source").extract[String]
val indexType = (meta \ "kind").extract[String]
val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
val indexOptions = FlintSparkIndexOptions(
(meta \ "options")
.asInstanceOf[JObject]
.obj
.map { case JField(key, value) =>
key -> value.values.toString
}
.toMap)
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)

indexType match {
metadata.kind match {
case SKIPPING_INDEX_TYPE =>
val strategies = indexedColumns.arr.map { colInfo =>
val skippingKind = SkippingKind.withName((colInfo \ "kind").extract[String])
val columnName = (colInfo \ "columnName").extract[String]
val columnType = (colInfo \ "columnType").extract[String]
val strategies = metadata.indexedColumns.map { colInfo =>
val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
val columnName = getString(colInfo, "columnName")
val columnType = getString(colInfo, "columnType")

skippingKind match {
case PARTITION =>
@@ -269,17 +243,21 @@ class FlintSpark(val spark: SparkSession) {
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
case COVERING_INDEX_TYPE =>
new FlintSparkCoveringIndex(
indexName,
tableName,
indexedColumns.arr.map { obj =>
((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
metadata.name,
metadata.source,
metadata.indexedColumns.map { colInfo =>
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
indexOptions)
}
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
}

object FlintSpark {
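For orientation, the refresh path changed above could be exercised roughly as follows. This is a sketch, not code from this commit: the index name is invented, and the location of RefreshMode and its FULL value are assumed.

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode

val spark = SparkSession.builder().appName("flint-refresh-example").getOrCreate()
val flint = new FlintSpark(spark)

// refreshIndex returns Option[String] per the signature above; the index name is hypothetical
val jobId: Option[String] = flint.refreshIndex("flint_example_skipping_index", RefreshMode.FULL)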
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.json4s.JObject
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.FlintSparkIndex.populateEnvToMetadata

import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType

/**
* Flint Spark metadata builder with common build logic.
*/
class FlintSparkIndexMetadataBuilder(index: FlintSparkIndex) extends FlintMetadata.Builder {

def schema(allFieldTypes: Map[String, String]): FlintSparkIndexMetadataBuilder = {
val catalogDDL =
allFieldTypes
.map { case (colName, colType) => s"$colName $colType not null" }
.mkString(",")
val structType = StructType.fromDDL(catalogDDL)

// Assume each value is a JSON object
structType.fields.foreach(field => {
val (fieldName, fieldType) = FlintDataType.serializeField(field)
val fieldTypeMap =
fieldType
.asInstanceOf[JObject]
.values
.mapValues {
case v: Map[_, _] => v.asJava
case other => other
}
.asJava
addSchemaField(fieldName, fieldTypeMap)
})
this
}

override def build(): FlintMetadata = {
// Common fields in all Flint Spark indexes
kind(index.kind)
name(index.name())
options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava)

val envs = populateEnvToMetadata
if (envs.nonEmpty) {
addProperty("env", envs.asJava)
}

val settings = index.options.indexSettings()
if (settings.isDefined) {
indexSettings(settings.get)
}
super.build()
}
}

object FlintSparkIndexMetadataBuilder {

def builder(index: FlintSparkIndex): FlintSparkIndexMetadataBuilder =
new FlintSparkIndexMetadataBuilder(index)
}
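The intended call pattern for this new builder mirrors the FlintSparkCoveringIndex change below; in the sketch here the table name and column map are assumed examples.

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexMetadataBuilder}

// Builds metadata for any FlintSparkIndex; the literal values are illustrative only
def exampleMetadata(index: FlintSparkIndex): FlintMetadata =
  FlintSparkIndexMetadataBuilder
    .builder(index)
    .source("spark_catalog.default.example_table")
    .schema(Map("name" -> "string", "age" -> "int"))
    .build()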
@@ -5,19 +5,15 @@

package org.opensearch.flint.spark.covering

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata}
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType

/**
* Flint covering index in Spark.
@@ -38,62 +34,27 @@ case class FlintSparkCoveringIndex(

require(indexedColumns.nonEmpty, "indexed columns must not be empty")

/** Required by json4s write function */
implicit val formats: Formats = Serialization.formats(NoTypeHints)

override val kind: String = COVERING_INDEX_TYPE

override def name(): String = getFlintIndexName(indexName, tableName)

override def metadata(): FlintMetadata = {
new FlintMetadata(s"""{
| "_meta": {
| "name": "$indexName",
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName",
| "options": $getIndexOptions,
| "properties": $getIndexProperties
| },
| "properties": $getSchema
| }
|""".stripMargin)
val builder = FlintSparkIndexMetadataBuilder.builder(this)

indexedColumns.map { case (colName, colType) =>
builder.addIndexedColumn(
Map[String, AnyRef]("columnName" -> colName, "columnType" -> colType).asJava)
}
builder
.source(tableName)
.schema(indexedColumns)
.build()
}

override def build(df: DataFrame): DataFrame = {
val colNames = indexedColumns.keys.toSeq
df.select(colNames.head, colNames.tail: _*)
}

// TODO: refactor all these once Flint metadata spec finalized
private def getMetaInfo: String = {
val objects = indexedColumns.map { case (colName, colType) =>
JObject("columnName" -> JString(colName), "columnType" -> JString(colType))
}.toList
Serialization.write(JArray(objects))
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getIndexProperties: String = {
val envMap = populateEnvToMetadata
if (envMap.isEmpty) {
"{}"
} else {
s"""{ "env": ${Serialization.write(envMap)} }"""
}
}

private def getSchema: String = {
val catalogDDL =
indexedColumns
.map { case (colName, colType) => s"$colName $colType not null" }
.mkString(",")
val properties = FlintDataType.serialize(StructType.fromDDL(catalogDDL))
compact(render(parse(properties) \ "properties"))
}
}

object FlintSparkCoveringIndex {
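Putting the pieces together, constructing a covering index and asking it for metadata would look roughly like this. The sketch assumes the constructor arguments implied by the deserialize logic above; the index name, table, columns, and option value are all invented.

import org.opensearch.flint.spark.FlintSparkIndexOptions
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex

// All literal values are illustrative
val index = new FlintSparkCoveringIndex(
  "ci_example",
  "spark_catalog.default.example_table",
  Map("clientip" -> "string", "status" -> "int"),
  FlintSparkIndexOptions(Map("auto_refresh" -> "false")))

// metadata() now delegates to FlintSparkIndexMetadataBuilder as shown earlier
val metadata = index.metadata()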