Skip to content

Commit

Permalink
Merge branch 'main' into remove-refresh-mode
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jan 18, 2024
2 parents 2d7b174 + 6e163e7 commit 59779b2
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 13 deletions.
1 change: 0 additions & 1 deletion .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,3 @@ jobs:
with:
github_token: ${{ steps.github_app_token.outputs.token }}
head_template: backport/backport-<%= number %>-to-<%= base %>
files_to_skip: 'CHANGELOG.md'
1 change: 1 addition & 0 deletions .github/workflows/snapshot-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- 0.*

jobs:
build-and-publish-snapshots:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient {
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());

/**
 * Characters that are not valid in an index name and are therefore percent-encoded.
 * The wildcard '*' is intentionally absent from this set because it's reserved for
 * pattern matching.
 */
private static final Set<Character> INVALID_INDEX_NAME_CHARS =
    Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

/**
* Metadata log index name prefix
*/
Expand Down Expand Up @@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {

protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
Expand All @@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
@Override
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
Expand All @@ -148,7 +156,7 @@ public boolean exists(String indexName) {
@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = toLowercase(indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
Expand All @@ -166,7 +174,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
@Override
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
Expand All @@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

Expand Down Expand Up @@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
toLowercase(indexName),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
} catch (IOException e) {
Expand All @@ -221,7 +229,7 @@ public FlintReader createReader(String indexName, String query) {

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
}

@Override
Expand Down Expand Up @@ -287,4 +295,31 @@ private String toLowercase(String indexName) {

return indexName.toLowerCase(Locale.ROOT);
}

/*
 * Replace every character listed in INVALID_INDEX_NAME_CHARS with '%' followed by its
 * character code as at least two uppercase hex digits (e.g. ' ' becomes "%20").
 * All other characters are copied through unchanged.
 */
private String percentEncode(String indexName) {
  Objects.requireNonNull(indexName);

  final int length = indexName.length();
  final StringBuilder encoded = new StringBuilder(length);
  for (int i = 0; i < length; i++) {
    final char current = indexName.charAt(i);
    if (!INVALID_INDEX_NAME_CHARS.contains(current)) {
      // Valid character: keep as-is
      encoded.append(current);
      continue;
    }
    encoded.append(String.format("%%%02X", (int) current));
  }
  return encoded.toString();
}

/*
 * Turn a Flint index name into a name that complies with OpenSearch index name
 * restrictions: invalid characters are percent-encoded first, then the whole result
 * is lower-cased (which also lower-cases the hex digits of the encoded characters).
 */
private String sanitizeIndexName(String indexName) {
  return toLowercase(percentEncode(Objects.requireNonNull(indexName)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
Expand Down Expand Up @@ -373,7 +373,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(tableName)
.table(quotedTableName(tableName))
.writeStream
.queryName(indexName)
.addSinkOptions(options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ object FlintSparkIndex {
s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
}

/**
 * Wrap the table part of a fully qualified table name in backticks so that special
 * characters in it are escaped. The first two dot-separated parts are kept unquoted and
 * everything after them (re-joined with dots) is treated as the table part.
 *
 * @param fullTableName
 *   source full table name, made of at least three dot-separated parts
 * @return
 *   table name with its last part quoted
 */
def quotedTableName(fullTableName: String): String = {
  val nameParts = fullTableName.split('.')
  require(nameParts.length >= 3, s"Table name $fullTableName is not qualified")

  val (qualifier, tableParts) = nameParts.splitAt(2)
  s"${qualifier.mkString(".")}.`${tableParts.mkString(".")}`"
}

/**
* Populate environment variables to persist in Flint metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

Expand Down Expand Up @@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex(

override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
val colNames = indexedColumns.keys.toSeq
val job = df.getOrElse(spark.read.table(tableName))
val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))

// Add optional filtering condition
filterCondition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ case class FlintSparkSkippingIndex(
new Column(aggFunc.as(name))
}

df.getOrElse(spark.read.table(tableName))
df.getOrElse(spark.read.table(quotedTableName(tableName)))
.groupBy(input_file_name().as(FILE_PATH_COLUMN))
.agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
.withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.flint.spark.covering

import org.scalatest.matchers.must.Matchers.contain
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
Expand All @@ -30,6 +31,24 @@ class FlintSparkCoveringIndexSuite extends FlintSuite {
}
}

test("can build index building job with unique ID column") {
  // The source DataFrame is passed in explicitly, so the build works on this data
  val sourceDf = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
  val coveringIndex =
    new FlintSparkCoveringIndex("ci", "spark_catalog.default.test", Map("name" -> "string"))

  val builtDf = coveringIndex.build(spark, Some(sourceDf))

  // Only the indexed column is expected in the output schema
  builtDf.schema.fieldNames should contain only ("name")
}

test("can build index on table name with special characters") {
  // Table part contains '/' characters
  val specialTableName = "spark_catalog.default.test/2023/10"
  val sourceDf = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
  val coveringIndex =
    new FlintSparkCoveringIndex("ci", specialTableName, Map("name" -> "string"))

  val builtDf = coveringIndex.build(spark, Some(sourceDf))

  // Only the indexed column is expected in the output schema
  builtDf.schema.fieldNames should contain only ("name")
}

test("should fail if no indexed column given") {
assertThrows[IllegalArgumentException] {
new FlintSparkCoveringIndex("ci", "default.test", Map.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ class FlintSparkSkippingIndexSuite extends FlintSuite {
indexDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
}

test("can build index on table name with special characters") {
  // Table part contains '/' characters
  val specialTableName = "spark_catalog.default.test/2023/10"

  // Stub a skipping strategy that outputs a single "name" column collected as a set
  val strategy = mock[FlintSparkSkippingStrategy]
  when(strategy.outputSchema()).thenReturn(Map("name" -> "string"))
  when(strategy.getAggregators).thenReturn(
    Seq(CollectSet(col("name").expr).toAggregateExpression()))
  val skippingIndex = new FlintSparkSkippingIndex(specialTableName, Seq(strategy))

  val sourceDf = spark.createDataFrame(Seq(("hello", 20))).toDF("name", "age")
  val builtDf = skippingIndex.build(spark, Some(sourceDf))

  builtDf.schema.fieldNames should contain only ("name", FILE_PATH_COLUMN, ID_COLUMN)
}

// Test index build for different column type
Seq(
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,30 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
flintClient.exists(indexName) shouldBe false
}

// End-to-end check that an index whose name contains the characters the client
// percent-encodes (space , : " + / \ | ? # > <) can be created, looked up,
// written to, read from and deleted, always using the raw (unencoded) name.
it should "percent-encode invalid index name characters" in {
val indexName = "test ,:\"+/\\|?#><"
flintClient.createIndex(
indexName,
FlintMetadata("""{"properties": {"test": { "type": "integer" } } }"""))

flintClient.exists(indexName) shouldBe true
flintClient.getIndexMetadata(indexName) should not be null
// Pattern lookup: relies on '*' still acting as a wildcard after sanitization
flintClient.getAllIndexMetadata("test *") should not be empty

// Read write test: write two newline-terminated JSON lines, then verify the reader
// returns at least one result.
// NOTE(review): the two lines look like a bulk "create" action followed by its
// document, and the empty query presumably matches everything — confirm against the
// writer/reader implementations.
val writer = flintClient.createWriter(indexName)
writer.write("""{"create":{}}""")
writer.write("\n")
writer.write("""{"test":1}""")
writer.write("\n")
writer.flush()
writer.close()
flintClient.createReader(indexName, "").hasNext shouldBe true

// Clean up and confirm the index is gone
flintClient.deleteIndex(indexName)
flintClient.exists(indexName) shouldBe false
}

it should "return false if index not exist" in {
flintClient.exists("non-exist-index") shouldBe false
}
Expand Down

0 comments on commit 59779b2

Please sign in to comment.