Skip to content

Commit

Permalink
feat(kill session): adapt global client session and using block in or…
Browse files Browse the repository at this point in the history
…acle model in kill session (#2978)

* adapt global client session and oracle model pl

* modify according to pr commit

* modify code according to pr

* modify code format

* modify code format

* modify code format

* add judgment of directing observer

* modify code format

* Check whether the directly connected sql matches the oracle and mysql modes

* Modify the code according to pr

* modify code format

* add comment for main method

* modify according to pr

* Optimize readability
  • Loading branch information
zijiacj authored Jul 25, 2024
1 parent 743c034 commit 29d49fa
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public SuccessResponse<Boolean> killQuery(@PathVariable String sessionId) {
* kill session
*
* @param req
* @return
* @return kill result
*/
@ApiOperation(value = "kill session", notes = "终止会话接口")
@RequestMapping(value = "/sessions/killSession", method = RequestMethod.POST)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package com.oceanbase.odc.service.db.session;

import static com.oceanbase.odc.core.shared.constant.DialectType.OB_MYSQL;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -40,12 +43,14 @@

import com.google.common.collect.Lists;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.common.util.VersionUtils;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
import com.oceanbase.odc.core.datasource.SingleConnectionDataSource;
import com.oceanbase.odc.core.session.ConnectionSession;
import com.oceanbase.odc.core.session.ConnectionSessionConstants;
import com.oceanbase.odc.core.session.ConnectionSessionUtil;
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.core.shared.exception.InternalServerError;
import com.oceanbase.odc.core.shared.model.OdcDBSession;
import com.oceanbase.odc.core.sql.execute.model.JdbcGeneralResult;
Expand Down Expand Up @@ -78,6 +83,13 @@ public class DefaultDBSessionManage implements DBSessionManageFacade {
+ "(?<port>[0-9]{1,5})\\*/.*";
private static final Pattern SERVER_PATTERN = Pattern.compile(SERVER_REGEX);
private static final ConnectionMapper CONNECTION_MAPPER = ConnectionMapper.INSTANCE;
private static final String GLOBAL_CLIENT_SESSION_OB_PROXY_VERSION_NUMBER = "4.2.3";
private static final String GLOBAL_CLIENT_SESSION_OB_VERSION_NUMBER = "4.2.5";
private static final String ORACLE_MODEL_KILL_SESSION_WITH_BLOCK_OB_VERSION_NUMBER = "4.2.1.0";
private static final byte GLOBAL_CLIENT_SESSION_PROXY_ID_MIN = 0;
private static final short GLOBAL_CLIENT_SESSION_PROXY_ID_MAX = 8191;
private static final byte GLOBAL_CLIENT_SESSION_ID_VERSION = 2;


@Autowired
private ConnectSessionService sessionService;
Expand Down Expand Up @@ -198,33 +210,230 @@ private List<KillSessionResult> doKillSessionOrQuery(
@SkipAuthorize("odc internal usage")
public List<JdbcGeneralResult> executeKillSession(ConnectionSession connectionSession, List<SqlTuple> sqlTuples,
String sqlScript) {
List<JdbcGeneralResult> results = executeKillCommands(connectionSession, sqlTuples, sqlScript);
if (connectionSession.getDialectType() == DialectType.OB_MYSQL
|| connectionSession.getDialectType() == DialectType.OB_ORACLE) {
return processResults(connectionSession, results);
}
return results;
}

private List<JdbcGeneralResult> executeKillCommands(ConnectionSession connectionSession, List<SqlTuple> sqlTuples,
String sqlScript) {
List<JdbcGeneralResult> results =
connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.execute(new OdcStatementCallBack(sqlTuples, connectionSession, true, null, false));
if (results == null) {
log.warn("Execution of the kill session command failed with unknown error, sql={}", sqlScript);
throw new InternalServerError("Unknown error");
}
return results.stream().map(jdbcGeneralResult -> {
SqlTuple sqlTuple = jdbcGeneralResult.getSqlTuple();
return results;
}

/**
* process the execution result after the first kill commands. if the result contains unknown thread
* id exception, try to use other solutions to execute the kill commands.
*
* @param connectionSession
* @param results
* @return
*/
private List<JdbcGeneralResult> processResults(ConnectionSession connectionSession,
List<JdbcGeneralResult> results) {
Boolean isDirectedOBServer = isObServerDirected(connectionSession);
String obProxyVersion = getObProxyVersion(connectionSession, isDirectedOBServer);
String obVersion = ConnectionSessionUtil.getVersion(connectionSession);
boolean isEnabledGlobalClientSession =
isGlobalClientSessionEnabled(connectionSession, obProxyVersion, obVersion);
boolean isSupportedOracleModeKillSession = isOracleModeKillSessionSupported(obVersion, connectionSession);
List<JdbcGeneralResult> finalResults = new ArrayList<>();
for (JdbcGeneralResult jdbcGeneralResult : results) {
try {
jdbcGeneralResult.getQueryResult();
} catch (Exception e) {
if (StringUtils.contains(e.getMessage(), "Unknown thread id")) {
try {
log.info("Kill query/session Unknown thread id error, try direct connect observer");
directLinkServerAndExecute(sqlTuple.getExecutedSql(),
connectionSession);
return JdbcGeneralResult.successResult(sqlTuple);
} catch (Exception e1) {
log.warn("Failed to direct connect observer {}", e1.getMessage());
}
if (isUnknownThreadIdError(e)) {
jdbcGeneralResult = handleUnknownThreadIdError(connectionSession,
jdbcGeneralResult, isDirectedOBServer,
isEnabledGlobalClientSession, isSupportedOracleModeKillSession);
} else {
log.warn("Failed to execute sql in kill session scenario, sqlTuple={}", sqlTuple, e);
log.warn("Failed to execute sql in kill session scenario, sqlTuple={}",
jdbcGeneralResult.getSqlTuple(), e);
}
}
finalResults.add(jdbcGeneralResult);
}
return finalResults;
}

/**
* Check whether client directed oceanbase database server. If an exception occurs or the version
* does not support, return null.
*
* @param connectionSession
* @return
*/
private Boolean isObServerDirected(ConnectionSession connectionSession) {
String sql = (connectionSession.getDialectType() == OB_MYSQL)
? "select PROXY_SESSID from oceanbase.gv$ob_processlist where ID =(select connection_id());"
: "select PROXY_SESSID from gv$ob_processlist where ID =(select sys_context('userenv','sid') from dual);";
try {
List<String> proxySessids = connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.query(sql, (rs, rowNum) -> rs.getString("PROXY_SESSID"));
if (proxySessids != null && proxySessids.size() == 1) {
return proxySessids.get(0) == null;
}
} catch (Exception e) {
log.warn("Failed to obtain the PROXY_SESSID: {}", e.getMessage());
}
// Return null if the version is not supported or an exception occurs
return null;
}

/**
* Get the OBProxy version number. If an exception occurs or the version does not support, return
* null.
*
* @param connectionSession
* @param isDirectedOBServer
* @return
*/
private String getObProxyVersion(ConnectionSession connectionSession, Boolean isDirectedOBServer) {
if (Boolean.TRUE.equals(isDirectedOBServer)) {
return null;
}
try {
return connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.queryForObject("select proxy_version()", String.class);
} catch (Exception e) {
log.warn("Failed to obtain the OBProxy version number: {}", e.getMessage());
return null;
}
}

private boolean isGlobalClientSessionEnabled(ConnectionSession connectionSession, String obProxyVersion,
String obVersion) {
// verification version requirement
if (StringUtils.isBlank(obProxyVersion)
|| StringUtils.isBlank(obVersion)
|| VersionUtils.isLessThan(obProxyVersion, GLOBAL_CLIENT_SESSION_OB_PROXY_VERSION_NUMBER)
|| VersionUtils.isLessThan(obVersion, GLOBAL_CLIENT_SESSION_OB_VERSION_NUMBER)) {
return false;
}
try {
Integer proxyId = getOBProxyConfig(connectionSession, "proxy_id");
Integer clientSessionIdVersion = getOBProxyConfig(connectionSession, "client_session_id_version");

return proxyId != null
&& proxyId >= GLOBAL_CLIENT_SESSION_PROXY_ID_MIN
&& proxyId <= GLOBAL_CLIENT_SESSION_PROXY_ID_MAX
&& clientSessionIdVersion != null
&& clientSessionIdVersion == GLOBAL_CLIENT_SESSION_ID_VERSION;
} catch (Exception e) {
log.warn("Failed to determine if global client session is enabled: {}", e.getMessage());
return false;
}
}

/**
* Gets the value of OBProxy's configuration variable If an exception occurs or the version does not
* support, return null.
*
* @param connectionSession
* @param configName
* @return
*/
private Integer getOBProxyConfig(ConnectionSession connectionSession, String configName) {
try {
return connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.query("show proxyconfig like '" + configName + "';",
rs -> rs.next() ? rs.getInt("value") : null);
} catch (Exception e) {
log.warn("Failed to obtain the value of OBProxy's configuration variable: {}", e.getMessage());
return null;
}
}

private boolean isUnknownThreadIdError(Exception e) {
return StringUtils.containsIgnoreCase(e.getMessage(), "Unknown thread id");
}

private JdbcGeneralResult handleUnknownThreadIdError(ConnectionSession connectionSession,
JdbcGeneralResult jdbcGeneralResult, Boolean isDirectedOBServer,
boolean isEnabledGlobalClientSession, boolean isSupportedOracleModeKillSession) {
if (Boolean.TRUE.equals(isDirectedOBServer)) {
log.info("The current connection mode is directing observer, return error result directly");
return jdbcGeneralResult;
}
if (isEnabledGlobalClientSession) {
log.info("The OBProxy has enabled the global client session, return error result directly");
return jdbcGeneralResult;
}
if (isSupportedOracleModeKillSession) {
return tryKillSessionByAnonymousBlock(connectionSession, jdbcGeneralResult,
jdbcGeneralResult.getSqlTuple());
}
return tryKillSessionViaDirectConnectObServer(connectionSession, jdbcGeneralResult,
jdbcGeneralResult.getSqlTuple());
}

private boolean isOracleModeKillSessionSupported(String obVersion, ConnectionSession connectionSession) {
return StringUtils.isNotBlank(obVersion) &&
VersionUtils.isGreaterThanOrEqualsTo(obVersion, ORACLE_MODEL_KILL_SESSION_WITH_BLOCK_OB_VERSION_NUMBER)
&&
connectionSession.getDialectType() == DialectType.OB_ORACLE;
}

/**
* Try to kill session by using anonymous code blocks. If successful, return a successful
* jdbcGeneralResult, otherwise return the original jdbcGeneralResult.
*
* @param connectionSession
* @param jdbcGeneralResult
* @param sqlTuple
* @return
*/
private JdbcGeneralResult tryKillSessionByAnonymousBlock(ConnectionSession connectionSession,
JdbcGeneralResult jdbcGeneralResult, SqlTuple sqlTuple) {
log.info("Kill query/session Unknown thread id error, try anonymous code blocks");
String executedSql = sqlTuple.getExecutedSql();
if (executedSql != null && executedSql.endsWith(";")) {
executedSql = executedSql.substring(0, executedSql.length() - 1);
}
String anonymousCodeBlock = "BEGIN\n"
+ "EXECUTE IMMEDIATE '"
+ executedSql
+ "';\n"
+ "END;";
try {
connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY)
.execute(anonymousCodeBlock);
return JdbcGeneralResult.successResult(sqlTuple);

} catch (Exception e) {
log.warn("Failed to kill session by using anonymous code blocks: {}", e.getMessage());
return jdbcGeneralResult;
}).collect(Collectors.toList());
}
}

/**
* Try to kill session by direct connect observer. If successful, return a successful
* jdbcGeneralResult, otherwise return the original jdbcGeneralResult.
*
* @param connectionSession
* @param jdbcGeneralResult
* @param sqlTuple
* @return
*/
private JdbcGeneralResult tryKillSessionViaDirectConnectObServer(ConnectionSession connectionSession,
JdbcGeneralResult jdbcGeneralResult, SqlTuple sqlTuple) {
try {
log.info("Kill query/session Unknown thread id error, try direct connect observer");
directLinkServerAndExecute(sqlTuple.getExecutedSql(), connectionSession);
return JdbcGeneralResult.successResult(sqlTuple);
} catch (Exception e) {
log.warn("Failed to direct connect observer {}", e.getMessage());
return jdbcGeneralResult;
}
}

private void directLinkServerAndExecute(String sql, ConnectionSession session)
Expand Down

0 comments on commit 29d49fa

Please sign in to comment.