From c13f7705fca39c737b11fcf6fb0bf5ce9fb540ef Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Tue, 3 Sep 2024 22:13:57 -0700 Subject: [PATCH] Fix jobType for Batch and IndexDML query (#2955) Signed-off-by: Tomoyuki Morita --- .../sql/spark/dispatcher/BatchQueryHandler.java | 2 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 4 ++-- .../asyncquery/AsyncQueryCoreIntegTest.java | 17 +++++++++-------- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index bce1918631..36e4c227b8 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -109,7 +109,7 @@ public DispatchQueryResponse submit( .jobId(jobId) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index d5885e6f2a..4698bfcccc 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -82,7 +82,7 @@ public DispatchQueryResponse submit( .jobId(DML_QUERY_JOB_ID) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } catch (Exception e) { LOG.error(e.getMessage()); @@ -100,7 +100,7 @@ public DispatchQueryResponse submit( .jobId(DML_QUERY_JOB_ID) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) - .jobType(JobType.INTERACTIVE) + .jobType(JobType.BATCH) .build(); } } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index feb8c8c0ac..09767d16bd 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -202,7 +202,7 @@ public void createDropIndexQuery() { verifyGetQueryIdCalled(); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -224,7 +224,7 @@ public void createVacuumIndexQuery() { verifyGetQueryIdCalled(); verify(flintIndexClient).deleteIndex(indexName); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -255,7 +255,7 @@ public void createAlterIndexQuery() { assertFalse(flintIndexOptions.autoRefresh()); verifyCancelJobRunCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test @@ -280,7 +280,7 @@ public void createStreamingQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.STREAMING); } private void verifyStartJobRunCalled() { @@ -315,7 +315,7 @@ public void createCreateIndexQuery() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH); } @Test @@ -337,7 +337,7 @@ public void createRefreshQuery() { verifyGetQueryIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH); } @Test @@ -363,7 +363,7 @@ public void createInteractiveQuery() { verifyGetSessionIdCalled(); verify(leaseManager).borrow(any()); verifyStartJobRunCalled(); - verifyStoreJobMetadataCalled(JOB_ID); + verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE); } @Test @@ -560,7 +560,7 @@ private void verifyGetSessionIdCalled() { assertEquals(APPLICATION_ID, createSessionRequest.getApplicationId()); } - private void verifyStoreJobMetadataCalled(String jobId) { + private void verifyStoreJobMetadataCalled(String jobId, JobType jobType) { verify(asyncQueryJobMetadataStorageService) .storeJobMetadata( asyncQueryJobMetadataArgumentCaptor.capture(), eq(asyncQueryRequestContext)); @@ -568,6 +568,7 @@ private void verifyStoreJobMetadataCalled(String jobId) { assertEquals(QUERY_ID, asyncQueryJobMetadata.getQueryId()); assertEquals(jobId, asyncQueryJobMetadata.getJobId()); assertEquals(DATASOURCE_NAME, asyncQueryJobMetadata.getDatasourceName()); + assertEquals(jobType, asyncQueryJobMetadata.getJobType()); } private void verifyCreateIndexDMLResultCalled() {