From da856a4619098c3cce86f2d0330826f20935c2ec Mon Sep 17 00:00:00 2001 From: doleyzi <43397300+doleyzi@users.noreply.github.com> Date: Mon, 8 Apr 2024 14:06:04 +0800 Subject: [PATCH] [INLONG-9928][Audit] Audit-service HA election through mysql (#9929) --- .../src/main/java/config/ConfigConstants.java | 13 + .../src/main/java/config/SqlConstants.java | 37 +++ .../elector/ElectorChangeListenerImpl.java | 34 ++ .../src/main/java/elector/api/Selector.java | 2 +- .../java/elector/api/SelectorFactory.java | 30 ++ .../main/java/elector/impl/DBDataSource.java | 298 ++++++++++++++++++ .../main/java/elector/impl/SelectorImpl.java | 210 ++++++++++++ 7 files changed, 623 insertions(+), 1 deletion(-) create mode 100644 inlong-audit/audit-service/src/main/java/config/SqlConstants.java create mode 100644 inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java create mode 100644 inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java create mode 100644 inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java create mode 100644 inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java diff --git a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java index a3bd9b29f96..42ef2d70465 100644 --- a/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java +++ b/inlong-audit/audit-service/src/main/java/config/ConfigConstants.java @@ -101,4 +101,17 @@ public class ConfigConstants { public static final String KEY_DAILY_SUMMARY_BEFORE_TIMES = "daily.summary.before.times"; public static final int DEFAULT_DAILY_SUMMARY_BEFORE_TIMES = 2; + // HA selector config + public static final String KEY_RELEASE_LEADER_INTERVAL = "release.leader.interval"; + public static final int DEFAULT_RELEASE_LEADER_INTERVAL = 40; + public static final String KEY_SELECTOR_THREAD_POOL_SIZE = "selector.thread.pool.size"; + public static final int DEFAULT_SELECTOR_THREAD_POOL_SIZE = 3; + + // HikariConfig + public static final String CACHE_PREP_STMTS = "cachePrepStmts"; + public static final String PREP_STMT_CACHE_SIZE = "prepStmtCacheSize"; + public static final String PREP_STMT_CACHE_SQL_LIMIT = "prepStmtCacheSqlLimit"; + + public static final int MAX_INIT_COUNT = 2; + public static final int RANDOM_BOUND = 10; } diff --git a/inlong-audit/audit-service/src/main/java/config/SqlConstants.java b/inlong-audit/audit-service/src/main/java/config/SqlConstants.java new file mode 100644 index 00000000000..a7164f2bdf8 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/config/SqlConstants.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config; + +/** + * Sql constants + */ +public class SqlConstants { + + // HA selector sql + public static final String SELECTOR_SQL = + "insert ignore into {0} (service_id, leader_id, last_seen_active) values (''{1}'', ''{2}'', now()) on duplicate key update leader_id = if(last_seen_active < now() - interval # second, values(leader_id), leader_id),last_seen_active = if(leader_id = values(leader_id), values(last_seen_active), last_seen_active)"; + public static final String REPLACE_LEADER_SQL = + "replace into {0} ( service_id, leader_id, last_seen_active ) values (''{1}'', ''#'', now())"; + public static final String RELEASE_SQL = "delete from {0} where service_id=''{1}'' and leader_id= ''{2}''"; + public static final String IS_LEADER_SQL = + "select count(*) as is_leader from {0} where service_id=''{1}'' and leader_id=''{2}''"; + public static final String SEARCH_CURRENT_LEADER_SQL = + "select leader_id as leader from {0} where service_id=''{1}''"; + public static final String SELECT_TEST_SQL = "SELECT 1 "; + +} diff --git a/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java b/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java new file mode 100644 index 00000000000..9017caacff4 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/elector/ElectorChangeListenerImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package elector; + +import elector.api.SelectorChangeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Elector change listener impl + */ +public class ElectorChangeListenerImpl implements SelectorChangeListener { + + private static final Logger logger = LoggerFactory.getLogger(ElectorChangeListenerImpl.class); + + public void leaderChanged(boolean currentNodeIsLeader) { + logger.info("LeaderChanged {}:", currentNodeIsLeader); + } +} diff --git a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java b/inlong-audit/audit-service/src/main/java/elector/api/Selector.java index d4aa342dea3..5e9e5647622 100644 --- a/inlong-audit/audit-service/src/main/java/elector/api/Selector.java +++ b/inlong-audit/audit-service/src/main/java/elector/api/Selector.java @@ -38,5 +38,5 @@ public abstract class Selector { public abstract boolean rebuildSelectorDBSource(); - public abstract boolean close(); + public abstract void close(); } diff --git a/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java b/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java new file mode 100644 index 00000000000..42ed31a7241 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/elector/api/SelectorFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package elector.api; + +import elector.impl.SelectorImpl; + +/** + * Selector factory + */ +public class SelectorFactory { + + public static Selector getNewElector(SelectorConfig electorConfig) { + return new SelectorImpl(electorConfig); + } +} \ No newline at end of file diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java b/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java new file mode 100644 index 00000000000..91eef3156f6 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/elector/impl/DBDataSource.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package elector.impl; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import elector.api.SelectorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.text.MessageFormat; +import java.util.concurrent.atomic.AtomicInteger; + +import static config.ConfigConstants.CACHE_PREP_STMTS; +import static config.ConfigConstants.MAX_INIT_COUNT; +import static config.ConfigConstants.PREP_STMT_CACHE_SIZE; +import static config.ConfigConstants.PREP_STMT_CACHE_SQL_LIMIT; +import static config.SqlConstants.IS_LEADER_SQL; +import static config.SqlConstants.RELEASE_SQL; +import static config.SqlConstants.REPLACE_LEADER_SQL; +import static config.SqlConstants.SEARCH_CURRENT_LEADER_SQL; +import static config.SqlConstants.SELECTOR_SQL; +import static config.SqlConstants.SELECT_TEST_SQL; + +/** + * DB data source + */ +public class DBDataSource { + + private static final Logger logger = LoggerFactory.getLogger(DBDataSource.class); + private String selectorSql = SELECTOR_SQL; + private String replaceLeaderSql = REPLACE_LEADER_SQL; + private String reLeaseSql = RELEASE_SQL; + private String isLeaderSql = IS_LEADER_SQL; + private String searchCurrentLeaderSql = SEARCH_CURRENT_LEADER_SQL; + private final SelectorConfig selectorConfig; + private HikariDataSource datasource; + public AtomicInteger getConnectionFailTimes; + + public DBDataSource(SelectorConfig selectorConfig) { + this.selectorConfig = selectorConfig; + this.getConnectionFailTimes = new AtomicInteger(0); + } + + /** + * init + * + * @param needFormatSql + * @throws Exception + */ + public void init(boolean needFormatSql) throws Exception { + try { + if (!selectorConfig.isUseDefaultLeader()) { + initDataSource(); + if (needFormatSql) { + formatSql(selectorConfig.getElectorDbName(), selectorConfig.getServiceId(), + selectorConfig.getLeaderId()); + } + } + } catch (Exception exception) { + logger.error(exception.getMessage()); + throw exception; + } + } + + /** + * init data source + * + * @throws Exception + */ + public void initDataSource() throws Exception { + boolean initSucc = false; + int initCount = 0; + + while (!initSucc && initCount < MAX_INIT_COUNT) { + try { + ++initCount; + if (datasource == null || datasource.isClosed()) { + HikariConfig config = new HikariConfig(); + config.setDriverClassName(selectorConfig.getDbDriver()); + logger.info("Init dataSource:{}", selectorConfig.getDbUrl()); + config.setJdbcUrl(selectorConfig.getDbUrl()); + config.setUsername(selectorConfig.getDbUser()); + config.setPassword(selectorConfig.getDbPasswd()); + config.setMaximumPoolSize(selectorConfig.getMaximumPoolSize()); + config.setAutoCommit(true); + config.setConnectionTimeout((long) selectorConfig.getConnectionTimeout()); + config.setMaxLifetime((long) selectorConfig.getMaxLifetime()); + config.addDataSourceProperty(CACHE_PREP_STMTS, selectorConfig.getCachePrepStmts()); + config.addDataSourceProperty(PREP_STMT_CACHE_SIZE, selectorConfig.getPrepStmtCacheSize()); + config.addDataSourceProperty(PREP_STMT_CACHE_SQL_LIMIT, + selectorConfig.getPrepStmtCacheSqlLimit()); + config.setConnectionTestQuery(SELECT_TEST_SQL); + datasource = new HikariDataSource(config); + } + + initSucc = true; + } catch (Exception exception) { + logger.error("DB url:{},user name:{},password:{},exception:{}", + selectorConfig.getDbUrl(), + selectorConfig.getDbUser(), + selectorConfig.getDbPasswd(), + exception.getMessage()); + } + } + + if (!initSucc) { + throw new Exception("## DBDataSource init Fail!"); + } + } + + /** + * close + */ + public void close() { + datasource.close(); + } + + /** + * Execute update + * + * @param sql + * @return + */ + private int executeUpdate(String sql) { + int result = 0; + try { + if ((null == datasource) || (datasource.isClosed())) { + initDataSource(); + } + + try (Connection connection = datasource.getConnection()) { + try (PreparedStatement pstmt = connection.prepareStatement(sql)) { + result = pstmt.executeUpdate(); + } catch (Exception executeUpdatEexception) { + logger.error("Exception :{}", executeUpdatEexception.getMessage()); + } + } catch (Exception pstmtEexception) { + logger.error("Exception :{}", pstmtEexception.getMessage()); + } + getConnectionFailTimes.set(0); + } catch (Exception exception) { + getConnectionFailTimes.addAndGet(1); + logger.warn("Get Connection fail. {}", exception.getMessage()); + } + return result; + } + + /** + * Leader selector + */ + public void leaderSelector() { + if (!selectorConfig.isUseDefaultLeader()) { + try { + int result = executeUpdate(selectorSql); + if (result == 2) { + logger.info("{} get the leader", selectorConfig.getLeaderId()); + } else if (result == 1) { + logger.info("{} do not get the leader", selectorConfig.getLeaderId()); + } + } catch (Exception exception) { + logger.error("Exception: {} ,sql:{}", exception.getMessage(), selectorSql); + } + + } + } + + /** + * Replace leader + * + * @param replaceLeaderId + */ + public void replaceLeader(String replaceLeaderId) { + replaceLeaderSql = replaceLeaderSql.replaceAll("#", replaceLeaderId); + + try { + int result = executeUpdate(replaceLeaderSql); + if (result > 0) { + logger.info("Replace leader success.sql:{}", replaceLeaderSql); + } else { + logger.warn("Replace leader failed. sql:" + replaceLeaderSql); + } + + } catch (Exception exception) { + logger.error("Exception :{} ", exception.getMessage()); + } + } + + /** + * Release leader + */ + public void releaseLeader() { + try { + int result = executeUpdate(reLeaseSql); + logger.info("ReleaseLeader sql:{}", reLeaseSql); + if (result == 1) { + logger.info("{} release the leader success", selectorConfig.getLeaderId()); + } + } catch (Exception exception) { + logger.error("ReLease sql:{},exception {}:,", reLeaseSql, exception.getMessage()); + } + + } + + /** + * Get current leader + * + * @return + */ + public String getCurrentLeader() { + if (selectorConfig.isUseDefaultLeader()) { + return selectorConfig.getDefaultLeaderId(); + } else { + String leaderId = ""; + + try { + if (null == datasource || datasource.isClosed()) { + logger.warn("DataSource is closed init is again"); + initDataSource(); + } + try (Connection connection = datasource.getConnection()) { + try (PreparedStatement pstmt = connection.prepareStatement(searchCurrentLeaderSql)) { + ResultSet resultSet = pstmt.executeQuery(); + if (resultSet.next()) { + leaderId = resultSet.getString("leader"); + } + } catch (Exception exception) { + logger.error("Exception {}", exception.getMessage()); + } + } catch (Throwable connectionException) { + logger.error("Exception {}", connectionException.getMessage()); + } + } catch (Exception datasourceException) { + logger.error("Exception {}", datasourceException.getMessage()); + } + + return leaderId; + } + } + + /** + * Judge DB data source whether to closed + * + * @return + */ + public boolean isDBDataSourceClosed() { + if (this.datasource != null) { + try { + Connection con = datasource.getConnection(); + if (con != null) { + con.close(); + } + return false; + } catch (Exception exception) { + logger.error("Exception {}", exception.getMessage()); + return true; + } + } + return true; + } + + /** + * Format sql + * + * @param params + */ + public void formatSql(String... params) { + selectorSql = MessageFormat.format(selectorSql, params); + selectorSql = selectorSql.replaceAll("#", selectorConfig.getLeaderTimeout() + ""); + logger.info(selectorSql); + replaceLeaderSql = MessageFormat.format(replaceLeaderSql, params); + logger.info(replaceLeaderSql); + reLeaseSql = MessageFormat.format(reLeaseSql, params); + logger.info("ReLeaseSql:{}", reLeaseSql); + isLeaderSql = MessageFormat.format(isLeaderSql, params); + logger.info(isLeaderSql); + searchCurrentLeaderSql = MessageFormat.format(searchCurrentLeaderSql, params); + logger.info(searchCurrentLeaderSql); + } +} diff --git a/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java b/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java new file mode 100644 index 00000000000..b84051fccb7 --- /dev/null +++ b/inlong-audit/audit-service/src/main/java/elector/impl/SelectorImpl.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package elector.impl; + +import config.Configuration; +import elector.api.Selector; +import elector.api.SelectorConfig; +import elector.task.DBMonitorTask; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static config.ConfigConstants.DEFAULT_RELEASE_LEADER_INTERVAL; +import static config.ConfigConstants.DEFAULT_SELECTOR_THREAD_POOL_SIZE; +import static config.ConfigConstants.KEY_RELEASE_LEADER_INTERVAL; +import static config.ConfigConstants.KEY_SELECTOR_THREAD_POOL_SIZE; +import static config.ConfigConstants.RANDOM_BOUND; + +/** + * Elector Impl + */ +public class SelectorImpl extends Selector { + + private static final Logger logger = LoggerFactory.getLogger(SelectorImpl.class); + private final SelectorConfig selectorConfig; + private final ExecutorService fixedThreadPool; + private boolean canElector = true; + private final DBDataSource dbDataSource; + private long sleepTime = 0L; + private boolean running = true; + + public SelectorImpl(SelectorConfig selectorConfig) { + this.selectorConfig = selectorConfig; + this.dbDataSource = new DBDataSource(selectorConfig); + fixedThreadPool = Executors.newFixedThreadPool(Configuration.getInstance().get( + KEY_SELECTOR_THREAD_POOL_SIZE, + DEFAULT_SELECTOR_THREAD_POOL_SIZE)); + } + + /** + * init + * + * @throws Exception + */ + public void init() throws Exception { + try { + logger.info("Init selector impl..."); + + dbDataSource.init(true); + + fixedThreadPool.execute(new ElectorWorkerThread()); + + fixedThreadPool.execute(new DBMonitorTask(selectorConfig, dbDataSource)); + } catch (Exception exception) { + logger.error("Failed to init selector", exception); + } + } + + /** + * Judge where is leader + * + * @return + */ + public boolean isLeader() { + return this.isLeader; + } + + /** + * Release leader + */ + public void releaseLeader() { + if (this.isLeader) + try { + dbDataSource.releaseLeader(); + } catch (Exception exception) { + logger.error("Exception :{}", exception.getMessage()); + } + + try { + TimeUnit.SECONDS.sleep(Configuration.getInstance().get(KEY_RELEASE_LEADER_INTERVAL, + DEFAULT_RELEASE_LEADER_INTERVAL)); + } catch (Exception exception) { + logger.error("Exception :{}", exception.getMessage()); + } + } + + /** + * Replace leader + * + * @param newLeaderId + */ + public void replaceLeader(String newLeaderId) { + sleepTime = (selectorConfig.getTryToBeLeaderInterval() * 2L); + dbDataSource.replaceLeader(newLeaderId); + } + + /** + * Get leader + * + * @param serviceId + * @return + */ + public String getLeader(String serviceId) { + return dbDataSource.getCurrentLeader(); + } + + /** + * Judge where can be elector + * + * @param canElector + */ + public void canSelect(boolean canElector) { + this.canElector = canElector; + } + + /** + * Rebuild elector DBSource + * + * @return + */ + public boolean rebuildSelectorDBSource() { + canSelect(false); + try { + releaseLeader(); + dbDataSource.close(); + dbDataSource.init(false); + canSelect(true); + } catch (Exception exception) { + logger.error("Exception :{}", exception.getMessage()); + return false; + } + return true; + } + + /** + * close + * + * @return + */ + public void close() { + running = false; + dbDataSource.close(); + fixedThreadPool.shutdown(); + } + + class ElectorWorkerThread implements Runnable { + + Random random; + + ElectorWorkerThread() { + this.random = new Random(); + } + + public void run() { + while (running) { + if (canElector) { + dbDataSource.leaderSelector(); + } + + String leaderId = dbDataSource.getCurrentLeader(); + if (StringUtils.isNotEmpty(leaderId)) { + if (selectorConfig.getLeaderId().equals(leaderId)) { + if (!isLeader + && selectorConfig.getSelectorChangeListener() != null) { + selectorConfig.getSelectorChangeListener().leaderChanged(true); + } + + isLeader = true; + sleepTime = selectorConfig.getTryToBeLeaderInterval(); + } else { + if (isLeader + && selectorConfig.getSelectorChangeListener() != null) { + selectorConfig.getSelectorChangeListener().leaderChanged(false); + } + + isLeader = false; + sleepTime = selectorConfig.getTryToBeLeaderInterval() + + random.nextInt(RANDOM_BOUND); + } + } + + try { + TimeUnit.SECONDS.sleep(sleepTime); + } catch (Exception exception) { + logger.error("Exception :{}", exception.getMessage()); + } + } + } + } +} \ No newline at end of file