Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.11] [Backport 2.x] add InteractiveSession and SessionManager #2315

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,47 @@ dependencies {
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation(platform("org.junit:junit-bom:5.6.2"))

testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0'
testImplementation 'junit:junit:4.13.1'
testImplementation "org.opensearch.test:framework:${opensearch_version}"

testCompileOnly('junit:junit:4.13.1') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.vintage:junit-vintage-engine") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
testRuntimeOnly("org.junit.platform:junit-platform-launcher") {
because 'allows tests to run from IDEs that bundle older version of launcher'
}
testImplementation("org.opensearch.test:framework:${opensearch_version}")
}

test {
useJUnitPlatform()
useJUnitPlatform {
includeEngines("junit-jupiter")
}
testLogging {
events "failed"
exceptionFormat "full"
}
}
task junit4(type: Test) {
useJUnitPlatform {
includeEngines("junit-vintage")
}
systemProperty 'tests.security.manager', 'false'
testLogging {
events "failed"
exceptionFormat "full"
}
}

jacocoTestReport {
dependsOn test, junit4
executionData test, junit4
reports {
html.enabled true
xml.enabled true
Expand All @@ -78,9 +103,10 @@ jacocoTestReport {
}))
}
}
test.finalizedBy(project.tasks.jacocoTestReport)

jacocoTestCoverageVerification {
dependsOn test, junit4
executionData test, junit4
violationRules {
rule {
element = 'CLASS'
Expand All @@ -92,6 +118,9 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.asyncquery.exceptions.*',
'org.opensearch.sql.spark.dispatcher.model.*',
'org.opensearch.sql.spark.flint.FlintIndexType',
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.SessionStateStore',
'org.opensearch.sql.spark.execution.session.SessionModel'
]
limit {
counter = 'LINE'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.opensearch.sql.spark.client.StartJobRequest;

@Data
public class CreateSessionRequest {
private final StartJobRequest startJobRequest;
private final String datasourceName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession;

import java.util.Optional;
import lombok.Builder;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Interactive session.
*
* <p>ENTRY_STATE: not_started
*/
@Getter
@Builder
public class InteractiveSession implements Session {
private static final Logger LOG = LogManager.getLogger();

private final SessionId sessionId;
private final SessionStateStore sessionStateStore;
private final EMRServerlessClient serverlessClient;

private SessionModel sessionModel;

@Override
public void open(CreateSessionRequest createSessionRequest) {
try {
String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest());
String applicationId = createSessionRequest.getStartJobRequest().getApplicationId();

sessionModel =
initInteractiveSession(
applicationId, jobID, sessionId, createSessionRequest.getDatasourceName());
sessionStateStore.create(sessionModel);
} catch (VersionConflictEngineException e) {
String errorMsg = "session already exist. " + sessionId;
LOG.error(errorMsg);
throw new IllegalStateException(errorMsg);
}
}

@Override
public void close() {
Optional<SessionModel> model = sessionStateStore.get(sessionModel.getSessionId());
if (model.isEmpty()) {
throw new IllegalStateException("session not exist. " + sessionModel.getSessionId());
} else {
serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

/** Session define the statement execution context. Each session is binding to one Spark Job. */
public interface Session {
/** open session. */
void open(CreateSessionRequest createSessionRequest);

/** close session. */
void close();

SessionModel getSessionModel();

SessionId getSessionId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.Data;
import org.apache.commons.lang3.RandomStringUtils;

@Data
public class SessionId {
private final String sessionId;

public static SessionId newSessionId() {
return new SessionId(RandomStringUtils.random(10, true, true));
}

@Override
public String toString() {
return "sessionId=" + sessionId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

/**
* Singleton Class
*
* <p>todo. add Session cache and Session sweeper.
*/
@RequiredArgsConstructor
public class SessionManager {
private final SessionStateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(newSessionId())
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.build();
session.open(request);
return session;
}

public Optional<Session> getSession(SessionId sid) {
Optional<SessionModel> model = stateStore.get(sid);
if (model.isPresent()) {
InteractiveSession session =
InteractiveSession.builder()
.sessionId(sid)
.sessionStateStore(stateStore)
.serverlessClient(emrServerlessClient)
.sessionModel(model.get())
.build();
return Optional.ofNullable(session);
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE;

import java.io.IOException;
import lombok.Builder;
import lombok.Data;
import lombok.SneakyThrows;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.seqno.SequenceNumbers;

/** Session data in flint.ql.sessions index. */
@Data
@Builder
public class SessionModel implements ToXContentObject {
public static final String VERSION = "version";
public static final String TYPE = "type";
public static final String SESSION_TYPE = "sessionType";
public static final String SESSION_ID = "sessionId";
public static final String SESSION_STATE = "state";
public static final String DATASOURCE_NAME = "dataSourceName";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String APPLICATION_ID = "applicationId";
public static final String JOB_ID = "jobId";
public static final String ERROR = "error";
public static final String UNKNOWN = "unknown";
public static final String SESSION_DOC_TYPE = "session";

private final String version;
private final SessionType sessionType;
private final SessionId sessionId;
private final SessionState sessionState;
private final String applicationId;
private final String jobId;
private final String datasourceName;
private final String error;
private final long lastUpdateTime;

private final long seqNo;
private final long primaryTerm;

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder
.startObject()
.field(VERSION, version)
.field(TYPE, SESSION_DOC_TYPE)
.field(SESSION_TYPE, sessionType.getSessionType())
.field(SESSION_ID, sessionId.getSessionId())
.field(SESSION_STATE, sessionState.getSessionState())
.field(DATASOURCE_NAME, datasourceName)
.field(APPLICATION_ID, applicationId)
.field(JOB_ID, jobId)
.field(LAST_UPDATE_TIME, lastUpdateTime)
.field(ERROR, error)
.endObject();
return builder;
}

public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) {
return builder()
.version(copy.version)
.sessionType(copy.sessionType)
.sessionId(new SessionId(copy.sessionId.getSessionId()))
.sessionState(copy.sessionState)
.datasourceName(copy.datasourceName)
.seqNo(seqNo)
.primaryTerm(primaryTerm)
.build();
}

@SneakyThrows

Check warning on line 81 in spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java#L81

Added line #L81 was not covered by tests
public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) {
SessionModelBuilder builder = new SessionModelBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case VERSION:
builder.version(parser.text());
break;
case SESSION_TYPE:
builder.sessionType(SessionType.fromString(parser.text()));
break;
case SESSION_ID:
builder.sessionId(new SessionId(parser.text()));
break;
case SESSION_STATE:
builder.sessionState(SessionState.fromString(parser.text()));
break;
case DATASOURCE_NAME:
builder.datasourceName(parser.text());
break;
case ERROR:
builder.error(parser.text());
break;
case APPLICATION_ID:
builder.applicationId(parser.text());
break;
case JOB_ID:
builder.jobId(parser.text());
break;
case LAST_UPDATE_TIME:
builder.lastUpdateTime(parser.longValue());
break;
case TYPE:
// do nothing.
break;
}
}
builder.seqNo(seqNo);
builder.primaryTerm(primaryTerm);
return builder.build();
}

public static SessionModel initInteractiveSession(
String applicationId, String jobId, SessionId sid, String datasourceName) {
return builder()
.version("1.0")
.sessionType(INTERACTIVE)
.sessionId(sid)
.sessionState(NOT_STARTED)
.datasourceName(datasourceName)
.applicationId(applicationId)
.jobId(jobId)
.error(UNKNOWN)
.lastUpdateTime(System.currentTimeMillis())
.seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO)
.primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
.build();
}
}
Loading
Loading