diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java index b3407c7146..daac207cf9 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java @@ -209,7 +209,7 @@ public SuccessResponse killQuery(@PathVariable String sessionId) { * kill session * * @param req - * @return + * @return kill result */ @ApiOperation(value = "kill session", notes = "终止会话接口") @RequestMapping(value = "/sessions/killSession", method = RequestMethod.POST) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/session/DefaultDBSessionManage.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/session/DefaultDBSessionManage.java index 9a06d5dacb..f10569cfee 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/db/session/DefaultDBSessionManage.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/db/session/DefaultDBSessionManage.java @@ -15,8 +15,11 @@ */ package com.oceanbase.odc.service.db.session; +import static com.oceanbase.odc.core.shared.constant.DialectType.OB_MYSQL; + import java.sql.Connection; import java.sql.Statement; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -40,12 +43,14 @@ import com.google.common.collect.Lists; import com.oceanbase.odc.common.util.StringUtils; +import com.oceanbase.odc.common.util.VersionUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.core.datasource.SingleConnectionDataSource; import com.oceanbase.odc.core.session.ConnectionSession; import com.oceanbase.odc.core.session.ConnectionSessionConstants; import com.oceanbase.odc.core.session.ConnectionSessionUtil; import com.oceanbase.odc.core.shared.Verify; +import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.shared.exception.InternalServerError; import com.oceanbase.odc.core.shared.model.OdcDBSession; import com.oceanbase.odc.core.sql.execute.model.JdbcGeneralResult; @@ -78,6 +83,13 @@ public class DefaultDBSessionManage implements DBSessionManageFacade { + "(?[0-9]{1,5})\\*/.*"; private static final Pattern SERVER_PATTERN = Pattern.compile(SERVER_REGEX); private static final ConnectionMapper CONNECTION_MAPPER = ConnectionMapper.INSTANCE; + private static final String GLOBAL_CLIENT_SESSION_OB_PROXY_VERSION_NUMBER = "4.2.3"; + private static final String GLOBAL_CLIENT_SESSION_OB_VERSION_NUMBER = "4.2.5"; + private static final String ORACLE_MODEL_KILL_SESSION_WITH_BLOCK_OB_VERSION_NUMBER = "4.2.1.0"; + private static final byte GLOBAL_CLIENT_SESSION_PROXY_ID_MIN = 0; + private static final short GLOBAL_CLIENT_SESSION_PROXY_ID_MAX = 8191; + private static final byte GLOBAL_CLIENT_SESSION_ID_VERSION = 2; + @Autowired private ConnectSessionService sessionService; @@ -198,6 +210,16 @@ private List doKillSessionOrQuery( @SkipAuthorize("odc internal usage") public List executeKillSession(ConnectionSession connectionSession, List sqlTuples, String sqlScript) { + List results = executeKillCommands(connectionSession, sqlTuples, sqlScript); + if (connectionSession.getDialectType() == DialectType.OB_MYSQL + || connectionSession.getDialectType() == DialectType.OB_ORACLE) { + return processResults(connectionSession, results); + } + return results; + } + + private List executeKillCommands(ConnectionSession connectionSession, List sqlTuples, + String sqlScript) { List results = connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY) .execute(new OdcStatementCallBack(sqlTuples, connectionSession, true, null, false)); @@ -205,26 +227,213 @@ public List executeKillSession(ConnectionSession connectionSe log.warn("Execution of the kill session command failed with unknown error, sql={}", sqlScript); throw new InternalServerError("Unknown error"); } - return results.stream().map(jdbcGeneralResult -> { - SqlTuple sqlTuple = jdbcGeneralResult.getSqlTuple(); + return results; + } + + /** + * process the execution result after the first kill commands. if the result contains unknown thread + * id exception, try to use other solutions to execute the kill commands. + * + * @param connectionSession + * @param results + * @return + */ + private List processResults(ConnectionSession connectionSession, + List results) { + Boolean isDirectedOBServer = isObServerDirected(connectionSession); + String obProxyVersion = getObProxyVersion(connectionSession, isDirectedOBServer); + String obVersion = ConnectionSessionUtil.getVersion(connectionSession); + boolean isEnabledGlobalClientSession = + isGlobalClientSessionEnabled(connectionSession, obProxyVersion, obVersion); + boolean isSupportedOracleModeKillSession = isOracleModeKillSessionSupported(obVersion, connectionSession); + List finalResults = new ArrayList<>(); + for (JdbcGeneralResult jdbcGeneralResult : results) { try { jdbcGeneralResult.getQueryResult(); } catch (Exception e) { - if (StringUtils.contains(e.getMessage(), "Unknown thread id")) { - try { - log.info("Kill query/session Unknown thread id error, try direct connect observer"); - directLinkServerAndExecute(sqlTuple.getExecutedSql(), - connectionSession); - return JdbcGeneralResult.successResult(sqlTuple); - } catch (Exception e1) { - log.warn("Failed to direct connect observer {}", e1.getMessage()); - } + if (isUnknownThreadIdError(e)) { + jdbcGeneralResult = handleUnknownThreadIdError(connectionSession, + jdbcGeneralResult, isDirectedOBServer, + isEnabledGlobalClientSession, isSupportedOracleModeKillSession); } else { - log.warn("Failed to execute sql in kill session scenario, sqlTuple={}", sqlTuple, e); + log.warn("Failed to execute sql in kill session scenario, sqlTuple={}", + jdbcGeneralResult.getSqlTuple(), e); } } + finalResults.add(jdbcGeneralResult); + } + return finalResults; + } + + /** + * Check whether client directed oceanbase database server. If an exception occurs or the version + * does not support, return null. + * + * @param connectionSession + * @return + */ + private Boolean isObServerDirected(ConnectionSession connectionSession) { + String sql = (connectionSession.getDialectType() == OB_MYSQL) + ? "select PROXY_SESSID from oceanbase.gv$ob_processlist where ID =(select connection_id());" + : "select PROXY_SESSID from gv$ob_processlist where ID =(select sys_context('userenv','sid') from dual);"; + try { + List proxySessids = connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY) + .query(sql, (rs, rowNum) -> rs.getString("PROXY_SESSID")); + if (proxySessids != null && proxySessids.size() == 1) { + return proxySessids.get(0) == null; + } + } catch (Exception e) { + log.warn("Failed to obtain the PROXY_SESSID: {}", e.getMessage()); + } + // Return null if the version is not supported or an exception occurs + return null; + } + + /** + * Get the OBProxy version number. If an exception occurs or the version does not support, return + * null. + * + * @param connectionSession + * @param isDirectedOBServer + * @return + */ + private String getObProxyVersion(ConnectionSession connectionSession, Boolean isDirectedOBServer) { + if (Boolean.TRUE.equals(isDirectedOBServer)) { + return null; + } + try { + return connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY) + .queryForObject("select proxy_version()", String.class); + } catch (Exception e) { + log.warn("Failed to obtain the OBProxy version number: {}", e.getMessage()); + return null; + } + } + + private boolean isGlobalClientSessionEnabled(ConnectionSession connectionSession, String obProxyVersion, + String obVersion) { + // verification version requirement + if (StringUtils.isBlank(obProxyVersion) + || StringUtils.isBlank(obVersion) + || VersionUtils.isLessThan(obProxyVersion, GLOBAL_CLIENT_SESSION_OB_PROXY_VERSION_NUMBER) + || VersionUtils.isLessThan(obVersion, GLOBAL_CLIENT_SESSION_OB_VERSION_NUMBER)) { + return false; + } + try { + Integer proxyId = getOBProxyConfig(connectionSession, "proxy_id"); + Integer clientSessionIdVersion = getOBProxyConfig(connectionSession, "client_session_id_version"); + + return proxyId != null + && proxyId >= GLOBAL_CLIENT_SESSION_PROXY_ID_MIN + && proxyId <= GLOBAL_CLIENT_SESSION_PROXY_ID_MAX + && clientSessionIdVersion != null + && clientSessionIdVersion == GLOBAL_CLIENT_SESSION_ID_VERSION; + } catch (Exception e) { + log.warn("Failed to determine if global client session is enabled: {}", e.getMessage()); + return false; + } + } + + /** + * Gets the value of OBProxy's configuration variable If an exception occurs or the version does not + * support, return null. + * + * @param connectionSession + * @param configName + * @return + */ + private Integer getOBProxyConfig(ConnectionSession connectionSession, String configName) { + try { + return connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY) + .query("show proxyconfig like '" + configName + "';", + rs -> rs.next() ? rs.getInt("value") : null); + } catch (Exception e) { + log.warn("Failed to obtain the value of OBProxy's configuration variable: {}", e.getMessage()); + return null; + } + } + + private boolean isUnknownThreadIdError(Exception e) { + return StringUtils.containsIgnoreCase(e.getMessage(), "Unknown thread id"); + } + + private JdbcGeneralResult handleUnknownThreadIdError(ConnectionSession connectionSession, + JdbcGeneralResult jdbcGeneralResult, Boolean isDirectedOBServer, + boolean isEnabledGlobalClientSession, boolean isSupportedOracleModeKillSession) { + if (Boolean.TRUE.equals(isDirectedOBServer)) { + log.info("The current connection mode is directing observer, return error result directly"); + return jdbcGeneralResult; + } + if (isEnabledGlobalClientSession) { + log.info("The OBProxy has enabled the global client session, return error result directly"); + return jdbcGeneralResult; + } + if (isSupportedOracleModeKillSession) { + return tryKillSessionByAnonymousBlock(connectionSession, jdbcGeneralResult, + jdbcGeneralResult.getSqlTuple()); + } + return tryKillSessionViaDirectConnectObServer(connectionSession, jdbcGeneralResult, + jdbcGeneralResult.getSqlTuple()); + } + + private boolean isOracleModeKillSessionSupported(String obVersion, ConnectionSession connectionSession) { + return StringUtils.isNotBlank(obVersion) && + VersionUtils.isGreaterThanOrEqualsTo(obVersion, ORACLE_MODEL_KILL_SESSION_WITH_BLOCK_OB_VERSION_NUMBER) + && + connectionSession.getDialectType() == DialectType.OB_ORACLE; + } + + /** + * Try to kill session by using anonymous code blocks. If successful, return a successful + * jdbcGeneralResult, otherwise return the original jdbcGeneralResult. + * + * @param connectionSession + * @param jdbcGeneralResult + * @param sqlTuple + * @return + */ + private JdbcGeneralResult tryKillSessionByAnonymousBlock(ConnectionSession connectionSession, + JdbcGeneralResult jdbcGeneralResult, SqlTuple sqlTuple) { + log.info("Kill query/session Unknown thread id error, try anonymous code blocks"); + String executedSql = sqlTuple.getExecutedSql(); + if (executedSql != null && executedSql.endsWith(";")) { + executedSql = executedSql.substring(0, executedSql.length() - 1); + } + String anonymousCodeBlock = "BEGIN\n" + + "EXECUTE IMMEDIATE '" + + executedSql + + "';\n" + + "END;"; + try { + connectionSession.getSyncJdbcExecutor(ConnectionSessionConstants.BACKEND_DS_KEY) + .execute(anonymousCodeBlock); + return JdbcGeneralResult.successResult(sqlTuple); + + } catch (Exception e) { + log.warn("Failed to kill session by using anonymous code blocks: {}", e.getMessage()); return jdbcGeneralResult; - }).collect(Collectors.toList()); + } + } + + /** + * Try to kill session by direct connect observer. If successful, return a successful + * jdbcGeneralResult, otherwise return the original jdbcGeneralResult. + * + * @param connectionSession + * @param jdbcGeneralResult + * @param sqlTuple + * @return + */ + private JdbcGeneralResult tryKillSessionViaDirectConnectObServer(ConnectionSession connectionSession, + JdbcGeneralResult jdbcGeneralResult, SqlTuple sqlTuple) { + try { + log.info("Kill query/session Unknown thread id error, try direct connect observer"); + directLinkServerAndExecute(sqlTuple.getExecutedSql(), connectionSession); + return JdbcGeneralResult.successResult(sqlTuple); + } catch (Exception e) { + log.warn("Failed to direct connect observer {}", e.getMessage()); + return jdbcGeneralResult; + } } private void directLinkServerAndExecute(String sql, ConnectionSession session)