Merge branch 'main' into ppl-spark-translation

YANG-DB committed Sep 26, 2023
2 parents 119fd5e + eadb614 commit 34de246
Showing 9 changed files with 146 additions and 11 deletions.
@@ -5,12 +5,11 @@

package org.opensearch.flint.core;

+import java.util.List;
import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;

-import java.io.Writer;
-
/**
 * Flint index client that provides API for metadata and data operations
 * on a Flint index regardless of concrete storage.
@@ -33,6 +32,14 @@ public interface FlintClient {
   */
  boolean exists(String indexName);

+  /**
+   * Retrieve metadata for all Flint indexes whose names match the given pattern.
+   *
+   * @param indexNamePattern index name pattern
+   * @return all matched index metadata
+   */
+  List<FlintMetadata> getAllIndexMetadata(String indexNamePattern);
+
  /**
   * Retrieve metadata in a Flint index.
   *
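The new method is the bulk counterpart of getIndexMetadata. A minimal usage sketch in Scala, assuming an already-constructed FlintClient implementation (the helper name and the pattern are illustrative, not part of this commit):

import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintClient

// Fetch metadata for every Flint index matching an OpenSearch-style
// wildcard pattern and print each raw metadata document.
def printAllFlintIndexes(client: FlintClient): Unit =
  client.getAllIndexMetadata("flint_*").asScala
    .foreach(metadata => println(metadata.getContent))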
@@ -5,9 +5,17 @@

package org.opensearch.flint.core.storage;

+import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
+
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
@@ -34,13 +42,6 @@
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;

-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
-
/**
 * Flint client implementation for OpenSearch storage.
 */
@@ -79,6 +80,19 @@ public FlintOpenSearchClient(FlintOptions options) {
    }
  }

+  @Override public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
+    try (RestHighLevelClient client = createClient()) {
+      GetMappingsRequest request = new GetMappingsRequest().indices(indexNamePattern);
+      GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
+
+      return response.mappings().values().stream()
+          .map(mapping -> new FlintMetadata(mapping.source().string()))
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to get Flint index metadata for " + indexNamePattern, e);
+    }
+  }
+
  @Override public FlintMetadata getIndexMetadata(String indexName) {
    try (RestHighLevelClient client = createClient()) {
      GetMappingsRequest request = new GetMappingsRequest().indices(indexName);
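At the REST layer this wraps the standard get-mappings API: the wildcard in the index name is expanded server-side by OpenSearch, and one FlintMetadata is built from each returned mapping definition. Schematically, against a hypothetical local cluster, the call above amounts to:

GET http://localhost:9200/flint_*_index/_mapping

followed by wrapping each index's mapping source string in a FlintMetadata, as the stream pipeline above shows.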
10 changes: 10 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -47,6 +47,8 @@ dropSkippingIndexStatement
coveringIndexStatement
    : createCoveringIndexStatement
    | refreshCoveringIndexStatement
+   | showCoveringIndexStatement
+   | describeCoveringIndexStatement
    | dropCoveringIndexStatement
    ;

@@ -60,6 +62,14 @@
    : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
    ;

+showCoveringIndexStatement
+    : SHOW (INDEX | INDEXES) ON tableName=multipartIdentifier
+    ;
+
+describeCoveringIndexStatement
+    : (DESC | DESCRIBE) INDEX indexName=identifier ON tableName=multipartIdentifier
+    ;
+
dropCoveringIndexStatement
    : DROP INDEX indexName=identifier ON tableName=multipartIdentifier
    ;
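Together with the SHOW and INDEXES tokens added to the lexer below, these rules accept statements such as the following (table and index names are illustrative):

// Statements matched by the new grammar rules, issued through Spark SQL.
spark.sql("SHOW INDEX ON alb_logs") // SHOW INDEXES is also accepted
spark.sql("DESC INDEX elb_and_requestUri ON alb_logs") // or DESCRIBE INDEX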
2 changes: 2 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -160,9 +160,11 @@ DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
FALSE: 'FALSE';
INDEX: 'INDEX';
+INDEXES: 'INDEXES';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
+SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';
@@ -136,6 +136,18 @@ class FlintSpark(val spark: SparkSession) {
    }
  }

+  /**
+   * Describe all Flint indexes whose names match the given pattern.
+   *
+   * @param indexNamePattern
+   *   index name pattern which may contain wildcards
+   * @return
+   *   Flint index list
+   */
+  def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
+    flintClient.getAllIndexMetadata(indexNamePattern).asScala.map(deserialize)
+  }
+
  /**
   * Describe a Flint index.
   *
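A sketch of how describeIndexes composes with the covering-index naming convention used by the SQL layer: getFlintIndexName("*", tableName) yields a wildcard pattern that matches every covering index on the table (the table name here is illustrative):

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex

// List the names of all covering indexes on a source table, filtering out
// any other Flint index types that happen to match the pattern.
val pattern = FlintSparkCoveringIndex.getFlintIndexName("*", "default.alb_logs")
flint
  .describeIndexes(pattern)
  .collect { case index: FlintSparkCoveringIndex => index.indexName }
  .foreach(println)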
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
 * @param indexedColumns
 *   indexed column list
 */
-class FlintSparkCoveringIndex(
+case class FlintSparkCoveringIndex(
    indexName: String,
    tableName: String,
    indexedColumns: Map[String, String])
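Promoting FlintSparkCoveringIndex to a case class is a one-word change, but one practical effect is that the index gains structural equality and a generated copy method. A quick illustrative sketch of those semantics (values invented, assuming the constructor accepts them as-is):

// Case-class equality compares fields, not references.
val index = FlintSparkCoveringIndex("ci_test", "default.alb_logs", Map("name" -> "string"))
val renamed = index.copy(indexName = "ci_test2")
assert(index != renamed)
assert(index == renamed.copy(indexName = "ci_test"))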
@@ -11,9 +11,12 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
-import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}
+import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
+import org.apache.spark.sql.types.StringType

/**
 * Flint Spark AST builder that builds Spark command for Flint covering index statement.
@@ -55,6 +58,41 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
    }
  }

+  override def visitShowCoveringIndexStatement(
+      ctx: ShowCoveringIndexStatementContext): Command = {
+    val outputSchema = Seq(AttributeReference("index_name", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val fullTableName = getFullTableName(flint, ctx.tableName)
+      val indexNamePattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName)
+      flint
+        .describeIndexes(indexNamePattern)
+        .collect { case index: FlintSparkCoveringIndex =>
+          Row(index.indexName)
+        }
+    }
+  }
+
+  override def visitDescribeCoveringIndexStatement(
+      ctx: DescribeCoveringIndexStatementContext): Command = {
+    val outputSchema = Seq(
+      AttributeReference("indexed_col_name", StringType, nullable = false)(),
+      AttributeReference("data_type", StringType, nullable = false)(),
+      AttributeReference("index_type", StringType, nullable = false)())
+
+    FlintSparkSqlCommand(outputSchema) { flint =>
+      val indexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
+      flint
+        .describeIndex(indexName)
+        .map { case index: FlintSparkCoveringIndex =>
+          index.indexedColumns.map { case (colName, colType) =>
+            Row(colName, colType, "indexed")
+          }.toSeq
+        }
+        .getOrElse(Seq.empty)
+    }
+  }
+
  override def visitDropCoveringIndexStatement(
      ctx: DropCoveringIndexStatementContext): Command = {
    FlintSparkSqlCommand() { flint =>
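Both new visitors share one shape: declare the output schema, then pass FlintSparkSqlCommand a body that receives the FlintSpark entry point and returns Rows. A schematic sketch of that shape for a hypothetical statement (the COUNT statement and its context class are invented for illustration, not part of this commit):

// Hypothetical visitor demonstrating the FlintSparkSqlCommand pattern.
override def visitCountCoveringIndexStatement(
    ctx: CountCoveringIndexStatementContext): Command = {
  val outputSchema = Seq(AttributeReference("index_count", StringType, nullable = false)())

  FlintSparkSqlCommand(outputSchema) { flint =>
    val fullTableName = getFullTableName(flint, ctx.tableName)
    val pattern = FlintSparkCoveringIndex.getFlintIndexName("*", fullTableName)
    Seq(Row(flint.describeIndexes(pattern).size.toString))
  }
}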
@@ -43,6 +43,13 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
    flintClient.getIndexMetadata(indexName).getContent should matchJson(content)
  }

+  it should "get all index metadata with the given index name pattern" in {
+    flintClient.createIndex("flint_test_1_index", new FlintMetadata("{}"))
+    flintClient.createIndex("flint_test_2_index", new FlintMetadata("{}"))
+
+    flintClient.getAllIndexMetadata("flint_*_index") should have size 2
+  }
+
  it should "return false if index does not exist" in {
    flintClient.exists("non-exist-index") shouldBe false
  }
@@ -8,9 +8,12 @@ package org.opensearch.flint.spark
import scala.Option.empty

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

+import org.apache.spark.sql.Row
+
class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

  /** Test table and index name */
@@ -64,6 +67,48 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
    indexData.count() shouldBe 2
  }

+  test("show all covering indexes on the source table") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    // Create another covering index
+    flint
+      .coveringIndex()
+      .name("idx_address")
+      .onTable(testTable)
+      .addIndexColumns("address")
+      .create()
+
+    // Create a skipping index which is expected to be filtered
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    val result = sql(s"SHOW INDEX ON $testTable")
+    checkAnswer(result, Seq(Row(testIndex), Row("idx_address")))
+
+    flint.deleteIndex(getFlintIndexName("idx_address", testTable))
+    flint.deleteIndex(getSkippingIndexName(testTable))
+  }
+
+  test("describe covering index") {
+    flint
+      .coveringIndex()
+      .name(testIndex)
+      .onTable(testTable)
+      .addIndexColumns("name", "age")
+      .create()
+
+    val result = sql(s"DESC INDEX $testIndex ON $testTable")
+    checkAnswer(result, Seq(Row("name", "string", "indexed"), Row("age", "int", "indexed")))
+  }
+
  test("drop covering index") {
    flint
      .coveringIndex()
