Skip to content

Commit

Permalink
Fix jobType for Batch and IndexDML query (#2955) (#2982)
Browse files Browse the repository at this point in the history
(cherry picked from commit c13f770)

Signed-off-by: Tomoyuki Morita <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 9b19238 commit 0d4cd94
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public DispatchQueryResponse submit(
.jobId(jobId)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.jobType(JobType.BATCH)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public DispatchQueryResponse submit(
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.jobType(JobType.BATCH)
.build();
} catch (Exception e) {
LOG.error(e.getMessage());
Expand All @@ -100,7 +100,7 @@ public DispatchQueryResponse submit(
.jobId(DML_QUERY_JOB_ID)
.resultIndex(dataSourceMetadata.getResultIndex())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.INTERACTIVE)
.jobType(JobType.BATCH)
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void createDropIndexQuery() {
verifyGetQueryIdCalled();
verifyCancelJobRunCalled();
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);
}

@Test
Expand Down Expand Up @@ -251,7 +251,7 @@ public void createVacuumIndexQuery() {
verifyGetQueryIdCalled();
verify(flintIndexClient).deleteIndex(indexName);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);
}

@Test
Expand Down Expand Up @@ -308,7 +308,7 @@ public void createAlterIndexQuery() {
assertFalse(flintIndexOptions.autoRefresh());
verifyCancelJobRunCalled();
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID);
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH);
}

@Test
Expand Down Expand Up @@ -367,7 +367,7 @@ public void createStreamingQuery() {
verifyGetQueryIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID);
verifyStoreJobMetadataCalled(JOB_ID, JobType.STREAMING);
}

private void verifyStartJobRunCalled() {
Expand Down Expand Up @@ -402,7 +402,7 @@ public void createCreateIndexQuery() {
assertNull(response.getSessionId());
verifyGetQueryIdCalled();
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID);
verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH);
}

@Test
Expand All @@ -424,7 +424,7 @@ public void createRefreshQuery() {
verifyGetQueryIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID);
verifyStoreJobMetadataCalled(JOB_ID, JobType.BATCH);
}

@Test
Expand All @@ -450,7 +450,7 @@ public void createInteractiveQuery() {
verifyGetSessionIdCalled();
verify(leaseManager).borrow(any());
verifyStartJobRunCalled();
verifyStoreJobMetadataCalled(JOB_ID);
verifyStoreJobMetadataCalled(JOB_ID, JobType.INTERACTIVE);
}

@Test
Expand Down Expand Up @@ -666,14 +666,15 @@ private void verifyGetSessionIdCalled() {
assertEquals(APPLICATION_ID, createSessionRequest.getApplicationId());
}

private void verifyStoreJobMetadataCalled(String jobId) {
private void verifyStoreJobMetadataCalled(String jobId, JobType jobType) {
verify(asyncQueryJobMetadataStorageService)
.storeJobMetadata(
asyncQueryJobMetadataArgumentCaptor.capture(), eq(asyncQueryRequestContext));
AsyncQueryJobMetadata asyncQueryJobMetadata = asyncQueryJobMetadataArgumentCaptor.getValue();
assertEquals(QUERY_ID, asyncQueryJobMetadata.getQueryId());
assertEquals(jobId, asyncQueryJobMetadata.getJobId());
assertEquals(DATASOURCE_NAME, asyncQueryJobMetadata.getDatasourceName());
assertEquals(jobType, asyncQueryJobMetadata.getJobType());
}

private void verifyCreateIndexDMLResultCalled() {
Expand Down

0 comments on commit 0d4cd94

Please sign in to comment.