From 0d4cd94c951d9ccbcdd24629152f40bb98bc8f2c Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 4 Sep 2024 17:31:17 -0700 Subject: [PATCH] Fix jobType for Batch and IndexDML query (#2955) (#2982) (cherry picked from commit c13f7705fca39c737b11fcf6fb0bf5ce9fb540ef) Signed-off-by: Tomoyuki Morita Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../sql/spark/dispatcher/BatchQueryHandler.java | 2 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 4 ++-- .../asyncquery/AsyncQueryCoreIntegTest.java | 17 +++++++++-------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index bce1918631..36e4c227b8 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -109,7 +109,7 @@ public DispatchQueryResponse submit( .jobId(jobId) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index d5885e6f2a..4698bfcccc 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -82,7 +82,7 @@ public DispatchQueryResponse submit( .jobId(DML_QUERY_JOB_ID) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } catch (Exception e) { LOG.error(e.getMessage()); @@ -100,7 +100,7 @@ public DispatchQueryResponse submit( .jobId(DML_QUERY_JOB_ID) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 49ea38c2dc..226e0ff5eb 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -205,7 +205,7 @@ public void createDropIndexQuery() { verifyGetQueryIdCalled(); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -251,7 +251,7 @@ public void createVacuumIndexQuery() { verifyGetQueryIdCalled(); verify(flintIndexClient).deleteIndex(indexName); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -308,7 +308,7 @@ public void createAlterIndexQuery() { assertFalse(flintIndexOptions.autoRefresh()); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -367,7 +367,7 @@ public void createStreamingQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.STREAMING); } private void verifyStartJobRunCalled() { @@ -402,7 +402,7 @@ public void createCreateIndexQuery() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH); } @Test @@ -424,7 +424,7 @@ public void createRefreshQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH); } @Test @@ -450,7 +450,7 @@ public void createInteractiveQuery() { verifyGetSessionIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE); } @Test @@ -666,7 +666,7 @@ private void verifyGetSessionIdCalled() { assertEquals(APPLICATION_ID, createSessionRequest.getApplicationId()); } - private void verifyStoreJobMetadataCalled(String jobId) { + private void verifyStoreJobMetadataCalled(String jobId, JobType jobType) { verify(asyncQueryJobMetadataStorageService) .storeJobMetadata( asyncQueryJobMetadataArgumentCaptor.capture(), eq(asyncQueryRequestContext)); @@ -674,6 +674,7 @@ private void verifyStoreJobMetadataCalled(String jobId) { assertEquals(QUERY_ID, asyncQueryJobMetadata.getQueryId()); assertEquals(jobId, asyncQueryJobMetadata.getJobId()); assertEquals(DATASOURCE_NAME, asyncQueryJobMetadata.getDatasourceName()); + assertEquals(jobType, asyncQueryJobMetadata.getJobType()); } private void verifyCreateIndexDMLResultCalled() {