diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4
index f48c276e44..e44944fcff 100644
--- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4
+++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -31,6 +31,7 @@ createSkippingIndexStatement
     : CREATE SKIPPING INDEX (IF NOT EXISTS)?
         ON tableName
         LEFT_PAREN indexColTypeList RIGHT_PAREN
+        whereClause?
         (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
     ;
 
@@ -58,6 +59,7 @@ createCoveringIndexStatement
     : CREATE INDEX (IF NOT EXISTS)? indexName
         ON tableName
         LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
+        whereClause?
         (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
     ;
 
@@ -115,6 +117,14 @@ materializedViewQuery
     : .+?
     ;
 
+whereClause
+    : WHERE filterCondition
+    ;
+
+filterCondition
+    : .+?
+    ;
+
 indexColTypeList
     : indexColType (COMMA indexColType)*
     ;
diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4
index 533d851ba6..597a1e5856 100644
--- a/spark/src/main/antlr/SparkSqlBase.g4
+++ b/spark/src/main/antlr/SparkSqlBase.g4
@@ -174,6 +174,7 @@ SHOW: 'SHOW';
 TRUE: 'TRUE';
 VIEW: 'VIEW';
 VIEWS: 'VIEWS';
+WHERE: 'WHERE';
 WITH: 'WITH';
 
 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java
index 5a9fe4d31f..fc1513241f 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java
@@ -5,6 +5,9 @@
 
 package org.opensearch.sql.spark.dispatcher.model;
 
+import static org.apache.commons.lang3.StringUtils.strip;
+import static org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails.STRIP_CHARS;
+
 import java.util.Arrays;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -40,4 +43,23 @@ public FullyQualifiedTableName(String fullyQualifiedName) {
       tableName = parts[0];
     }
   }
+
+  /**
+   * Convert qualified name to Flint name concat by underscore.
+   *
+   * @return Flint name
+   */
+  public String toFlintName() {
+    StringBuilder builder = new StringBuilder();
+    if (datasourceName != null) {
+      builder.append(strip(datasourceName, STRIP_CHARS)).append("_");
+    }
+    if (schemaName != null) {
+      builder.append(strip(schemaName, STRIP_CHARS)).append("_");
+    }
+    if (tableName != null) {
+      builder.append(strip(tableName, STRIP_CHARS));
+    }
+    return builder.toString();
+  }
 }
diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
index 5b4326a10e..576b0772d2 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java
@@ -5,6 +5,8 @@
 
 package org.opensearch.sql.spark.dispatcher.model;
 
+import static org.apache.commons.lang3.StringUtils.strip;
+
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
@@ -83,32 +85,19 @@ public String openSearchIndexName() {
     switch (getIndexType()) {
       case COVERING:
         indexName =
-            "flint"
-                + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
-                + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+            "flint_"
+                + fullyQualifiedTableName.toFlintName()
                 + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
-                + "_"
-                + StringUtils.strip(getIndexName(), STRIP_CHARS)
+                + strip(getIndexName(), STRIP_CHARS)
                 + "_"
                 + getIndexType().getSuffix();
         break;
       case SKIPPING:
         indexName =
-            "flint"
-                + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
-                + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
-                + "_"
-                + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
-                + "_"
-                + getIndexType().getSuffix();
+            "flint_" + fullyQualifiedTableName.toFlintName() + "_" + getIndexType().getSuffix();
         break;
       case MATERIALIZED_VIEW:
-        indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase();
+        indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
         break;
     }
     return indexName.toLowerCase();
diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java
index e725ddc21e..6299dee0ca 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java
@@ -26,4 +26,81 @@ public void skippingIndexName() {
             .build()
             .openSearchIndexName());
   }
+
+  @Test
+  public void coveringIndexName() {
+    assertEquals(
+        "flint_mys3_default_http_logs_idx_status_index",
+        IndexQueryDetails.builder()
+            .indexName("idx_status")
+            .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs"))
+            .indexType(FlintIndexType.COVERING)
+            .build()
+            .openSearchIndexName());
+  }
+
+  @Test
+  public void materializedViewIndexName() {
+    assertEquals(
+        "flint_mys3_default_http_logs_metrics",
+        IndexQueryDetails.builder()
+            .mvName("mys3.default.http_logs_metrics")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+  }
+
+  @Test
+  public void materializedViewIndexNameWithBackticks() {
+    assertEquals(
+        "flint_mys3_default_http_logs_metrics",
+        IndexQueryDetails.builder()
+            .mvName("`mys3`.`default`.`http_logs_metrics`")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+  }
+
+  @Test
+  public void materializedViewIndexNameWithDots() {
+    assertEquals(
+        "flint_mys3_default_http_logs_metrics.1026",
+        IndexQueryDetails.builder()
+            .mvName("`mys3`.`default`.`http_logs_metrics.1026`")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+  }
+
+  @Test
+  public void materializedViewIndexNameWithDotsInCatalogName() {
+    // FIXME: should not use ctx.getText which is hard to split
+    assertEquals(
+        "flint_mys3_1026_default`.`http_logs_metrics",
+        IndexQueryDetails.builder()
+            .mvName("`mys3.1026`.`default`.`http_logs_metrics`")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+  }
+
+  @Test
+  public void materializedViewIndexNameNotFullyQualified() {
+    // Normally this should not happen and can add precondition check once confirmed.
+    assertEquals(
+        "flint_default_http_logs_metrics",
+        IndexQueryDetails.builder()
+            .mvName("default.http_logs_metrics")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+
+    assertEquals(
+        "flint_http_logs_metrics",
+        IndexQueryDetails.builder()
+            .mvName("http_logs_metrics")
+            .indexType(FlintIndexType.MATERIALIZED_VIEW)
+            .build()
+            .openSearchIndexName());
+  }
 }
diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
index c86d7656d6..f5226206ab 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java
@@ -102,19 +102,57 @@ void testErrorScenarios() {
   }
 
   @Test
-  void testExtractionFromFlintIndexQueries() {
-    String createCoveredIndexQuery =
-        "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) WITH"
-            + " (auto_refresh = true)";
-    Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(createCoveredIndexQuery));
-    IndexQueryDetails indexQueryDetails =
-        SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery);
-    FullyQualifiedTableName fullyQualifiedTableName =
-        indexQueryDetails.getFullyQualifiedTableName();
-    Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName());
-    Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
-    Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
-    Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
+  void testExtractionFromFlintSkippingIndexQueries() {
+    String[] createSkippingIndexQueries = {
+      "CREATE SKIPPING INDEX ON myS3.default.alb_logs (l_orderkey VALUE_SET)",
+      "CREATE SKIPPING INDEX IF NOT EXISTS"
+          + " ON myS3.default.alb_logs (l_orderkey VALUE_SET) "
+          + " WITH (auto_refresh = true)",
+      "CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET)"
+          + " WITH (auto_refresh = true)",
+      "CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET) "
+          + " WHERE elb_status_code = 500 "
+          + " WITH (auto_refresh = true)"
+    };
+
+    for (String query : createSkippingIndexQueries) {
+      Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query);
+      IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query);
+      FullyQualifiedTableName fullyQualifiedTableName =
+          indexQueryDetails.getFullyQualifiedTableName();
+
+      Assertions.assertNull(indexQueryDetails.getIndexName());
+      Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
+      Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
+      Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
+    }
+  }
+
+  @Test
+  void testExtractionFromFlintCoveringIndexQueries() {
+    String[] createCoveredIndexQueries = {
+      "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)",
+      "CREATE INDEX IF NOT EXISTS elb_and_requestUri "
+          + " ON myS3.default.alb_logs(l_orderkey, l_quantity) "
+          + " WITH (auto_refresh = true)",
+      "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)"
+          + " WITH (auto_refresh = true)",
+      "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) "
+          + " WHERE elb_status_code = 500 "
+          + " WITH (auto_refresh = true)"
+    };
+
+    for (String query : createCoveredIndexQueries) {
+      Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query);
+      IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query);
+      FullyQualifiedTableName fullyQualifiedTableName =
+          indexQueryDetails.getFullyQualifiedTableName();
+
+      Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName());
+      Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
+      Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
+      Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
+    }
   }
 
   @Test