Implement show flint index statement #276

Merged · 13 commits · Mar 15, 2024
docs/index.md · 20 additions & 0 deletions
@@ -278,6 +278,26 @@ DROP MATERIALIZED VIEW alb_logs_metrics
VACUUM MATERIALIZED VIEW alb_logs_metrics
```

#### All Indexes

- **Show Flint Indexes**: Displays all Flint indexes with their information. It outputs the following columns (see the example and sample output below):
  - flint_index_name: the full OpenSearch index name
  - kind: type of the index (skipping / covering / mv)
  - database: database name of the index
  - table: source table name (skipping and covering index only)
  - index_name: user-defined name (covering index and materialized view only)
  - auto_refresh: whether auto refresh is enabled for the index (true / false)
  - status: current status of the index

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
```

Example:
```sql
SHOW FLINT INDEXES IN spark_catalog.default
```
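
A hypothetical result set, for illustration only (actual index names and states depend on the indexes created; `unavailable` appears when an index has no metadata log entry):
```
flint_index_name                                     kind      database  table     index_name  auto_refresh  status
flint_spark_catalog_default_alb_logs_skipping_index  skipping  default   alb_logs  null        false         active
flint_spark_catalog_default_alb_logs_elb_index       covering  default   alb_logs  elb         true          refreshing
```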

#### Create Index Options

Users can provide the following options in the `WITH` clause of a create statement:
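
For example, a minimal sketch that enables the `auto_refresh` option shown above (hypothetical table and column names):
```sql
CREATE SKIPPING INDEX ON alb_logs
( elb_status_code VALUE_SET )
WITH ( auto_refresh = true )
```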
@@ -6,8 +6,10 @@
package org.opensearch.flint.core;

import java.util.List;
import java.util.Optional;

import org.opensearch.flint.core.metadata.FlintMetadata;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;
import org.opensearch.flint.core.storage.FlintReader;
import org.opensearch.flint.core.storage.FlintWriter;
@@ -71,6 +73,15 @@ <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourc
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Retrieve the latest metadata log entry for a Flint index.
*
* @param indexName index name
* @param dataSourceName TODO: read from elsewhere in the future
* @return the latest index metadata log entry, or empty if none exists
*/
Optional<FlintMetadataLogEntry> getIndexMetadataLatestLogEntry(String indexName, String dataSourceName);
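
A minimal caller sketch in Scala, for illustration (hypothetical index and data source names; `client` is assumed to be an already-built `FlintClient`), showing how the returned `Optional` can be mapped to a status string:
```scala
import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

// Hypothetical usage: an empty Optional means the index has no metadata log entry.
def indexStatus(client: FlintClient): String =
  client
    .getIndexMetadataLatestLogEntry("flint_myglue_default_http_logs_skipping_index", "myglue")
    .map[String]((entry: FlintMetadataLogEntry) => entry.state.toString)
    .orElse("unavailable")
```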

/**
* Delete a Flint index.
*
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
@@ -93,29 +94,11 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit) {
public <T> OptimisticTransaction<T> startTransaction(
String indexName, String dataSourceName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
if (forceInit) {
createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(),
Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS()));
} else {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
LOG.warning(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
return new DefaultOptimisticTransaction<>(dataSourceName,
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName));
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
return new DefaultOptimisticTransaction<>(dataSourceName,
getMetadataLog(indexName, dataSourceName, forceInit));
}

@Override
@@ -189,6 +172,13 @@ public FlintMetadata getIndexMetadata(String indexName) {
}
}

@Override
public Optional<FlintMetadataLogEntry> getIndexMetadataLatestLogEntry(String indexName, String dataSourceName) {
LOG.info("Fetching latest metadata log entry for " + indexName + " and data source " + dataSourceName);
FlintOpenSearchMetadataLog metadataLog = getMetadataLog(indexName, dataSourceName, false);
return metadataLog.getLatest();
}

@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
@@ -287,6 +277,29 @@ public IRestHighLevelClient createClient() {
return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

private FlintOpenSearchMetadataLog getMetadataLog(
String indexName, String dataSourceName, boolean forceInit) {
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
} else {
if (forceInit) {
createIndex(metaLogIndexName, FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_MAPPING(),
Some.apply(FlintMetadataLogEntry.QUERY_EXECUTION_REQUEST_SETTINGS()));
} else {
String errorMsg = "Metadata log index not found " + metaLogIndexName;
LOG.warning(errorMsg);
throw new IllegalStateException(errorMsg);
}
}
return new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName);
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.
@@ -10,6 +10,7 @@ import scala.collection.JavaConverters._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
@@ -202,6 +203,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

/**
* Fetch the latest metadata log entry of the given Flint index.
*
* @param indexName index name
* @return latest metadata log entry if one exists
*/
def fetchIndexMetadataLatestLogEntry(indexName: String): Option[FlintMetadataLogEntry] = {
logInfo(s"Fetching metadata log entry for index $indexName")
// Convert the Java Optional returned by FlintClient into a Scala Option
Option(flintClient.getIndexMetadataLatestLogEntry(indexName, dataSourceName).orElse(null))
}

/**
* Delete index and refreshing job associated.
*
@@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder
@@ -26,6 +27,7 @@ class FlintSparkSqlAstBuilder
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with FlintSparkIndexAstBuilder
with FlintSparkIndexJobAstBuilder
with SparkSqlAstBuilder {

@@ -0,0 +1,84 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql.index

import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.{BooleanType, StringType}

/**
* Flint Spark AST builder that builds Spark command for Flint index management statement.
*/
trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
self: SparkSqlAstBuilder =>

override def visitShowFlintIndexStatement(ctx: ShowFlintIndexStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("flint_index_name", StringType, nullable = false)(),
AttributeReference("kind", StringType, nullable = false)(),
AttributeReference("database", StringType, nullable = false)(),
AttributeReference("table", StringType, nullable = true)(),
AttributeReference("index_name", StringType, nullable = true)(),
AttributeReference("auto_refresh", BooleanType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)())

FlintSparkSqlCommand(outputSchema) { flint =>
val catalogDbName =
ctx.catalogDb.parts
.map(part => part.getText)
.mkString("_")
val indexNamePattern = s"flint_${catalogDbName}_*"
flint
.describeIndexes(indexNamePattern)
.map { index =>
val parts = index match {
case mv: FlintSparkMaterializedView => mv.mvName.split('.')
case covering: FlintSparkCoveringIndex => covering.tableName.split('.')
case skipping: FlintSparkSkippingIndex => skipping.tableName.split('.')
}
val dataSourceName = parts(0)
val databaseName = parts(1)

val tableName = index match {
// MV doesn't belong to a table
case _: FlintSparkMaterializedView => null
// Table name must be qualified when metadata created
case _ => parts.drop(2).mkString(".")
}
val indexName = index match {
case covering: FlintSparkCoveringIndex => covering.indexName
// MV name must be qualified when metadata created
case _: FlintSparkMaterializedView => parts.drop(2).mkString(".")
// Skipping index doesn't have a user defined name
case _: FlintSparkSkippingIndex => null
}

val status = flint.fetchIndexMetadataLatestLogEntry(index.name) match {
case Some(entry) => entry.state.toString
case None => "unavailable"
}

Row(
index.name,
index.kind,
databaseName,
tableName,
indexName,
index.options.autoRefresh(),
status)
}
}
}
}
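
To make the name handling above concrete, a small illustrative sketch with hypothetical values (a covering index on table `spark_catalog.default.alb_logs`):
```scala
// Hypothetical decomposition of a qualified table name, mirroring the logic above.
val parts = "spark_catalog.default.alb_logs".split('.')
val dataSourceName = parts(0) // "spark_catalog"
val databaseName = parts(1) // "default"
val tableName = parts.drop(2).mkString(".") // "alb_logs"
```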
@@ -34,6 +34,9 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
the[IllegalStateException] thrownBy {
flintClient.startTransaction("test", "non-exist-index")
}
the[IllegalStateException] thrownBy {
flintClient.getIndexMetadataLatestLogEntry("test", "non-exist-index")
}
}

it should "create index successfully" in {
@@ -44,6 +44,31 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
.commit(_ => {})
}

test("should get metadata log entry") {
val testCreateTime = 1234567890123L
createLatestLogEntry(
FlintMetadataLogEntry(
id = testLatestId,
seqNo = UNASSIGNED_SEQ_NO,
primaryTerm = UNASSIGNED_PRIMARY_TERM,
createTime = testCreateTime,
state = ACTIVE,
dataSource = testDataSourceName,
error = ""))

val latest =
flintClient.getIndexMetadataLatestLogEntry(testFlintIndex, testDataSourceName).get
latest.id shouldBe testLatestId
latest.createTime shouldBe testCreateTime
latest.dataSource shouldBe testDataSourceName
latest.error shouldBe ""
}

test("should get empty if no metadata log entry for index") {
flintClient
.getIndexMetadataLatestLogEntry(testFlintIndex, testDataSourceName) shouldBe empty
}

test("should preserve original values when transition") {
val testCreateTime = 1234567890123L
createLatestLogEntry(