From f8d872873d148ed31a6540af31e46ba8cdf7120f Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 6 May 2024 13:04:50 -0700 Subject: [PATCH 1/3] Add comments to query handlers Signed-off-by: Tomoyuki Morita --- .../opensearch/sql/spark/dispatcher/BatchQueryHandler.java | 5 +++++ .../opensearch/sql/spark/dispatcher/IndexDMLHandler.java | 7 ++++++- .../sql/spark/dispatcher/InteractiveQueryHandler.java | 6 ++++++ .../sql/spark/dispatcher/RefreshQueryHandler.java | 6 +++++- .../sql/spark/dispatcher/StreamingQueryHandler.java | 6 +++++- .../sql/spark/flint/IndexDMLResultStorageService.java | 4 ++++ 6 files changed, 31 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index e9356e5bed..547547bb48 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -28,6 +28,11 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +/** + * The handler for batch query. + * With batch query, queries are executed as single batch. The queries are sent along with job + * execution request to spark. + */ @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { private final EMRServerlessClient emrServerlessClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index dfd5316f6c..f0f8cba299 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -31,7 +31,12 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Index DML query. includes * DROP * ALT? */ +/** + * The handler for Index DML (Data Manipulation Language) query. + * Handles DROP/ALT/VACUUM operation for flint indices. + * It will stop streaming query job as needed (e.g. when the flint index is automatically updated + * by a streaming query, the streaming query is stopped when the index is dropped) + */ @RequiredArgsConstructor public class IndexDMLHandler extends AsyncQueryHandler { private static final Logger LOG = LogManager.getLogger(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 7602988d26..c870b92893 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -35,6 +35,12 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; +/** + * The handler for interactive query. + * With interactive query, a session will be first established and then the session will be reused + * for the following queries(statements). Session is an abstraction of spark job, and once the job + * is started, the job will continuously poll the statements and execute query specified in it. + */ @RequiredArgsConstructor public class InteractiveQueryHandler extends AsyncQueryHandler { private final SessionManager sessionManager; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index aeb5c1b35f..112e91e0b4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -20,7 +20,11 @@ import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Refresh Query. */ +/** + * The handler for refresh query. + * Refresh query is one time query request to refresh(update) flint index, and new job is submitted + * to Spark. + */ public class RefreshQueryHandler extends BatchQueryHandler { private final FlintIndexMetadataService flintIndexMetadataService; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 8170b41c66..2b54233386 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -26,7 +26,11 @@ import org.opensearch.sql.spark.leasemanager.model.LeaseRequest; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -/** Handle Streaming Query. */ +/** + * The handler for streaming query. + * Streaming query is a job to continuously update flint index. + * Once started, the job can be stopped by IndexDML query. + */ public class StreamingQueryHandler extends BatchQueryHandler { private final EMRServerlessClient emrServerlessClient; diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java index 4a046564f5..afb408fe4c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java @@ -7,6 +7,10 @@ import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; +/** + * Abstraction over the IndexDMLResult storage. + * It stores the result of IndexDML query execution. + */ public interface IndexDMLResultStorageService { IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName); } From 1fc25b3dba231ecae5ced5af4d4397190bfaf75b Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 6 May 2024 15:14:49 -0700 Subject: [PATCH 2/3] Reformat Signed-off-by: Tomoyuki Morita --- .../sql/spark/dispatcher/BatchQueryHandler.java | 5 ++--- .../opensearch/sql/spark/dispatcher/IndexDMLHandler.java | 8 ++++---- .../sql/spark/dispatcher/InteractiveQueryHandler.java | 8 ++++---- .../sql/spark/dispatcher/RefreshQueryHandler.java | 5 ++--- .../sql/spark/dispatcher/StreamingQueryHandler.java | 3 +-- .../sql/spark/flint/IndexDMLResultStorageService.java | 3 +-- 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 547547bb48..4dc075e2dc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -29,9 +29,8 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for batch query. - * With batch query, queries are executed as single batch. The queries are sent along with job - * execution request to spark. + * The handler for batch query. With batch query, queries are executed as single batch. The queries + * are sent along with job execution request to spark. */ @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index f0f8cba299..5924407785 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -32,10 +32,10 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for Index DML (Data Manipulation Language) query. - * Handles DROP/ALT/VACUUM operation for flint indices. - * It will stop streaming query job as needed (e.g. when the flint index is automatically updated - * by a streaming query, the streaming query is stopped when the index is dropped) + * The handler for Index DML (Data Manipulation Language) query. Handles DROP/ALT/VACUUM operation + * for flint indices. It will stop streaming query job as needed (e.g. when the flint index is + * automatically updated by a streaming query, the streaming query is stopped when the index is + * dropped) */ @RequiredArgsConstructor public class IndexDMLHandler extends AsyncQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index c870b92893..7475c5a7ae 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -36,10 +36,10 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for interactive query. - * With interactive query, a session will be first established and then the session will be reused - * for the following queries(statements). Session is an abstraction of spark job, and once the job - * is started, the job will continuously poll the statements and execute query specified in it. + * The handler for interactive query. With interactive query, a session will be first established + * and then the session will be reused for the following queries(statements). Session is an + * abstraction of spark job, and once the job is started, the job will continuously poll the + * statements and execute query specified in it. */ @RequiredArgsConstructor public class InteractiveQueryHandler extends AsyncQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index 112e91e0b4..edb0a3f507 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -21,9 +21,8 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for refresh query. - * Refresh query is one time query request to refresh(update) flint index, and new job is submitted - * to Spark. + * The handler for refresh query. Refresh query is one time query request to refresh(update) flint + * index, and new job is submitted to Spark. */ public class RefreshQueryHandler extends BatchQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 2b54233386..02cc936ac2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -27,8 +27,7 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for streaming query. - * Streaming query is a job to continuously update flint index. + * The handler for streaming query. Streaming query is a job to continuously update flint index. * Once started, the job can be stopped by IndexDML query. */ public class StreamingQueryHandler extends BatchQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java index afb408fe4c..31d4be511e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java @@ -8,8 +8,7 @@ import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; /** - * Abstraction over the IndexDMLResult storage. - * It stores the result of IndexDML query execution. + * Abstraction over the IndexDMLResult storage. It stores the result of IndexDML query execution. */ public interface IndexDMLResultStorageService { IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName); From f5edee30cc4f4dbf336d91bf2c857fa31d77391c Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 9 May 2024 11:08:58 -0700 Subject: [PATCH 3/3] Fix comments Signed-off-by: Tomoyuki Morita --- .../org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java | 2 +- .../org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index 4dc075e2dc..2f38e1fac4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -30,7 +30,7 @@ /** * The handler for batch query. With batch query, queries are executed as single batch. The queries - * are sent along with job execution request to spark. + * are sent along with job execution request ({@link StartJobRequest}) to spark. */ @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 5924407785..b2bb590c1e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -32,7 +32,7 @@ import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** - * The handler for Index DML (Data Manipulation Language) query. Handles DROP/ALT/VACUUM operation + * The handler for Index DML (Data Manipulation Language) query. Handles DROP/ALTER/VACUUM operation * for flint indices. It will stop streaming query job as needed (e.g. when the flint index is * automatically updated by a streaming query, the streaming query is stopped when the index is * dropped)