
Create index on external table #1379

Closed
Tracked by #3
dai-chen opened this issue Feb 23, 2023 · 3 comments


dai-chen commented Feb 23, 2023

Is your feature request related to a problem?

Object store query capability is being introduced in the Spark integration and OpenSearch JDBC connector (#1331). To answer such queries efficiently, we need auxiliary data structures, such as indexes, to accelerate this kind of federated query.

What solution would you like?

Planned tasks for the prototype, demo, and subsequent development:

  1. Add Spark parser extension with CREATE INDEX
  2. Store index data in OpenSearch index
  3. Auto refresh by Spark streaming execution (integrate with table metadata refresh job in Create External Table with AUTO_REFRESH enabled #1363)
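To illustrate task 1, here is a minimal sketch of recognizing the intended `CREATE INDEX <name> ON <table> (<columns>) AS '<kind>'` statement used later in this issue. This is a hypothetical regex-based illustration of the grammar only; the real extension would implement Spark's `ParserInterface` in Scala.

```python
import re

# Hypothetical sketch of the CREATE INDEX grammar exercised in the PoC below:
#   CREATE INDEX <name> ON <table> (<col>, ...) AS '<kind>'
# A real Spark parser extension would be an ANTLR/Scala ParserInterface,
# not a regex.
CREATE_INDEX = re.compile(
    r"CREATE\s+INDEX\s+(?P<name>\w+)\s+ON\s+(?P<table>[\w.]+)\s*"
    r"\((?P<cols>[^)]+)\)\s+AS\s+'(?P<kind>\w+)'",
    re.IGNORECASE,
)

def parse_create_index(sql: str) -> dict:
    """Parse a CREATE INDEX statement into its components."""
    m = CREATE_INDEX.match(sql.strip())
    if not m:
        raise ValueError(f"not a CREATE INDEX statement: {sql!r}")
    return {
        "name": m.group("name"),
        "table": m.group("table"),
        "columns": [c.strip() for c in m.group("cols").split(",")],
        "kind": m.group("kind").lower(),
    }

stmt = "CREATE INDEX t001_name_skipping_index ON default.t001 (name) AS 'bloomfilter'"
print(parse_create_index(stmt))
```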

What alternatives have you considered?

Alternative solutions and references for this work:

  1. Apache Carbondata: https://carbondata.apache.org/index.html
  2. Microsoft Hyperspace: https://microsoft.github.io/hyperspace/

Do you have any additional context?

Please find more details in:

  1. [RFC] Automatic Workload-Driven Query Acceleration by OpenSearch opensearch-spark#128
  2. [RFC] OpenSearch and Apache Spark Integration opensearch-spark#4
@dai-chen dai-chen added the enhancement New feature or request label Feb 23, 2023
@dai-chen dai-chen self-assigned this Feb 23, 2023
@dai-chen dai-chen changed the title [FEATURE] Create index on external table Create index on external table Feb 23, 2023

dai-chen commented Mar 7, 2023

Proof of Concepts

Setup

  • Spark 3.1.2 (Scala 2.12)
  • Hyperspace: PoC branch (based on the master branch) published to a local Maven repo
  • Delta: PoC branch (based on 1.0.1, in Add CREATE EXTERNAL TABLE statement #1394) published to a local Maven repo
  • OpenSearch cluster running at localhost:9200

Test

In this test, we first create a skipping index t001_name_skipping_index based on the current records (Parquet files 1 and 2) in the Maximus external table t001. Then we add a new file (Parquet file 3) to the remote data location to verify that the index data is auto refreshed along with the table metadata.
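Conceptually, a skipping index stores one small sketch (here, a Bloom filter) per data file; at query time only files whose sketch might contain the predicate value are scanned. The following is a self-contained toy sketch of that pruning logic, using simplified SHA-256-derived hashes, not Hyperspace's actual BloomFilter format:

```python
import hashlib

# Toy per-file Bloom filter: M bits, K hash functions derived from sha256.
# This mimics the idea behind the skipping index, not Hyperspace's format.
M, K = 64, 3

def _hashes(value: str):
    digest = hashlib.sha256(value.encode()).digest()
    return [int.from_bytes(digest[4 * i:4 * i + 4], "big") % M for i in range(K)]

def build_filter(values):
    """Build a bitmask Bloom filter over all values in one file."""
    bits = 0
    for v in values:
        for h in _hashes(v):
            bits |= 1 << h
    return bits

def might_contain(bits, value):
    """False means definitely absent; True means possibly present."""
    return all(bits & (1 << h) for h in _hashes(value))

# One filter per Parquet file, as in the index documents shown below.
files = {"file-0": ["a"], "file-1": ["b"], "file-2": ["g"]}
filters = {f: build_filter(vals) for f, vals in files.items()}

# WHERE name = 'g' scans only files whose filter may contain 'g';
# file-2 is always kept, the others are (very likely) pruned.
to_scan = [f for f, bits in filters.items() if might_contain(bits, "g")]
print(to_scan)
```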


$ bin/spark-sql --packages io.delta:delta-core_2.12:1.0.1,com.microsoft.hyperspace:hyperspace-core-spark3.1_2.12:0.5.0-SNAPSHOT --conf "spark.sql.extensions=com.microsoft.hyperspace.HyperspaceSparkSessionExtension,io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain --conf "spark.hyperspace.index.sources.fileBasedBuilders=com.microsoft.hyperspace.index.sources.delta.DeltaLakeFileBasedSourceBuilder,com.microsoft.hyperspace.index.sources.default.DefaultFileBasedSourceBuilder"

# 1. Create a table only for adding external records
CREATE TABLE IF NOT EXISTS temp (name STRING, age INT) 
USING PARQUET
LOCATION "s3a://maximus-table/";

INSERT INTO default.temp VALUES ('a', 1), ('b', 2);

$ aws s3 ls maximus-table/
  0 _SUCCESS
598 part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
598 part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet

# 2. Create Maximus (Delta) table with auto refresh enabled
CREATE EXTERNAL TABLE IF NOT EXISTS default.t001
(name STRING, age INT)
USING DELTA
LOCATION "s3a://maximus-table"
TBLPROPERTIES ('auto_refresh'='true');

#Output
#CreateDeltaTableCommand: === Refresh with files ===
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
#CreateDeltaTableCommand: === Refreshing index ===

# 3. Create skipping index on name column
CREATE INDEX t001_name_skipping_index ON default.t001 (name) AS 'bloomfilter';

$ curl "localhost:9200/t001_name_skipping_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "u6hHvoYBuDNJL8xoDiMO",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 0,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "1161928704398458880,36310272130023424",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vKhHvoYBuDNJL8xoDiMx",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 1,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "144116296177483776,1152930300699870208",
            "numHashFunctions" : 7
          }
        }
      }
    ]
  }
}

# 4. Add new records to trigger table metadata and index data auto refresh
INSERT INTO default.temp VALUES ('g', 7);

$ aws s3 ls maximus-table/
  0 _SUCCESS
598 part-00000-66a5ad0a-85c3-4641-85e4-85f67fe7b082-c000.snappy.parquet
598 part-00000-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet
598 part-00001-a96257e2-864c-4b0a-9a9b-8874f0ebd9bb-c000.snappy.parquet

#Output
#CreateDeltaTableCommand: === Refresh with files ===
#CreateDeltaTableCommand: New file: s3a://maximus-table/part-00000-66a5ad0a-85c3-4641-85e4-85f67fe7b082-c000.snappy.parquet
#CreateDeltaTableCommand: === Refreshing index ===
#CreateDeltaTableCommand: Index: t001_name_skipping_index

$ curl "localhost:9200/t001_name_skipping_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "u6hHvoYBuDNJL8xoDiMO",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 0,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "1161928704398458880,36310272130023424",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vKhHvoYBuDNJL8xoDiMx",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 1,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "144116296177483776,1152930300699870208",
            "numHashFunctions" : 7
          }
        }
      },
      {
        "_index" : "t001_name_skipping_index",
        "_id" : "vahJvoYBuDNJL8xo-yNi",
        "_score" : 1.0,
        "_source" : {
          "_data_file_id" : 2,
          "BloomFilter_name__10__0" : {
            "bitCount" : 7,
            "data" : "-9223372036854708736,202375168",
            "numHashFunctions" : 7
          }
        }
      }
    ]
  }
}

# 5. Use index to skip data
EXPLAIN SELECT * FROM t001 WHERE name = 'g';
== Physical Plan ==
*(1) Filter (isnotnull(name#3590) AND (name#3590 = g))
+- *(1) ColumnarToRow
   +- FileScan Hyperspace(Type: DS, Name: t001_name_skipping_index, LogVersion: 3) default.t001[name#3590,age#3591] Batched: true, DataFilters: [isnotnull(name#3590), (name#3590 = g)], Format: Parquet, Location: DataSkippingFileIndex[s3a://maximus-table], PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,g)], ReadSchema: struct<name:string,age:int>

@dai-chen dai-chen closed this as completed Mar 7, 2023

dai-chen commented Mar 7, 2023

Here is an additional test for covering index:

# Pre-create the OpenSearch index to avoid field type inference problems
$ curl -XPUT localhost:9200/t001_name_age_index -H "Content-Type: application/json" -d'
{
  "mappings" : {
    "properties" : {
      "age" : {
        "type" : "integer"
      },
      "name" : {
        "type" : "text",
        "fields" : {
          "keyword" : {
            "type" : "keyword",
            "ignore_above" : 256
          }
        }
      }
    }
  }
}'

#Create covering index
CREATE INDEX t001_name_age_index ON default.t001 (name, age) AS 'lucene';

SHOW INDEXES ON t001;
t001_name_age_index	["name","age"]	t001_name_age_index/v__=0	ACTIVE	{"includedColumns":"","numBuckets":"200","schema":"{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}}]}"}
t001_name_skipping_index	["name"]	t001_name_skipping_index/v__=0	ACTIVE	{"sketches":"BloomFilter(name, 10)"}

$ curl "localhost:9200/t001_name_age_index/_search?pretty"
{
    ...
    "hits" : [
      {
        "_index" : "t001_name_age_index",
        "_id" : "8mBxvoYBxyMZlj4aE4pm",
        "_score" : 1.0,
        "_source" : {
          "name" : "b",
          "age" : 2
        }
      },
      {
        "_index" : "t001_name_age_index",
        "_id" : "8WBxvoYBxyMZlj4aE4pI",
        "_score" : 1.0,
        "_source" : {
          "name" : "g",
          "age" : 7
        }
      },
      {
        "_index" : "t001_name_age_index",
        "_id" : "82BxvoYBxyMZlj4aE4pr",
        "_score" : 1.0,
        "_source" : {
          "name" : "a",
          "age" : 1
        }
      }
    ]
  }
}

EXPLAIN SELECT * FROM t001 WHERE name = 'g';
== Physical Plan ==
*(1) Filter (isnotnull(name#1204) AND (name#1204 = g))
+- *(1) Scan OpenSearchRelation(Map(opensearch.resource -> t001_name_age_index),org.apache.spark.sql.SQLContext@40574edf,None) default.t001[name#1204,age#1205] PushedFilters: [IsNotNull(name), EqualTo(name,g)], ReadSchema: struct<name:string,age:int>
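The PushedFilters in the plan above are translated by the connector into an OpenSearch query. A rough sketch of that translation for the two filters in this plan (hypothetical helper, not the actual connector code) could look like:

```python
# Hypothetical translation of Spark pushed filters into an OpenSearch
# query DSL body, mirroring PushedFilters: [IsNotNull(name), EqualTo(name,g)].
def to_opensearch_query(filters):
    clauses = []
    for f in filters:
        kind = f[0]
        if kind == "IsNotNull":
            # exists query: matches documents where the field is present
            clauses.append({"exists": {"field": f[1]}})
        elif kind == "EqualTo":
            # term query: exact match on the field value
            clauses.append({"term": {f[1]: f[2]}})
        else:
            raise NotImplementedError(kind)
    return {"query": {"bool": {"filter": clauses}}}

pushed = [("IsNotNull", "name"), ("EqualTo", "name", "g")]
print(to_opensearch_query(pushed))
```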
