Skip to content

Commit

Permalink
update sql USING query test
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Oct 19, 2023
1 parent 42530a8 commit bd9a33b
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand All @@ -39,6 +41,7 @@
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -85,19 +88,18 @@ public void alias(String indexName, String aliasName, FlintMetadata metadata) {
if (!exists) {
// create index including the alias name which is the flint convention name
createIndex(osIndexName, metadata);
} else {
// Adding the alias to the existing index
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
.index(osIndexName)
.alias(osAliasName);
aliasesRequest.addAliasAction(aliasAction);
// Executing the updateAliases request
AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
if (!response.isAcknowledged()) {
throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
}
}
// Adding the alias to the existing index
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
IndicesAliasesRequest.AliasActions aliasAction =
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD)
.index(osIndexName)
.alias(osAliasName);
aliasesRequest.addAliasAction(aliasAction);
// Executing the updateAliases request
AcknowledgedResponse response = client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
if (!response.isAcknowledged()) {
throw new IllegalStateException(String.format("Failed to acknowledge Alias %s for index %s", aliasName, indexName));
}
} catch (
Exception e) {
Expand All @@ -110,17 +112,16 @@ public void createIndex(String indexName, FlintMetadata metadata) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
boolean includeMappingProperties = true;
if(metadata.targetName().nonEmpty()) {
request.alias(new Alias(toLowercase(metadata.name())));
includeMappingProperties = false;
}
boolean includeMappingProperties = !metadata.targetName().nonEmpty();
request.mapping(metadata.getContent(includeMappingProperties), XContentType.JSON);
Option<String> settings = metadata.indexSettings();
if (settings.isDefined()) {
request.settings(settings.get(), XContentType.JSON);
}
client.indices().create(request, RequestOptions.DEFAULT);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
if (!response.isAcknowledged()) {
throw new IllegalStateException(String.format("Failed to acknowledge create index %s", indexName));
}
} catch (Exception e) {
throw new IllegalStateException("Failed to create Flint index " + osIndexName, e);
}
Expand Down Expand Up @@ -159,10 +160,24 @@ public FlintMetadata getIndexMetadata(String indexName) {
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
if (response.getMappings().containsKey(osIndexName)) {
MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return FlintMetadata.apply(mapping.source().string(), settings.toString());
}
if (!response.getAliases().isEmpty()) {
Optional<String> aliasAncestor = response.getAliases().entrySet().stream()
.filter(entry -> entry.getValue().stream().anyMatch(alias -> alias.alias().equals(indexName)))
.map(Map.Entry::getKey)
.findFirst();

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return FlintMetadata.apply(mapping.source().string(), settings.toString());
if(aliasAncestor.isPresent()) {
MappingMetadata mapping = response.getMappings().get(aliasAncestor.get());
Settings settings = response.getSettings().get(aliasAncestor.get());
return FlintMetadata.apply(mapping.source().string(), settings.toString());
}
}
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName);
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
Expand All @@ -173,8 +188,10 @@ public void deleteIndex(String indexName) {
String osIndexName = toLowercase(indexName);
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

client.indices().delete(request, RequestOptions.DEFAULT);
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
if (!response.isAcknowledged()) {
throw new IllegalStateException(String.format("Failed to acknowledge delete index %s", indexName));
}
} catch (Exception e) {
throw new IllegalStateException("Failed to delete Flint index " + osIndexName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
/** Test table and index name */
private val testTable = "spark_catalog.default.covering_sql_test"
private val testIndex = "name_and_age"
private val targetIndex = "target_index"
private val testFlintIndex = getFlintIndexName(testIndex, testTable)

override def beforeAll(): Unit = {
super.beforeAll()

createPartitionedTable(testTable)
}

override def afterEach(): Unit = {
super.afterEach()

Expand Down Expand Up @@ -93,7 +94,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
(settings \ "index.number_of_shards").extract[String] shouldBe "2"
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}

test("create covering index with invalid option") {
the[IllegalArgumentException] thrownBy
sql(s"""
Expand Down Expand Up @@ -232,4 +233,27 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

flint.describeIndex(testFlintIndex) shouldBe empty
}

test("use existing index as the covering index") {
sql(
s"""
| CREATE INDEX $testIndex ON $testTable USING $targetIndex ( name )
| WITH (
| index_settings = '{"number_of_shards": 2, "number_of_replicas": 3}'
| )
|""".stripMargin)

// Check if the index setting option is set to OS index setting
val flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava))

implicit val formats: Formats = Serialization.formats(NoTypeHints)
var settings = parse(flintClient.getIndexMetadata(targetIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "2"
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
    // validate that the index alias is working
settings = parse(flintClient.getIndexMetadata(testFlintIndex).indexSettings.get)
(settings \ "index.number_of_shards").extract[String] shouldBe "2"
(settings \ "index.number_of_replicas").extract[String] shouldBe "3"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,54 +88,6 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
|""".stripMargin)
}

test("create materialized view using existing OpenSearch index successfully") {
val indexOptions =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
flint
.materializedView()
.targetName(testTargetIndex)
.name(testMvName)
.query(testQuery)
.options(indexOptions)
.create()

val index = flint.describeIndex("existing_index")
index shouldBe defined
index.get.metadata().getContent() should matchJson(s"""
| {
| "_meta": {
| "version": "${current()}",
| "name": "spark_catalog.default.mv_test_metrics",
| "kind": "mv",
| "targetName": "$testTargetIndex",
| "source": "$testQuery",
| "indexedColumns": [
| {
| "columnName": "startTime",
| "columnType": "timestamp"
| },{
| "columnName": "count",
| "columnType": "long"
| }],
| "options": {
| "auto_refresh": "true",
| "checkpoint_location": "s3://test/"
| },
| "properties": {}
| },
| "properties": {
| "startTime": {
| "type": "date",
| "format": "strict_date_optional_time_nanos"
| },
| "count": {
| "type": "long"
| }
| }
| }
|""".stripMargin)
}

// TODO: fix this windowing function unable to be used in GROUP BY
ignore("full refresh materialized view") {
flint
Expand Down Expand Up @@ -233,6 +185,55 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
}
}

  test("use existing OpenSearch index for materialized view successfully") {
val indexOptions =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
flint
.materializedView()
.targetName(testTargetIndex)
.name(testMvName)
.query(testQuery)
.options(indexOptions)
.create()

val index = flint.describeIndex("existing_index")
index shouldBe defined
index.get.metadata().getContent() should matchJson(
s"""
| {
| "_meta": {
| "version": "${current()}",
| "name": "spark_catalog.default.mv_test_metrics",
| "kind": "mv",
| "targetName": "$testTargetIndex",
| "source": "$testQuery",
| "indexedColumns": [
| {
| "columnName": "startTime",
| "columnType": "timestamp"
| },{
| "columnName": "count",
| "columnType": "long"
| }],
| "options": {
| "auto_refresh": "true",
| "checkpoint_location": "s3://test/"
| },
| "properties": {}
| },
| "properties": {
| "startTime": {
| "type": "date",
| "format": "strict_date_optional_time_nanos"
| },
| "count": {
| "type": "long"
| }
| }
| }
|""".stripMargin)
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)

private def withIncrementalMaterializedView(query: String)(
Expand Down

0 comments on commit bd9a33b

Please sign in to comment.