Skip to content

Commit

Permalink
fix[zk]: zooKeeper reconnects without a registered consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysunxiao committed Jan 30, 2024
1 parent 6dbb718 commit 36f90c8
Showing 1 changed file with 79 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ public void stateChanged(CuratorFramework client, ConnectionState state) {
case RECONNECTED:
// 检查3个持久化节点,不存在就创建
createZookeeperRootPath();
// 如果自己是服务提供者,则注册自己
// 如果自己是消费者,则创建连接到所有的自己关心的服务提供者
initZookeeper();
break;

default:
Expand All @@ -223,73 +220,80 @@ public void stateChanged(CuratorFramework client, ConnectionState state) {
* 检查 /zfoo /zfoo/provider /zfoo/consumer 这3个“持久化”节点,不存在就创建
*/
private void createZookeeperRootPath() {
try {
// /zfoo
// 创建zookeeper的根路径
var rootStat = curator.checkExists().forPath(ROOT_PATH);
// 根节点不存在
if (Objects.isNull(rootStat)) {
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
var builder = curator.create();
builder.creatingParentsIfNeeded();
// 检查zk连接授权
if (registryConfig.hasZookeeperAuthor()) {
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
builder.withACL(aclList);
}
// 根节点是持久化节点
builder.withMode(CreateMode.PERSISTENT);
// 真正创建根节点
builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter()));
} else {
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
// 读取根节点上的数据
var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH);
// 把根节点数据从二进制转string字符串
var rootPathData = StringUtils.bytesToString(bytes);

// 检查zookeeper根节点的内容
if (!rootPathData.equals(registryConfig.getCenter())) {
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter()));
}

// 检查zookeeper根节点的权限
if (registryConfig.hasZookeeperAuthor()) {
try {
var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH);
AssertionUtils.notEmpty(providerRootPathAclList);
AssertionUtils.isTrue(providerRootPathAclList.size() == 1);
executor.execute(() -> {
try {
// /zfoo
// 创建zookeeper的根路径
var rootStat = curator.checkExists().forPath(ROOT_PATH);
// 根节点不存在
if (Objects.isNull(rootStat)) {
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
var builder = curator.create();
builder.creatingParentsIfNeeded();
// 检查zk连接授权
if (registryConfig.hasZookeeperAuthor()) {
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0)));
} catch (Exception e) {
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e)));
builder.withACL(aclList);
}
// 根节点是持久化节点
builder.withMode(CreateMode.PERSISTENT);
// 真正创建根节点
builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter()));
} else {
var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry();
// 读取根节点上的数据
var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH);
// 把根节点数据从二进制转string字符串
var rootPathData = StringUtils.bytesToString(bytes);

// 检查zookeeper根节点的内容
if (!rootPathData.equals(registryConfig.getCenter())) {
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter()));
}

// 检查zookeeper根节点的权限
if (registryConfig.hasZookeeperAuthor()) {
try {
var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH);
AssertionUtils.notEmpty(providerRootPathAclList);
AssertionUtils.isTrue(providerRootPathAclList.size() == 1);
var zookeeperAuthorStr = registryConfig.toZookeeperAuthor();
var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr))));
AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0)));
} catch (Exception e) {
throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e)));
}
}

}

}
// /zfoo/provider
// 检查服务提供者节点,不存在则创建
var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH);
if (Objects.isNull(providerStat)) {
curator.create()
.withMode(CreateMode.PERSISTENT)
.forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
}

// /zfoo/provider
// 检查服务提供者节点,不存在则创建
var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH);
if (Objects.isNull(providerStat)) {
curator.create()
.withMode(CreateMode.PERSISTENT)
.forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
}
// /zfoo/consumer
// 检查消费者节点,不存在则创建
var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH);
if (Objects.isNull(consumerStat)) {
curator.create()
.withMode(CreateMode.PERSISTENT)
.forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
}

// /zfoo/consumer
// 检查消费者节点,不存在则创建
var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH);
if (Objects.isNull(consumerStat)) {
curator.create()
.withMode(CreateMode.PERSISTENT)
.forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY);
// 如果自己是服务提供者,则注册自己
// 如果自己是消费者,则创建连接到所有的自己关心的服务提供者
initZookeeper();
} catch (Throwable t) {
logger.error("Zookeeper failed to create zookeeper root path, wait [{}] seconds to recreate", RETRY_SECONDS, t);
SchedulerBus.schedule(() -> createZookeeperRootPath(), RETRY_SECONDS, TimeUnit.SECONDS);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

private void startProviderCache() {
Expand Down Expand Up @@ -352,9 +356,8 @@ private void initZookeeper() {

// 自己是消费者,则连接所有自己关心的服务提供者
initConsumerCache();
} catch (Exception e) {
//
logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, e);
} catch (Throwable t) {
logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, t);
SchedulerBus.schedule(() -> initZookeeper(), RETRY_SECONDS, TimeUnit.SECONDS);
}
});
Expand Down Expand Up @@ -490,23 +493,21 @@ public void accept(Session session) {
session.setConsumerRegister(providerCache);
logger.info("Consumer starts consuming the provider:[{}]", providerCache);
EventBus.post(ConsumerStartEvent.valueOf(providerCache, session));
// 将自己的消费者消息写到 /consumer 的临时节点下
updateConsumerData();
} catch (Throwable t) {
logger.error("[consumer:{}] failed to start, wait [{}] seconds to recheck consumer", providerCache, RETRY_SECONDS, t);
recheckFlag = true;
}
}

// 将自己的消费者消息写到 /consumer 的临时节点下
updateConsumerData();

if (recheckFlag) {
SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS);
}
}

private void updateConsumerData() {
// 将自己的消费者消息写到 /consumer 的临时节点下
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister();
var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString();
var list = new ArrayList<String>();
NetContext.getSessionManager().forEachClientSession(session -> {
var consumerAttribute = session.getConsumerRegister();
Expand All @@ -519,6 +520,14 @@ private void updateConsumerData() {
}
list.add(consumerAttribute.toProviderSimple());
});

if (CollectionUtils.isEmpty(list)) {
return;
}

// 将自己的消费者消息写到 /consumer 的临时节点下
var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister();
var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString();
addData(path, StringUtils.bytes(JsonUtils.object2String(list)), CreateMode.EPHEMERAL);
}

Expand Down

0 comments on commit 36f90c8

Please sign in to comment.