From 36f90c8a69d246660c61ffbeec5072eb895b09de Mon Sep 17 00:00:00 2001 From: godotg Date: Tue, 30 Jan 2024 12:49:34 +0800 Subject: [PATCH] fix[zk]: zooKeeper reconnects without a registered consumer --- .../consumer/registry/ZookeeperRegistry.java | 149 ++++++++++-------- 1 file changed, 79 insertions(+), 70 deletions(-) diff --git a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java index 4b01aa4a9..31b93d08b 100644 --- a/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java +++ b/net/src/main/java/com/zfoo/net/consumer/registry/ZookeeperRegistry.java @@ -200,9 +200,6 @@ public void stateChanged(CuratorFramework client, ConnectionState state) { case RECONNECTED: // 检查3个持久化节点,不存在就创建 createZookeeperRootPath(); - // 如果自己是服务提供者,则注册自己 - // 如果自己是消费者,则创建连接到所有的自己关心的服务提供者 - initZookeeper(); break; default: @@ -223,73 +220,80 @@ public void stateChanged(CuratorFramework client, ConnectionState state) { * 检查 /zfoo /zfoo/provider /zfoo/consumer 这3个“持久化”节点,不存在就创建 */ private void createZookeeperRootPath() { - try { - // /zfoo - // 创建zookeeper的根路径 - var rootStat = curator.checkExists().forPath(ROOT_PATH); - // 根节点不存在 - if (Objects.isNull(rootStat)) { - var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry(); - var builder = curator.create(); - builder.creatingParentsIfNeeded(); - // 检查zk连接授权 - if (registryConfig.hasZookeeperAuthor()) { - var zookeeperAuthorStr = registryConfig.toZookeeperAuthor(); - var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr)))); - builder.withACL(aclList); - } - // 根节点是持久化节点 - builder.withMode(CreateMode.PERSISTENT); - // 真正创建根节点 - builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter())); - } else { - var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry(); - // 读取根节点上的数据 - var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH); - // 把根节点数据从二进制转string字符串 - var rootPathData = StringUtils.bytesToString(bytes); - - // 检查zookeeper根节点的内容 - if (!rootPathData.equals(registryConfig.getCenter())) { - throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter())); - } - - // 检查zookeeper根节点的权限 - if (registryConfig.hasZookeeperAuthor()) { - try { - var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH); - AssertionUtils.notEmpty(providerRootPathAclList); - AssertionUtils.isTrue(providerRootPathAclList.size() == 1); + executor.execute(() -> { + try { + // /zfoo + // 创建zookeeper的根路径 + var rootStat = curator.checkExists().forPath(ROOT_PATH); + // 根节点不存在 + if (Objects.isNull(rootStat)) { + var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry(); + var builder = curator.create(); + builder.creatingParentsIfNeeded(); + // 检查zk连接授权 + if (registryConfig.hasZookeeperAuthor()) { var zookeeperAuthorStr = registryConfig.toZookeeperAuthor(); var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr)))); - AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0))); - } catch (Exception e) { - throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e))); + builder.withACL(aclList); + } + // 根节点是持久化节点 + builder.withMode(CreateMode.PERSISTENT); + // 真正创建根节点 + builder.forPath(ROOT_PATH, StringUtils.bytes(registryConfig.getCenter())); + } else { + var registryConfig = NetContext.getConfigManager().getLocalConfig().getRegistry(); + // 读取根节点上的数据 + var bytes = curator.getData().storingStatIn(new Stat()).forPath(ROOT_PATH); + // 把根节点数据从二进制转string字符串 + var rootPathData = StringUtils.bytesToString(bytes); + + // 检查zookeeper根节点的内容 + if (!rootPathData.equals(registryConfig.getCenter())) { + throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] misconfigured [{}],expected [{}], check the relevant nodes and restart", ROOT_PATH, rootPathData, registryConfig.getCenter())); } + + // 检查zookeeper根节点的权限 + if (registryConfig.hasZookeeperAuthor()) { + try { + var providerRootPathAclList = curator.getACL().forPath(ROOT_PATH); + AssertionUtils.notEmpty(providerRootPathAclList); + AssertionUtils.isTrue(providerRootPathAclList.size() == 1); + var zookeeperAuthorStr = registryConfig.toZookeeperAuthor(); + var aclList = List.of(new ACL(ZooDefs.Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest(zookeeperAuthorStr)))); + AssertionUtils.isTrue(providerRootPathAclList.get(0).equals(aclList.get(0))); + } catch (Exception e) { + throw new RuntimeException(StringUtils.format("zookeeper rootPath[{}] permissions are misconfigured [{}]", ROOT_PATH, ExceptionUtils.getMessage(e))); + } + } + } - } + // /zfoo/provider + // 检查服务提供者节点,不存在则创建 + var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH); + if (Objects.isNull(providerStat)) { + curator.create() + .withMode(CreateMode.PERSISTENT) + .forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY); + } - // /zfoo/provider - // 检查服务提供者节点,不存在则创建 - var providerStat = curator.checkExists().forPath(PROVIDER_ROOT_PATH); - if (Objects.isNull(providerStat)) { - curator.create() - .withMode(CreateMode.PERSISTENT) - .forPath(PROVIDER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY); - } + // /zfoo/consumer + // 检查消费者节点,不存在则创建 + var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH); + if (Objects.isNull(consumerStat)) { + curator.create() + .withMode(CreateMode.PERSISTENT) + .forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY); + } - // /zfoo/consumer - // 检查消费者节点,不存在则创建 - var consumerStat = curator.checkExists().forPath(CONSUMER_ROOT_PATH); - if (Objects.isNull(consumerStat)) { - curator.create() - .withMode(CreateMode.PERSISTENT) - .forPath(CONSUMER_ROOT_PATH, ArrayUtils.EMPTY_BYTE_ARRAY); + // 如果自己是服务提供者,则注册自己 + // 如果自己是消费者,则创建连接到所有的自己关心的服务提供者 + initZookeeper(); + } catch (Throwable t) { + logger.error("Zookeeper failed to create zookeeper root path, wait [{}] seconds to recreate", RETRY_SECONDS, t); + SchedulerBus.schedule(() -> createZookeeperRootPath(), RETRY_SECONDS, TimeUnit.SECONDS); } - } catch (Exception e) { - throw new RuntimeException(e); - } + }); } private void startProviderCache() { @@ -352,9 +356,8 @@ private void initZookeeper() { // 自己是消费者,则连接所有自己关心的服务提供者 initConsumerCache(); - } catch (Exception e) { - // - logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, e); + } catch (Throwable t) { + logger.error("Zookeeper failed to initialize, wait [{}] seconds to reinitialize", RETRY_SECONDS, t); SchedulerBus.schedule(() -> initZookeeper(), RETRY_SECONDS, TimeUnit.SECONDS); } }); @@ -490,23 +493,21 @@ public void accept(Session session) { session.setConsumerRegister(providerCache); logger.info("Consumer starts consuming the provider:[{}]", providerCache); EventBus.post(ConsumerStartEvent.valueOf(providerCache, session)); - // 将自己的消费者消息写到 /consumer 的临时节点下 - updateConsumerData(); } catch (Throwable t) { logger.error("[consumer:{}] failed to start, wait [{}] seconds to recheck consumer", providerCache, RETRY_SECONDS, t); recheckFlag = true; } } + // 将自己的消费者消息写到 /consumer 的临时节点下 + updateConsumerData(); + if (recheckFlag) { SchedulerBus.schedule(() -> checkConsumer(), RETRY_SECONDS, TimeUnit.SECONDS); } } private void updateConsumerData() { - // 将自己的消费者消息写到 /consumer 的临时节点下 - var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister(); - var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString(); var list = new ArrayList(); NetContext.getSessionManager().forEachClientSession(session -> { var consumerAttribute = session.getConsumerRegister(); @@ -519,6 +520,14 @@ private void updateConsumerData() { } list.add(consumerAttribute.toProviderSimple()); }); + + if (CollectionUtils.isEmpty(list)) { + return; + } + + // 将自己的消费者消息写到 /consumer 的临时节点下 + var localRegisterVO = NetContext.getConfigManager().getLocalConfig().toLocalRegister(); + var path = CONSUMER_ROOT_PATH + StringUtils.SLASH + localRegisterVO.toConsumerString(); addData(path, StringUtils.bytes(JsonUtils.object2String(list)), CreateMode.EPHEMERAL); }