From f8d872873d148ed31a6540af31e46ba8cdf7120f Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 6 May 2024 13:04:50 -0700 Subject: [PATCH] Add comments to query handlers Signed-off-by: Tomoyuki Morita --- .../opensearch/sql/spark/dispatcher/BatchQueryHandler.java | 5 +++++ .../opensearch/sql/spark/dispatcher/IndexDMLHandler.java | 7 ++++++- .../sql/spark/dispatcher/InteractiveQueryHandler.java | 6 ++++++ .../sql/spark/dispatcher/RefreshQueryHandler.java | 6 +++++- .../sql/spark/dispatcher/StreamingQueryHandler.java | 6 +++++- .../sql/spark/flint/IndexDMLResultStorageService.java | 4 ++++ 6 files changed, 31 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index e9356e5bed..547547bb48 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -28,6 +28,11 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +/** + * The handler for batch query. + * With batch query, queries are executed as single batch. The queries are sent along with job + * execution request to spark. + */ @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { private final EMRServerlessClient emrServerlessClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index dfd5316f6c..f0f8cba299 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -31,7 +31,12 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Index DML query. includes * DROP * ALT? */ +/** + * The handler for Index DML (Data Manipulation Language) query. + * Handles DROP/ALT/VACUUM operation for flint indices. + * It will stop streaming query job as needed (e.g. when the flint index is automatically updated + * by a streaming query, the streaming query is stopped when the index is dropped) + */ @RequiredArgsConstructor public class IndexDMLHandler extends AsyncQueryHandler { private static final Logger LOG = LogManager.getLogger(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 7602988d26..c870b92893 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -35,6 +35,12 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +/** + * The handler for interactive query. + * With interactive query, a session will be first established and then the session will be reused + * for the following queries(statements). Session is an abstraction of spark job, and once the job + * is started, the job will continuously poll the statements and execute query specified in it. + */ @RequiredArgsConstructor public class InteractiveQueryHandler extends AsyncQueryHandler { private final SessionManager sessionManager; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index aeb5c1b35f..112e91e0b4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -20,7 +20,11 @@ import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Refresh Query. */ +/** + * The handler for refresh query. + * Refresh query is one time query request to refresh(update) flint index, and new job is submitted + * to Spark. + */ public class RefreshQueryHandler extends BatchQueryHandler { private final FlintIndexMetadataService flintIndexMetadataService; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 8170b41c66..2b54233386 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -26,7 +26,11 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Streaming Query. */ +/** + * The handler for streaming query. + * Streaming query is a job to continuously update flint index. + * Once started, the job can be stopped by IndexDML query. + */ public class StreamingQueryHandler extends BatchQueryHandler { private final EMRServerlessClient emrServerlessClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java index 4a046564f5..afb408fe4c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java @@ -7,6 +7,10 @@ import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; +/** + * Abstraction over the IndexDMLResult storage. + * It stores the result of IndexDML query execution. + */ public interface IndexDMLResultStorageService { IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName); }