
[FEATURE] Improve validation for SQL statement #65

Closed
Tracked by #3
dai-chen opened this issue Oct 6, 2023 · 10 comments
Labels: 0.4, enhancement (New feature or request)

dai-chen (Collaborator) commented Oct 6, 2023

Is your feature request related to a problem?

Improve validation for SQL statements:

  1. For DDL statements (create):
    a. Validate WITH options and report an error if an invalid option is given
    b. Check whether a given column is unsupported by the skipping/covering index and report the error early, instead of only when the DataFrame job is submitted in the background
  2. For DML statements (show/desc/refresh), report an error if the given table name is invalid

What solution would you like?

  • For 1a (WITH options), add validation logic in FlintSparkIndexOptions (see the sketch after this list)
  • For 1b (streaming job), figure out how to validate it early
  • For 2, add an integration test to verify whether this is the current behavior
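
A minimal sketch of what the 1a check could look like, assuming validation runs over the raw WITH option map (the helper name and the allowed-option set are illustrative, not the actual FlintSparkIndexOptions API; only option names that appear in this thread are listed):

// Hypothetical name-based validation over the raw WITH options
def validateOptionNames(options: Map[String, String]): Unit = {
  val allowed = Set("auto_refresh", "incremental_refresh", "checkpoint_location", "watermark_delay")
  val unknown = options.keySet -- allowed
  require(unknown.isEmpty, s"Invalid index option(s): ${unknown.mkString(", ")}")
}
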
dai-chen (Collaborator, Author) commented:

Other validation examples:

scala> (flint
     |   .materializedView()
     |   .name("myglue.default.lineitem_metrics")
     |   .query("SELECT window.start, COUNT(*) FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Month')")
     |   .options(FlintSparkIndexOptions(Map(
     |     "auto_refresh" -> "true",
     |     "checkpoint_location" -> "s3://test/checkpoints/"
     |   )))
     |   .create())
java.lang.IllegalArgumentException: Intervals greater than a month is not supported (1 Month).

# A SELECT alias is required; otherwise StructType.fromDDL() may fail if the column name contains parentheses
scala> (flint
     |   .materializedView()
     |   .name("myglue.default.lineitem_metrics")
     |   .query("SELECT window.start, COUNT(*) FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Week')")
     |   .options(FlintSparkIndexOptions(Map(
     |     "auto_refresh" -> "true",
     |     "checkpoint_location" -> "s3://test/checkpoints/"
     |   )))
     |   .create())
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near '('(line 1, pos 30)

== SQL ==
start timestamp not null,count(1) long not null
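
For reference, adding explicit SELECT aliases avoids the parentheses in the generated column names (alias names below are illustrative); the inferred schema DDL then becomes something like startTime timestamp not null, cnt long not null, which StructType.fromDDL() can parse:

flint
  .materializedView()
  .name("myglue.default.lineitem_metrics")
  .query(
    "SELECT window.start AS startTime, COUNT(*) AS cnt " +
      "FROM stream.lineitem_tiny GROUP BY TUMBLE(l_shipdate, '1 Week')")
  .options(FlintSparkIndexOptions(Map(
    "auto_refresh" -> "true",
    "checkpoint_location" -> "s3://test/checkpoints/")))
  .create()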

dai-chen (Collaborator, Author) commented:

Another validation is required: because the whereClause and mvQuery grammar rules can match anything (non-greedily), a query with a malformed WITH clause after them is still accepted, and we rely on Spark to validate it.

spark-sql> CREATE INDEX test ON ds_tables.http_logs
         > (clientip, status)
         > WHERE status != 200
         > WITH (
         >   auto_refresh = true
         > ;
Time taken: 2.511 seconds

The text WHERE status != 200 WITH (auto_refresh = true above is passed to Spark as the filtering expression, and Spark doesn't throw any exception.
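
One possible early check, a sketch only rather than the actual grammar fix, is to re-parse the captured filtering text with Spark's expression parser, which requires the whole string to be a single expression:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression

// Sketch: parse the captured WHERE text as a standalone expression. A trailing
// "WITH (auto_refresh = true" fragment fails here with a ParseException instead
// of being silently forwarded to the streaming job as part of the filter.
def parseFilterCondition(spark: SparkSession, whereText: String): Expression =
  spark.sessionState.sqlParser.parseExpression(whereText)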

dai-chen (Collaborator, Author) commented Nov 29, 2023

Spark Structured Streaming doesn't support Hive tables. Here is a test that identifies whether a table is a Hive table:

$ spark-shell  ... --conf spark.flint.datasource.name=myglue

scala> import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

scala> def getTableProperties(qualifiedTableName: String): java.util.Map[String, String] = {
     |   val (catalog, ident) = parseTableName(spark, qualifiedTableName)
     |   val table = loadTable(catalog, ident)
     |   table.get.properties
     | }

scala> getTableProperties("myglue.stream.lineitem_tiny")
res11: java.util.Map[String,String] = {location=s3://.../tpch-lineitem-tiny,
 provider=JSON, external=true, option.compression=gzip, owner=hadoop}

scala> getTableProperties("myglue.ds_tables.http_logs")
res12: java.util.Map[String,String] = {location=s3://.../http_logs_partitioned_json_bz2,
 provider=json, external=true, option.compression=bzip2, owner=hadoop}

scala> getTableProperties("myglue.mydatabase.noaa_ghcn_pds")
res14: java.util.Map[String,String] = {location=s3://noaa-ghcn-pds/csv,
 provider=hive, transient_lastDdlTime=1675459327, option.serialization.format=1,
 external=true, classification=csv, owner=hadoop, option.separatorChar=,}
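
Based on the table properties above, a provider-based check could be as simple as the following sketch, built on the getTableProperties helper defined above (property casing and availability may vary by catalog):

// Sketch: treat a table as Hive-backed when its "provider" property is "hive".
// equalsIgnoreCase(null) is false, so a missing property means "not Hive" here.
def isHiveTable(qualifiedTableName: String): Boolean =
  "hive".equalsIgnoreCase(getTableProperties(qualifiedTableName).get("provider"))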

penghuo (Collaborator) commented Jan 24, 2024

Reproduce Issue

  • create table
-- Create the Hive table
CREATE TABLE IF NOT EXISTS user_data (
    name STRING,
    age INT
);

-- Insert data into the table
INSERT INTO user_data VALUES ('Alice', 30), ('Bob', 25);

CREATE SKIPPING INDEX ON user_data (age VALUE_SET)  WITH (auto_refresh = true)

  • Error log
org.apache.spark.SparkException: Execution of the stream flint_spark_catalog_default_user_data_skipping_index failed. Please, fill a bug report in, and provide the full stack trace.
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:324)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.hive.HiveShim$.wrapperToFileSinkDesc(HiveShim.scala:228)
	at org.apache.spark.sql.hive.execution.HiveFileFormat.supportFieldName(HiveFileFormat.scala:112)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$checkFieldNames$1(DataSourceUtils.scala:75)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$checkFieldNames$1$adapted(DataSourceUtils.scala:74)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.checkFieldNames(DataSourceUtils.scala:74)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:95)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:437)
	at org.apache.spark.sql.execution.streaming.FileStreamSource.getBatch(FileStreamSource.scala:248)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:549)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:545)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:545)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
	... 1 more
Time taken: 6.146 seconds

penghuo (Collaborator) commented Feb 5, 2024

Proposed Solutions

Use SHOW TABLE EXTENDED to filter out Hive tables. The procedure is:

  1. SHOW TABLE EXTENDED IN database LIKE '*'
  2. Filter on the information column: if it contains Provider: hive, it is a Hive table. For instance:

Hive table info

SHOW TABLE EXTENDED IN `test-db` LIKE 'my_hive_table'
Database: test-db 
Table: my_hive_table 
Owner: owner 
Created Time: Mon Jan 08 17:28:48 UTC 2024 
Last Access: Mon Jan 08 17:28:48 UTC 2024 
Created By: Spark 2.2 or prior 
Type: EXTERNAL 
Provider: hive 

Spark datasource table info

Database: default 
Table: alb_logs 
Owner: hadoop 
Created Time: Mon Jan 08 18:54:37 UTC 2024 
Last Access: UNKNOWN 
Created By: Spark 3.3.2-amzn-0 
Type: EXTERNAL 
Provider: csv
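
A sketch of how the proposed check could be wired up (the information column name matches Spark's SHOW TABLE EXTENDED output; the helper itself is illustrative, and the exact text layout may differ across Spark versions):

import org.apache.spark.sql.SparkSession

// Sketch: scan the "information" column returned by SHOW TABLE EXTENDED
// and look for "Provider: hive".
def isHiveTable(spark: SparkSession, database: String, table: String): Boolean =
  spark.sql(s"SHOW TABLE EXTENDED IN `$database` LIKE '$table'")
    .collect()
    .exists(row => Option(row.getAs[String]("information")).exists(_.contains("Provider: hive")))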

dai-chen (Collaborator, Author) commented:

If auto_refresh is true, the user should not specify incremental_refresh, or should only set it to false.
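
A minimal sketch of that rule, assuming it is checked over the raw option map (the helper is illustrative, not the actual FlintSparkIndexOptions code):

// Sketch: auto_refresh = true is incompatible with incremental_refresh = true
def validateRefreshMode(options: Map[String, String]): Unit = {
  val autoRefresh = options.get("auto_refresh").exists(_.toBoolean)
  val incremental = options.get("incremental_refresh").exists(_.toBoolean)
  require(!(autoRefresh && incremental),
    "incremental_refresh cannot be true when auto_refresh is enabled")
}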

vmmusings (Member) commented:

Another validation required: restricting the length of the index name.
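
A sketch of what such a check could look like (the helper is illustrative; the 255 limit matches the OpenSearch index name constraint noted in the summary below, measured here in UTF-8 bytes to be conservative):

// Sketch: reject generated Flint index names that exceed OpenSearch's 255 limit
def validateFlintIndexName(indexName: String): Unit =
  require(indexName.getBytes(java.nio.charset.StandardCharsets.UTF_8).length <= 255,
    s"Flint index name exceeds the 255 limit: $indexName")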

dai-chen self-assigned this Mar 22, 2024
dai-chen moved this from Todo to In Progress in OpenSearch Spark Project Planning Mar 22, 2024
dai-chen (Collaborator, Author) commented Mar 22, 2024

Summary

Here is a summary of all the issues listed above, especially for the CREATE Flint index DDL statement.

Out of Scope

  1. Note that validation for table and column existence is handled within the Flint index builder and is therefore not detailed here.
  2. Due to the streaming job logic being executed inside Spark during job initiation, it is not feasible to validate all aspects beforehand. In such cases, we aim to capture and record the internal error messages in [META] Improve error messaging and exception handling #281.

Index Option Validations

  • Auto Refresh

    1. Incremental refresh cannot be enabled if auto refresh is set.
    2. The source table must not be a Hive table.
    3. Checkpoint location must be a valid S3 bucket with appropriate permissions.
    4. Checkpoint location is mandatory if the checkpoint mandatory option is enabled.
    5. Refresh interval should not exceed 1 month.
  • Incremental Manual Refresh

    1. Same validation requirements 1, 2, 3 as Auto Refresh above.
    2. Checkpoint location is mandatory regardless of the checkpoint mandatory option value.
  • Full Manual Refresh

Other Validations

  • Flint Index Name

    1. The maximum length is constrained by OpenSearch (index name <= 255 chars).
  • Flint Index Specifics

    1. For skipping index: ensure the column type is supported by the chosen skip type
    2. For materialized view:
      • The MV query is valid, with no syntax or semantic errors (see the sketch after this list);
      • A windowing function and watermark delay are required if the query aggregates
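
For the materialized view checks, a sketch of an early syntax and aggregation check using Spark's parser (catching semantic errors would additionally require running the analyzer; the helper is illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Sketch: parse the MV query eagerly so syntax errors surface at CREATE time,
// and detect aggregation, which is the case where a windowing function and
// watermark delay are required.
def mvQueryAggregates(spark: SparkSession, mvQuery: String): Boolean = {
  val plan = spark.sessionState.sqlParser.parsePlan(mvQuery) // throws ParseException on syntax error
  plan.collectFirst { case a: Aggregate => a }.isDefined
}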

dai-chen (Collaborator, Author) commented:

Tested the checkpoint location validation approach. CheckpointFileManager is the same abstraction used by the Spark streaming job.

scala> val checkpointMgr = CheckpointFileManager.create(new Path("s3://test/123"), spark.sessionState.newHadoopConf)
checkpointMgr: org.apache.spark.sql.execution.streaming.CheckpointFileManager
 = org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@7b976364

scala> checkpointMgr.exists(new Path("s3://test/123"))
java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
All access to this object has been disabled (Service: Amazon S3; Status Code: 403; Error Code: AllAccessDisabled;
  ...

scala> checkpointMgr.exists(new Path("s3://benchmark/httplogs"))
res4: Boolean = true
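
A sketch of turning that probe into a pre-flight check (the helper and its semantics, access failure vs. non-existence, are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import scala.util.Try

// Sketch: probe the checkpoint location up front so permission/access errors
// (e.g. the 403 above) surface at CREATE time rather than when the streaming
// job starts. exists() returning false for a brand-new location is still fine.
def canAccessCheckpointLocation(spark: SparkSession, location: String): Boolean = {
  val path = new Path(location)
  Try(CheckpointFileManager.create(path, spark.sessionState.newHadoopConf()).exists(path)).isSuccess
}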

dai-chen (Collaborator, Author) commented May 13, 2024

Finished the high-priority items in the Index Option Validations section. Other minor items will be tracked separately as needed in the future.
