From 9693f31787f32e86bcd65ef45a46698dad997059 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 13 Oct 2023 13:57:36 -0700 Subject: [PATCH] address comments Signed-off-by: Peng Huo --- .../execution/session/InteractiveSession.java | 3 +-- .../sql/spark/execution/session/Session.java | 2 +- .../execution/session/SessionManager.java | 3 +-- .../session/InteractiveSessionTest.java | 22 ++++++++++--------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java index 2898f4b87b..620e46b9be 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -29,12 +29,11 @@ public class InteractiveSession implements Session { private final SessionId sessionId; private final SessionStateStore sessionStateStore; private final EMRServerlessClient serverlessClient; - private final CreateSessionRequest createSessionRequest; private SessionModel sessionModel; @Override - public void open() { + public void open(CreateSessionRequest createSessionRequest) { try { String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java index 449a9af538..ec9775e60a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java @@ -8,7 +8,7 @@ /** Session define the statement execution context. Each session is binding to one Spark Job. */ public interface Session { /** open session. */ - void open(); + void open(CreateSessionRequest createSessionRequest); /** close session. */ void close(); diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index 2166c91568..3d0916bac8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -28,9 +28,8 @@ public Session createSession(CreateSessionRequest request) { .sessionId(newSessionId()) .sessionStateStore(stateStore) .serverlessClient(emrServerlessClient) - .createSessionRequest(request) .build(); - session.open(); + session.open(request); return session; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index 3ff547157c..53dc211ded 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -52,12 +52,15 @@ public void openCloseSession() { .sessionId(SessionId.newSessionId()) .sessionStateStore(stateStore) .serverlessClient(emrsClient) - .createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource")) .build(); // open session TestSession testSession = testSession(session, stateStore); - testSession.open().assertSessionState(NOT_STARTED).assertAppId("appId").assertJobId("jobId"); + testSession + .open(new CreateSessionRequest(startJobRequest, "datasource")) + .assertSessionState(NOT_STARTED) + .assertAppId("appId") + .assertJobId("jobId"); emrsClient.startJobRunCalled(1); // close session @@ -73,19 +76,19 @@ public void openSessionFailedConflict() { .sessionId(sessionId) .sessionStateStore(stateStore) .serverlessClient(emrsClient) - .createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource")) .build(); - session.open(); + session.open(new CreateSessionRequest(startJobRequest, "datasource")); InteractiveSession duplicateSession = InteractiveSession.builder() .sessionId(sessionId) .sessionStateStore(stateStore) .serverlessClient(emrsClient) - .createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource")) .build(); IllegalStateException exception = - assertThrows(IllegalStateException.class, duplicateSession::open); + assertThrows( + IllegalStateException.class, + () -> duplicateSession.open(new CreateSessionRequest(startJobRequest, "datasource"))); assertEquals("session already exist. sessionId=duplicate-session-id", exception.getMessage()); } @@ -97,9 +100,8 @@ public void closeNotExistSession() { .sessionId(sessionId) .sessionStateStore(stateStore) .serverlessClient(emrsClient) - .createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource")) .build(); - session.open(); + session.open(new CreateSessionRequest(startJobRequest, "datasource")); client().delete(new DeleteRequest(indexName, sessionId.getSessionId())); @@ -167,8 +169,8 @@ public TestSession assertJobId(String expected) { return this; } - public TestSession open() { - session.open(); + public TestSession open(CreateSessionRequest req) { + session.open(req); return this; }