From 0699d58ca21758a2c69991b72bf5bbf915a44ebc Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 25 Oct 2023 13:39:46 -0700 Subject: [PATCH] Create new session if session is invalid Signed-off-by: Peng Huo --- .../dispatcher/SparkQueryDispatcher.java | 5 ++--- ...AsyncQueryExecutorServiceImplSpecTest.java | 19 ++++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 8feeddcafc..5e80259e09 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -219,10 +219,9 @@ private DispatchQueryResponse handleNonIndexQuery(DispatchQueryRequest dispatchQ // get session from request SessionId sessionId = new SessionId(dispatchQueryRequest.getSessionId()); Optional createdSession = sessionManager.getSession(sessionId); - if (createdSession.isEmpty()) { - throw new IllegalArgumentException("no session found. " + sessionId); + if (createdSession.isPresent()) { + session = createdSession.get(); } - session = createdSession.get(); } if (session == null || !session.isReady()) { // create session if not exist or session dead/fail diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 6bc40c009b..452528b319 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -460,7 +460,7 @@ public void recreateSessionIfNotReady() { } @Test - public void submitQueryInInvalidSessionThrowException() { + public void submitQueryInInvalidSessionWillCreateNewSession() { LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = createAsyncQueryExecutorService(emrsClient); @@ -468,16 +468,13 @@ public void submitQueryInInvalidSessionThrowException() { // enable session enableSession(true); - // 1. create async query. - SessionId sessionId = SessionId.newSessionId(DATASOURCE); - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest( - "select 1", DATASOURCE, LangType.SQL, sessionId.getSessionId()))); - assertEquals("no session found. " + sessionId, exception.getMessage()); + // 1. create async query with invalid sessionId + SessionId invalidSessionId = SessionId.newSessionId(DATASOURCE); + CreateAsyncQueryResponse asyncQuery = asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "select 1", DATASOURCE, LangType.SQL, invalidSessionId.getSessionId())); + assertNotNull(asyncQuery.getSessionId()); + assertNotEquals(invalidSessionId.getSessionId(), asyncQuery.getSessionId()); } private DataSourceServiceImpl createDataSourceService() {