Merge branch 'main' into configure-value-set-limit-in-ddl
dai-chen committed Jan 19, 2024
2 parents 171a844 + 6e163e7 commit ea2fa4b
Showing 16 changed files with 289 additions and 30 deletions.
1 change: 0 additions & 1 deletion .github/workflows/backport.yml
@@ -26,4 +26,3 @@ jobs:
with:
github_token: ${{ steps.github_app_token.outputs.token }}
head_template: backport/backport-<%= number %>-to-<%= base %>
files_to_skip: 'CHANGELOG.md'
1 change: 1 addition & 0 deletions .github/workflows/snapshot-publish.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- main
- 0.*

jobs:
build-and-publish-snapshots:
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient {
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());

/**
* Invalid index name characters to percent-encode,
* excluding '*' because it's reserved for pattern matching.
*/
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

/**
* Metadata log index name prefix
*/
@@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {

protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
@@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option<String> settings) {
@Override
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
@@ -148,7 +156,7 @@ public boolean exists(String indexName) {
@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = toLowercase(indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -166,7 +174,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
@Override
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

@@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
toLowercase(indexName),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
} catch (IOException e) {
@@ -221,7 +229,7 @@

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
}

@Override
@@ -287,4 +295,31 @@ private String toLowercase(String indexName) {

return indexName.toLowerCase(Locale.ROOT);
}

/*
* Percent-encode invalid OpenSearch index name characters.
*/
private String percentEncode(String indexName) {
Objects.requireNonNull(indexName);

StringBuilder builder = new StringBuilder(indexName.length());
for (char ch : indexName.toCharArray()) {
if (INVALID_INDEX_NAME_CHARS.contains(ch)) {
builder.append(String.format("%%%02X", (int) ch));
} else {
builder.append(ch);
}
}
return builder.toString();
}

/*
* Sanitize index name to comply with OpenSearch index name restrictions.
*/
private String sanitizeIndexName(String indexName) {
Objects.requireNonNull(indexName);

String encoded = percentEncode(indexName);
return toLowercase(encoded);
}
}
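For reference, below is a minimal standalone sketch of the sanitization behavior added above (a Scala re-implementation for illustration only, not the FlintOpenSearchClient API): each invalid character is percent-encoded via "%%%02X" formatting and the whole result is then lowercased, while '*' passes through untouched so index patterns keep working.

object IndexNameSanitizerSketch {
  // Same character set as INVALID_INDEX_NAME_CHARS above; '*' is intentionally excluded.
  private val invalidChars = Set(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<')

  def sanitize(indexName: String): String =
    indexName.map { ch =>
      if (invalidChars.contains(ch)) "%%%02X".format(ch.toInt) else ch.toString
    }.mkString.toLowerCase(java.util.Locale.ROOT)

  def main(args: Array[String]): Unit = {
    // '/' becomes "%2F" and then "%2f" after lowercasing:
    // flint_spark_catalog_default_test%2f2023%2f10_skipping_index
    println(sanitize("flint_spark_catalog_default_test/2023/10_skipping_index"))
  }
}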
@@ -5,8 +5,6 @@

package org.apache.spark.sql.flint.datatype

import java.time.format.DateTimeFormatterBuilder

import org.json4s.{Formats, JField, JValue, NoTypeHints}
import org.json4s.JsonAST.{JNothing, JObject, JString}
import org.json4s.jackson.JsonMethods
@@ -78,8 +76,11 @@ object FlintDataType {
// object types
case JString("object") | JNothing => deserializeJValue(fieldProperties)

// binary types
case JString("binary") => BinaryType

// not supported
case _ => throw new IllegalStateException(s"unsupported data type")
case unknown => throw new IllegalStateException(s"unsupported data type: $unknown")
}
DataTypes.createStructField(fieldName, dataType, true, metadataBuilder.build())
}
@@ -112,13 +113,16 @@ object FlintDataType {
JsonMethods.compact(JsonMethods.render(jValue))
}

def serializeJValue(structType: StructType): JValue = {
JObject("properties" -> JObject(structType.fields.map(field => serializeField(field)).toList))
private def serializeJValue(structType: StructType): JValue = {
JObject(
"properties" -> JObject(
structType.fields
.map(field => JField(field.name, serializeField(field.dataType, field.metadata)))
.toList))
}

def serializeField(structField: StructField): JField = {
val metadata = structField.metadata
val dataType = structField.dataType match {
def serializeField(dataType: DataType, metadata: Metadata): JValue = {
dataType match {
// boolean
case BooleanType => JObject("type" -> JString("boolean"))

@@ -147,8 +151,14 @@

// objects
case st: StructType => serializeJValue(st)
case _ => throw new IllegalStateException(s"unsupported data type")

// array
case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)

// binary
case BinaryType => JObject("type" -> JString("binary"))

case unknown => throw new IllegalStateException(s"unsupported data type: ${unknown.sql}")
}
JField(structField.name, dataType)
}
}
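One consequence of the array rule above, sketched below, is that array-ness is not round-tripped: serializing a Spark ArrayType emits only the element type's mapping (OpenSearch fields are implicitly multi-valued), so deserializing that mapping back yields the plain element type. This sketch assumes deserialize is the public counterpart of serialize in this file and that the Flint jars are on the classpath (e.g. in spark-shell); it is an illustration, not a test from this PR.

import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types._

// Serialize a struct containing an array field.
val schema = StructType(Seq(StructField("counts", ArrayType(IntegerType), nullable = true)))

val mapping = FlintDataType.serialize(schema)
// {"properties":{"counts":{"type":"integer"}}}   -- only the element type is recorded

val restored = FlintDataType.deserialize(mapping)
// StructType(StructField("counts", IntegerType, true))   -- the array wrapper is gone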
@@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -372,7 +372,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(tableName)
.table(quotedTableName(tableName))
.writeStream
.queryName(indexName)
.addSinkOptions(options)
@@ -95,6 +95,21 @@ object FlintSparkIndex {
s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
}

/**
* Add backticks to table name to escape special characters
*
* @param fullTableName
* source full table name
* @return
* quoted table name
*/
def quotedTableName(fullTableName: String): String = {
require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")

val parts = fullTableName.split('.')
s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
}

/**
* Populate environment variables to persist in Flint metadata.
*
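A quick usage illustration of the quotedTableName helper added above (hypothetical inputs; the sketch re-implements the same logic so it runs standalone): only the table portion is backquoted, so catalog and database resolution are unaffected while characters such as '/' or extra dots in the table name are preserved.

object QuotedTableNameSketch {
  // Mirrors quotedTableName above: keep catalog and database as-is, backquote the rest.
  def quote(fullTableName: String): String = {
    val parts = fullTableName.split('.')
    require(parts.length >= 3, s"Table name $fullTableName is not qualified")
    s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
  }

  def main(args: Array[String]): Unit = {
    println(quote("spark_catalog.default.test/2023/10")) // spark_catalog.default.`test/2023/10`
    println(quote("spark_catalog.default.startTime.15")) // spark_catalog.default.`startTime.15`
  }
}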
@@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

@@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex(

override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
val colNames = indexedColumns.keys.toSeq
val job = df.getOrElse(spark.read.table(tableName))
val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))

// Add optional filtering condition
filterCondition
@@ -198,7 +198,7 @@ object FlintSparkMaterializedView {
.sql(query)
.schema
.map { field =>
field.name -> field.dataType.typeName
field.name -> field.dataType.simpleString
}
.toMap
FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions)
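The switch above from typeName to simpleString presumably matters for parameterized types: typeName drops the type arguments while simpleString keeps them, so the materialized view's output schema records complete types. For illustration, using the standard Spark DataType API (e.g. in spark-shell):

import org.apache.spark.sql.types._

ArrayType(IntegerType).typeName             // "array"
ArrayType(IntegerType).simpleString         // "array<int>"
MapType(StringType, LongType).typeName      // "map"
MapType(StringType, LongType).simpleString  // "map<string,bigint>"
IntegerType.typeName                        // "integer"
IntegerType.simpleString                    // "int"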
@@ -78,7 +78,7 @@ case class FlintSparkSkippingIndex(
new Column(aggFunc.as(name))
}

df.getOrElse(spark.read.table(tableName))
df.getOrElse(spark.read.table(quotedTableName(tableName)))
.groupBy(input_file_name().as(FILE_PATH_COLUMN))
.agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
.withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
@@ -42,6 +42,9 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
| },
| "textField": {
| "type": "text"
| },
| "binaryField": {
| "type": "binary"
| }
| }
|}""".stripMargin
@@ -59,6 +62,7 @@
StringType,
true,
new MetadataBuilder().putString("osType", "text").build()) ::
StructField("binaryField", BinaryType, true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
@@ -192,6 +196,40 @@
FlintDataType.serialize(sparkStructType) shouldBe compactJson(flintDataType)
}

test("spark array type map to should map to array element type in OpenSearch") {
val flintDataType = """{
| "properties": {
| "varcharField": {
| "type": "keyword"
| },
| "charField": {
| "type": "keyword"
| }
| }
|}""".stripMargin
val sparkStructType =
StructType(
StructField("arrayIntField", ArrayType(IntegerType), true) ::
StructField(
"arrayObjectField",
StructType(StructField("booleanField", BooleanType, true) :: Nil),
true) :: Nil)
FlintDataType.serialize(sparkStructType) shouldBe compactJson(s"""{
| "properties": {
| "arrayIntField": {
| "type": "integer"
| },
| "arrayObjectField": {
| "properties": {
| "booleanField":{
| "type": "boolean"
| }
| }
| }
| }
|}""".stripMargin)
}

def compactJson(json: String): String = {
val data: JValue = JsonMethods.parse(json)
JsonMethods.compact(JsonMethods.render(data))
@@ -5,6 +5,7 @@

package org.opensearch.flint.spark.covering

import org.scalatest.matchers.must.Matchers.contain
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
@@ -30,6 +31,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
}
}

test("can build index building job with unique ID column") {
val index =
new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))

val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
val indexDf = index.build(spark, Some(df))
indexDf.schema.fieldNames should contain only ("name")
}

test("can build index on table name with special characters") {
val testTableSpecial = "spark_catalog.default.test/2023/10"
val index = new FlintSparkCoveringIndex("ci", testTableSpecial, Map("name" -> "string"))

val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
val indexDf = index.build(spark, Some(df))
indexDf.schema.fieldNames should contain only ("name")
}

test("should fail if no indexed column given") {
assertThrows[IllegalArgumentException] {
new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
@@ -71,6 +71,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
}

test("can build index on table name with special characters") {
val testTableSpecial = "spark_catalog.default.test/2023/10"
val indexCol = mock[FlintSparkSkippingStrategy]
when(indexCol.outputSchema()).thenReturn(Map("name" -> "string"))
when(indexCol.getAggregators).thenReturn(
Seq(CollectSet(col("name").expr).toAggregateExpression()))
val index = new FlintSparkSkippingIndex(testTableSpecial, Seq(indexCol))

val df = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
val indexDf = index.build(spark, Some(df))
indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
}

// Test index build for different column type
Seq(
(
