update the flint client createIndex api
add metadata creation parameters

Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Oct 18, 2023
1 parent d2677d0 commit 42530a8
Showing 14 changed files with 67 additions and 53 deletions.
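In short: the commit threads an optional target index name through FlintMetadata and both Spark index builders, so an acceleration index can be attached to an existing OpenSearch index (with the Flint index name registered as an alias) instead of always creating a new physical index. A rough Scala sketch of the resulting flow, using names from the hunks below — the standalone wiring and exact call order are illustrative, not part of this diff:

    // Sketch: metadata that points at an existing OpenSearch index.
    val metadata = new FlintMetadata.Builder()
      .name("name_and_age")                  // logical Flint index name
      .targetName(Option("existing_index"))  // setter now takes Option[String]
      .kind("covering")
      .source("spark_catalog.default.test")
      .build()

    // Client-side decision (mirrors FlintOpenSearchClient.createIndex below):
    // a non-empty targetName means "alias onto an existing index", so the
    // generated mapping must not redeclare the schema properties.
    val includeProperties = metadata.targetName.isEmpty
    val mappingJson = metadata.getContent(includeProperties)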
@@ -48,24 +48,30 @@ case class FlintMetadata(
    * @return
    *   JSON content
    */
-  def getContent: String = {
+  def getContent(includeProperties: Boolean = true): String = {
     try {
       buildJson(builder => {
         // Add _meta field
         objectField(builder, "_meta") {
           builder
             .field("version", version.version)
-            .field("targetName", targetName.getOrElse(name))
             .field("name", name)
             .field("kind", kind)
             .field("source", source)
             .field("indexedColumns", indexedColumns)
+          // Only add targetName if it's not empty
+          targetName.foreach(tn => builder.field("targetName", tn))
           optionalObjectField(builder, "options", options)
-          optionalObjectField(builder, "properties", properties)
+
+          if (includeProperties) {
+            optionalObjectField(builder, "properties", properties)
+          }
         }

         // Add properties (schema) field
-        builder.field("properties", schema)
+        if (includeProperties) {
+          builder.field("properties", schema)
+        }
       })
     } catch {
       case e: Exception =>
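To make the new includeProperties flag concrete, a minimal sketch of the two payload shapes getContent can now produce (JSON abbreviated in the comments; metadata as in the sketch above):

    // Default: full mapping for a brand-new index.
    // {"_meta": { ... }, "properties": { ...schema fields... }}
    val fullMapping = metadata.getContent()

    // For an existing target index whose mapping is already defined, only the
    // _meta block is emitted, with no "properties" at either level:
    // {"_meta": {"version": ..., "name": ..., "kind": ..., "source": ...,
    //            "indexedColumns": [...], "options": {...}}}
    val metaOnly = metadata.getContent(includeProperties = false)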
@@ -111,7 +117,7 @@ object FlintMetadata {
             innerFieldName match {
               case "version" => builder.version(FlintVersion.apply(parser.text()))
               case "name" => builder.name(parser.text())
-              case "targetName" => builder.targetName(parser.text())
+              case "targetName" => builder.targetName(Option.apply(parser.text()))
               case "kind" => builder.kind(parser.text())
               case "source" => builder.source(parser.text())
               case "indexedColumns" =>
@@ -144,8 +150,8 @@ object FlintMetadata {
    */
   class Builder {
     private var version: FlintVersion = FlintVersion.current()
-    private var name: String = ""
     private var targetName: Option[String] = None
+    private var name: String = ""
     private var kind: String = ""
     private var source: String = ""
     private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
@@ -164,8 +170,8 @@ object FlintMetadata {
       this
     }

-    def targetName(name: String): this.type = {
-      this.targetName = Option(name)
+    def targetName(name: Option[String]): this.type = {
+      this.targetName = name
       this
     }

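Note that the setter now takes the Option directly, so callers pass it through unchanged instead of having the builder wrap a possibly-null String. Hypothetical call sites:

    builder.targetName(Option.apply(parser.text())) // as in the fromJson parser above
    builder.targetName(Some("existing_index"))      // explicit target index
    builder.targetName(None)                        // no target: create a new index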
@@ -110,10 +110,12 @@ public void createIndex(String indexName, FlintMetadata metadata) {
     String osIndexName = toLowercase(indexName);
     try (RestHighLevelClient client = createClient()) {
       CreateIndexRequest request = new CreateIndexRequest(osIndexName);
-      request.mapping(metadata.getContent(), XContentType.JSON);
-      metadata.targetName().exists(name -> {
-        return request.alias(new Alias(toLowercase(metadata.name())));
-      });
+      boolean includeMappingProperties = true;
+      if(metadata.targetName().nonEmpty()) {
+        request.alias(new Alias(toLowercase(metadata.name())));
+        includeMappingProperties = false;
+      }
+      request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON);
       Option<String> settings = metadata.indexSettings();
       if (settings.isDefined()) {
         request.settings(settings.get(), XContentType.JSON);
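The rewritten branch also fixes a quirk in the old code, which invoked Option.exists purely for its side effect and discarded the result. The new logic: metadata carrying a targetName means the physical index already exists, so the client registers the lowercased Flint name as an alias and strips the schema properties from the mapping it sends. The same decision restated as a self-contained Scala sketch (illustrative only; the committed code is the Java above):

    // Decide what createIndex should send, using plain types in place of the
    // OpenSearch client classes.
    def createIndexPlan(flintName: String, targetName: Option[String]): (Option[String], Boolean) = {
      val alias = targetName.map(_ => flintName.toLowerCase) // alias the Flint name onto the target
      val includeMappingProperties = targetName.isEmpty      // an existing index keeps its own mapping
      (alias, includeMappingProperties)
    }

    assert(createIndexPlan("Flint_MV", Some("existing_index")) == (Some("flint_mv"), false))
    assert(createIndexPlan("Flint_MV", None) == (None, true))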
@@ -62,6 +62,6 @@ class FlintMetadataSuite extends AnyFlatSpec with Matchers {
     builder.schema("""{"properties": {"test_field": {"type": "os_type"}}}""")

     val metadata = builder.build()
-    metadata.getContent should matchJson(testMetadataJson)
+    metadata.getContent() should matchJson(testMetadataJson)
   }
 }
@@ -46,7 +46,7 @@ case class FlintTable(conf: util.Map[String, String], userSpecifiedSchema: Optio
         FlintClientBuilder
           .build(flintSparkConf.flintOptions())
           .getIndexMetadata(name)
-          .getContent)
+          .getContent())
     }
   }
   schema
@@ -86,13 +86,15 @@ class FlintSpark(val spark: SparkSession) {
     if (targetName.nonEmpty) {
       //use targetIndex as the index to store the acceleration data
       flintClient.alias(targetName.get, indexName, index.metadata())
-    } else if (flintClient.exists(indexName)) {
-      if (!ignoreIfExists) {
-        throw new IllegalStateException(s"Flint index $indexName already exists")
-      }
     } else {
-      val metadata = index.metadata()
-      flintClient.createIndex(indexName, metadata)
+      if (flintClient.exists(indexName)) {
+        if (!ignoreIfExists) {
+          throw new IllegalStateException(s"Flint index $indexName already exists")
+        }
+      } else {
+        val metadata = index.metadata()
+        flintClient.createIndex(indexName, metadata)
+      }
     }
   }

@@ -59,6 +59,7 @@ case class FlintSparkCoveringIndex(

     metadataBuilder(this)
       .name(indexName)
+      .targetName(targetIndexName)
       .source(tableName)
       .indexedColumns(indexColumnMaps)
       .schema(schemaJson)
@@ -104,7 +105,7 @@

   /** Builder class for covering index build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
-    private var targetIndexName: String = ""
+    private var targetIndexName: Option[String] = None
     private var indexName: String = ""
     private var indexedColumns: Map[String, String] = Map()

@@ -130,7 +131,7 @@
      *   index builder
      */
     def targetName(indexName: String): Builder = {
-      this.targetIndexName = indexName
+      this.targetIndexName = Option.apply(indexName)
       this
     }

@@ -163,6 +164,6 @@
     }

     override protected def buildIndex(): FlintSparkIndex =
-      new FlintSparkCoveringIndex(Option.apply(targetIndexName), indexName, tableName, indexedColumns, indexOptions)
+      new FlintSparkCoveringIndex(targetIndexName, indexName, tableName, indexedColumns, indexOptions)
   }
 }
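The caller-facing fluent API is unchanged: targetName still accepts a plain String and remains optional; only the internal storage moves to Option[String]. A hedged usage sketch — coveringIndex(), onTable and addIndexColumns are builder entry points assumed from the surrounding codebase, not shown in this diff:

    flint
      .coveringIndex()                // assumed FlintSpark entry point
      .name("name_and_age")
      .onTable("spark_catalog.default.test")
      .targetName("existing_index")   // stored internally as Option[String]
      .addIndexColumns("name", "age")
      .create()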
@@ -69,6 +69,7 @@ case class FlintSparkMaterializedView(

     metadataBuilder(this)
       .name(mvName)
+      .targetName(targetIndexName)
       .source(query)
       .indexedColumns(indexColumnMaps)
       .schema(schemaJson)
@@ -159,7 +160,7 @@

   /** Builder class for MV build */
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
-    private var targetIndexName: String = ""
+    private var targetIndexName: Option[String] = None
     private var mvName: String = ""
     private var query: String = ""

@@ -172,7 +173,7 @@
      *   index builder
      */
     def targetName(indexName: String): Builder = {
-      this.targetIndexName = indexName
+      this.targetIndexName = Option.apply(indexName)
       this
     }

@@ -211,7 +212,7 @@
           field.name -> field.dataType.typeName
         }
         .toMap
-      FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, outputSchema, indexOptions)
+      FlintSparkMaterializedView(targetIndexName, mvName, query, outputSchema, indexOptions)
     }
   }
 }
@@ -13,20 +13,20 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {

   test("get covering index name") {
     val index =
-      new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))
+      new FlintSparkCoveringIndex(None, "ci", "spark_catalog.default.test", Map("name" -> "string"))
     index.name() shouldBe "flint_spark_catalog_default_test_ci_index"
   }

   test("should fail if get index name without full table name") {
-    val index = new FlintSparkCoveringIndex("ci", "test", Map("name" -> "string"))
+    val index = new FlintSparkCoveringIndex(None, "ci", "test", Map("name" -> "string"))
     assertThrows[IllegalArgumentException] {
       index.name()
     }
   }

   test("should fail if no indexed column given") {
     assertThrows[IllegalArgumentException] {
-      new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
+      new FlintSparkCoveringIndex(None, "ci", "default.test", Map.empty)
     }
   }
 }
@@ -36,20 +36,20 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
   val testQuery = "SELECT 1"

   test("get name") {
-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty)
     mv.name() shouldBe "flint_spark_catalog_default_mv"
   }

   test("should fail if get name with unqualified MV name") {
     the[IllegalArgumentException] thrownBy
-      FlintSparkMaterializedView("mv", testQuery, Map.empty).name()
+      FlintSparkMaterializedView(None, "mv", testQuery, Map.empty).name()

     the[IllegalArgumentException] thrownBy
-      FlintSparkMaterializedView("default.mv", testQuery, Map.empty).name()
+      FlintSparkMaterializedView(None, "default.mv", testQuery, Map.empty).name()
   }

   test("get metadata") {
-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map("test_col" -> "integer"))
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map("test_col" -> "integer"))

     val metadata = mv.metadata()
     metadata.name shouldBe mv.mvName
@@ -64,7 +64,7 @@
     val indexSettings = """{"number_of_shards": 2}"""
     val indexOptions =
       FlintSparkIndexOptions(Map("auto_refresh" -> "true", "index_settings" -> indexSettings))
-    val mv = FlintSparkMaterializedView(
+    val mv = FlintSparkMaterializedView(None,
       testMvName,
       testQuery,
       Map("test_col" -> "integer"),
@@ -77,12 +77,12 @@
   }

   test("build batch data frame") {
-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty)
     mv.build(spark, None).collect() shouldBe Array(Row(1))
   }

   test("should fail if build given other source data frame") {
-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty)
     the[IllegalArgumentException] thrownBy mv.build(spark, Some(mock[DataFrame]))
   }

@@ -100,7 +100,7 @@
         | GROUP BY TUMBLE(time, '1 Minute')
         |""".stripMargin

-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty)
     val actualPlan = mv.buildStream(spark).queryExecution.logical
     assert(
       actualPlan.sameSemantics(
@@ -127,7 +127,7 @@
         | GROUP BY TUMBLE(time, '1 Minute')
         |""".stripMargin

-    val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
+    val mv = FlintSparkMaterializedView(None, testMvName, testQuery, Map.empty)
     val actualPlan = mv.buildStream(spark).queryExecution.logical
     assert(
       actualPlan.sameSemantics(
@@ -145,7 +145,7 @@
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

-      val mv = FlintSparkMaterializedView(
+      val mv = FlintSparkMaterializedView(None,
         testMvName,
         s"SELECT name, age FROM $testTable WHERE age > 30",
         Map.empty)
@@ -164,7 +164,7 @@
     withTable(testTable) {
       sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

-      val mv = FlintSparkMaterializedView(
+      val mv = FlintSparkMaterializedView(None,
         testMvName,
         s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name",
         Map.empty)
@@ -345,7 +345,7 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
   }

   private def schemaShouldMatch(metadata: FlintMetadata, expected: String): Unit = {
-    val actual = parse(metadata.getContent) \ "properties"
+    val actual = parse(metadata.getContent()) \ "properties"
     assert(actual == parse(expected))
   }
 }
@@ -46,7 +46,7 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
        |""".stripMargin

     val metadata = mock[FlintMetadata]
-    when(metadata.getContent).thenReturn(content)
+    when(metadata.getContent()).thenReturn(content)
     when(metadata.indexSettings).thenReturn(None)
     flintClient.createIndex(indexName, metadata)

@@ -58,7 +58,7 @@
     val indexName = "flint_test_with_settings"
     val indexSettings = "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"
     val metadata = mock[FlintMetadata]
-    when(metadata.getContent).thenReturn("{}")
+    when(metadata.getContent()).thenReturn("{}")
     when(metadata.indexSettings).thenReturn(Some(indexSettings))

     flintClient.createIndex(indexName, metadata)
@@ -73,14 +73,14 @@

   it should "get all index metadata with the given index name pattern" in {
     val metadata = mock[FlintMetadata]
-    when(metadata.getContent).thenReturn("{}")
+    when(metadata.getContent()).thenReturn("{}")
     when(metadata.indexSettings).thenReturn(None)
     flintClient.createIndex("flint_test_1_index", metadata)
     flintClient.createIndex("flint_test_2_index", metadata)

     val allMetadata = flintClient.getAllIndexMetadata("flint_*_index")
     allMetadata should have size 2
-    allMetadata.forEach(metadata => metadata.getContent should not be empty)
+    allMetadata.forEach(metadata => metadata.getContent() should not be empty)
     allMetadata.forEach(metadata => metadata.indexSettings should not be empty)
   }

@@ -44,7 +44,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {

     val index = flint.describeIndex(testFlintIndex)
     index shouldBe defined
-    index.get.metadata().getContent should matchJson(s"""{
+    index.get.metadata().getContent() should matchJson(s"""{
        |  "_meta": {
        |    "version": "${current()}",
        |    "name": "name_and_age",
@@ -22,6 +22,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
   private val testTable = "spark_catalog.default.mv_test"
   private val testMvName = "spark_catalog.default.mv_test_metrics"
   private val testFlintIndex = getFlintIndexName(testMvName)
+  private val testTargetIndex = "existing_index"
   private val testQuery =
     s"""
        | SELECT
@@ -53,7 +54,7 @@

     val index = flint.describeIndex(testFlintIndex)
     index shouldBe defined
-    index.get.metadata().getContent should matchJson(s"""
+    index.get.metadata().getContent() should matchJson(s"""
       | {
       |   "_meta": {
       |     "version": "${current()}",
@@ -87,25 +88,26 @@
        |""".stripMargin)
   }

-  test("create materialized view using existing OpebnSearch index successfully") {
+  test("create materialized view using existing OpenSearch index successfully") {
     val indexOptions =
       FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
     flint
       .materializedView()
-      .targetName("existing_index")
+      .targetName(testTargetIndex)
       .name(testMvName)
       .query(testQuery)
       .options(indexOptions)
       .create()

     val index = flint.describeIndex("existing_index")
     index shouldBe defined
-    index.get.metadata().getContent should matchJson(s"""
+    index.get.metadata().getContent() should matchJson(s"""
       | {
       |   "_meta": {
       |     "version": "${current()}",
       |     "name": "spark_catalog.default.mv_test_metrics",
       |     "kind": "mv",
+      |     "targetName": "$testTargetIndex",
       |     "source": "$testQuery",
       |     "indexedColumns": [
       |       {