From 049986a325c2bef106fcbc10ca8ef42979034cb4 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 28 Mar 2024 10:35:42 +0800 Subject: [PATCH] fix(interactive): Fix the occasionally incorrect pegasus server routing configuration (#3673) Fixes an issue where `updatePeerView` may not be triggered at the proper time. --------- Co-authored-by: nengli.ln --- .../graphscope/groot/common/config/ZkConfig.java | 4 ++-- .../engine/pegasus/network/src/transport/block/mod.rs | 11 +++++++---- .../graphscope/groot/discovery/ZkDiscovery.java | 6 +++--- .../graphscope/groot/store/jna/JnaGraphStore.java | 7 +++++-- .../graphscope/groot/wal/kafka/KafkaLogService.java | 2 +- .../graphscope/groot/servers/ir/GaiaEngine.java | 11 +++++++++-- 6 files changed, 27 insertions(+), 14 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java index d87de555811b..cb5b398495a0 100755 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/ZkConfig.java @@ -29,10 +29,10 @@ public class ZkConfig { Config.intConfig("zk.session.timeout.ms", 30000); public static final Config ZK_BASE_SLEEP_MS = - Config.intConfig("zk.base.sleep.ms", 1000); + Config.intConfig("zk.base.sleep.ms", 10000); public static final Config ZK_MAX_SLEEP_MS = - Config.intConfig("zk.max.sleep.ms", 45000); + Config.intConfig("zk.max.sleep.ms", 60000); public static final Config ZK_MAX_RETRY = Config.intConfig("zk.max.retry", 29); diff --git a/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs b/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs index 553de5016098..0f87123ca34f 100644 --- a/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs +++ 
b/interactive_engine/executor/engine/pegasus/network/src/transport/block/mod.rs @@ -107,13 +107,16 @@ pub fn listen_on( /// /// 如果参数中的`server_id` 大于等于当前服务的id,并不会发起连接,返回`Ok(())`; /// -pub fn connect( - local_id: u64, remote_id: u64, params: ConnectionParams, addr: A, +pub fn connect( + local_id: u64, remote_id: u64, params: ConnectionParams, addr: SocketAddr, ) -> Result<(), NetError> { // 连接请求可能会失败, 或许由于对端服务器未启动端口监听,调用方需要根据返回内容确定是否重试; - let mut conn = TcpStream::connect(addr)?; + info!("Try to connect to server {:?}", addr); + let timeout = std::time::Duration::from_secs(10); + let mut conn = TcpStream::connect_timeout(&addr, timeout)?; + // let mut conn = TcpStream::connect(addr)?; let addr = conn.peer_addr()?; - debug!("connect to server {:?};", addr); + info!("connect to server {:?};", addr); let hb_sec = params.get_hb_interval_sec(); super::setup_connection(local_id, hb_sec, &mut conn)?; info!("setup connection to {:?} success;", addr); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java index e9f41b9e1b10..137244c2a617 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/discovery/ZkDiscovery.java @@ -154,7 +154,7 @@ public NodeChangeListener(RoleType roleType, ServiceCache serviceCach @Override public void cacheChanged() { - logger.debug("cacheChanged. roleType [" + roleType.getName() + "]"); + logger.info("cacheChanged. 
roleType [" + roleType.getName() + "]"); synchronized (lock) { Map newRoleNodes = new HashMap<>(); for (ServiceInstance instance : this.serviceCache.getInstances()) { @@ -207,7 +207,7 @@ private void notifyRemoved(RoleType role, Map removed) { if (removed.isEmpty()) { return; } - logger.debug("role [{}] remove nodes [{}]", role.getName(), removed.values()); + logger.info("role [{}] remove nodes [{}]", role.getName(), removed.values()); for (Listener listener : this.listeners) { this.singleThreadExecutor.execute( () -> { @@ -225,7 +225,7 @@ private void notifyAdded(RoleType role, Map added) { if (added.isEmpty()) { return; } - logger.debug("role [{}] add nodes [{}]", role.getName(), added.values()); + logger.info("role [{}] add nodes [{}]", role.getName(), added.values()); for (Listener listener : this.listeners) { this.singleThreadExecutor.execute( () -> { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java index ebc31f3026ce..a6d66e29b7e8 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java @@ -13,6 +13,7 @@ */ package com.alibaba.graphscope.groot.store.jna; +import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.operation.OperationBatch; @@ -61,8 +62,10 @@ public JnaGraphStore(Configs configs, int partitionId) throws IOException { Path walPath = Paths.get(walDir, "" + partitionId); builder.put(StoreConfig.STORE_WAL_DIR.getKey(), walPath.toString()); } - if (!Files.isDirectory(secondPath)) { - Files.createDirectories(secondPath); + if 
(CommonConfig.SECONDARY_INSTANCE_ENABLED.get(configs)) { + if (!Files.isDirectory(secondPath)) { + Files.createDirectories(secondPath); + } } if (!Files.isDirectory(partitionPath)) { Files.createDirectories(partitionPath); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java index 53689d3998e5..a15c3c5e1c03 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/wal/kafka/KafkaLogService.java @@ -161,7 +161,7 @@ private AdminClient createAdminWithRetry() throws InterruptedException { try { return AdminClient.create(adminConfig); } catch (Exception e) { - logger.warn("Error creating Kafka AdminClient", e); + logger.warn("Error creating Kafka AdminClient, servers: {}", servers, e); Thread.sleep(10000); } } diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java index f4429242f033..db765295cbd0 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/ir/GaiaEngine.java @@ -104,10 +104,17 @@ public void stop() { @Override public void nodesJoin(RoleType role, Map nodes) { if (role == RoleType.GAIA_ENGINE) { - this.engineNodes.putAll(nodes); + for (Map.Entry entry : nodes.entrySet()) { + GrootNode node = entry.getValue(); + if (node.getRoleName().equals(RoleType.GAIA_ENGINE.getName())) { + this.engineNodes.put(entry.getKey(), node); + } else { + logger.warn("Unexpected node joined: {}", node); + } + } if (this.engineNodes.size() == this.nodeCount) { 
String peerViewString = - nodes.values().stream() + engineNodes.values().stream() .map( n -> String.format(