Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo committed Oct 13, 2023
1 parent 78acb1a commit 9693f31
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ public class InteractiveSession implements Session {
private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final EMRServerlessClient serverlessClient;
private final CreateSessionRequest createSessionRequest;

private SessionModel sessionModel;

@Override
public void open() {
public void open(CreateSessionRequest createSessionRequest) {
try {
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
void open();
void open(CreateSessionRequest createSessionRequest);

/** close session. */
void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ public Session createSession(CreateSessionRequest request) {
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.createSessionRequest(request)
.build();
session.open();
session.open(request);
return session;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ public void openCloseSession() {
.sessionId(SessionId.newSessionId())
.sessionStateStore(stateStore)
.serverlessClient(emrsClient)
.createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource"))
.build();

// open session
TestSession testSession = testSession(session, stateStore);
testSession.open().assertSessionState(NOT_STARTED).assertAppId("appId").assertJobId("jobId");
testSession
.open(new CreateSessionRequest(startJobRequest, "datasource"))
.assertSessionState(NOT_STARTED)
.assertAppId("appId")
.assertJobId("jobId");
emrsClient.startJobRunCalled(1);

// close session
Expand All @@ -73,19 +76,19 @@ public void openSessionFailedConflict() {
.sessionId(sessionId)
.sessionStateStore(stateStore)
.serverlessClient(emrsClient)
.createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource"))
.build();
session.open();
session.open(new CreateSessionRequest(startJobRequest, "datasource"));

InteractiveSession duplicateSession =
InteractiveSession.builder()
.sessionId(sessionId)
.sessionStateStore(stateStore)
.serverlessClient(emrsClient)
.createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource"))
.build();
IllegalStateException exception =
assertThrows(IllegalStateException.class, duplicateSession::open);
assertThrows(
IllegalStateException.class,
() -> duplicateSession.open(new CreateSessionRequest(startJobRequest, "datasource")));
assertEquals("session already exist. sessionId=duplicate-session-id", exception.getMessage());
}

Expand All @@ -97,9 +100,8 @@ public void closeNotExistSession() {
.sessionId(sessionId)
.sessionStateStore(stateStore)
.serverlessClient(emrsClient)
.createSessionRequest(new CreateSessionRequest(startJobRequest, "datasource"))
.build();
session.open();
session.open(new CreateSessionRequest(startJobRequest, "datasource"));

client().delete(new DeleteRequest(indexName, sessionId.getSessionId()));

Expand Down Expand Up @@ -167,8 +169,8 @@ public TestSession assertJobId(String expected) {
return this;
}

public TestSession open() {
session.open();
public TestSession open(CreateSessionRequest req) {
session.open(req);
return this;
}

Expand Down

0 comments on commit 9693f31

Please sign in to comment.