diff --git a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index 8d57198277..18dce7b7b2 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -237,7 +237,8 @@ private void createIndex(String indexName) { } } - private long count(String indexName, QueryBuilder query) { + @VisibleForTesting + public long count(String indexName, QueryBuilder query) { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchSourceBuilder.size(0); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java index 8016f78ce9..2d5f27dd62 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/leasemanager/DefaultLeaseManagerTest.java @@ -5,7 +5,9 @@ package org.opensearch.sql.spark.leasemanager; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -23,22 +25,33 @@ class DefaultLeaseManagerTest { @Mock private StateStore stateStore; @Test - public void concurrentSessionRuleOnlyApplyToInteractiveQuery() { - assertTrue( - new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) - .test(new LeaseRequest(JobType.BATCH, "mys3"))); - assertTrue( - new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore) - .test(new LeaseRequest(JobType.STREAMING, "mys3"))); + public void leaseManagerRejectsJobs() { + when(stateStore.count(any(), any())).thenReturn(3L); + when(settings.getSettingValue(any())).thenReturn(3); + DefaultLeaseManager defaultLeaseManager = new DefaultLeaseManager(settings, stateStore); + + defaultLeaseManager.borrow(getLeaseRequest(JobType.BATCH)); + assertThrows(ConcurrencyLimitExceededException.class, () -> + defaultLeaseManager.borrow(getLeaseRequest(JobType.INTERACTIVE))); + assertThrows(ConcurrencyLimitExceededException.class, () -> + defaultLeaseManager.borrow(getLeaseRequest(JobType.STREAMING))); + assertThrows(ConcurrencyLimitExceededException.class, () -> + defaultLeaseManager.borrow(getLeaseRequest(JobType.REFRESH))); } @Test - public void concurrentRefreshRuleNotAppliedToInteractiveAndBatchQuery() { - assertTrue( - new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore) - .test(new LeaseRequest(JobType.INTERACTIVE, "mys3"))); - assertTrue( - new DefaultLeaseManager.ConcurrentRefreshJobRule(settings, stateStore) - .test(new LeaseRequest(JobType.BATCH, "mys3"))); + public void leaseManagerAcceptsJobs() { + when(stateStore.count(any(), any())).thenReturn(2L); + when(settings.getSettingValue(any())).thenReturn(3); + DefaultLeaseManager defaultLeaseManager = new DefaultLeaseManager(settings, stateStore); + + defaultLeaseManager.borrow(getLeaseRequest(JobType.BATCH)); + defaultLeaseManager.borrow(getLeaseRequest(JobType.INTERACTIVE)); + defaultLeaseManager.borrow(getLeaseRequest(JobType.STREAMING)); + defaultLeaseManager.borrow(getLeaseRequest(JobType.REFRESH)); + } + + private LeaseRequest getLeaseRequest(JobType jobType) { + return new LeaseRequest(jobType, "mys3"); } }