From 4a5b1a41c952be73614619ca3d61fa88ed94b932 Mon Sep 17 00:00:00 2001 From: Daijiro Fukuda Date: Fri, 30 Aug 2024 15:52:43 +0900 Subject: [PATCH 1/2] socket_manager: add feature to take over another server Another process can take over UDP/TCP sockets without downtime. server = ServerEngine::SocketManager::Server.take_over_another_server(path) This starts a new server that has all UDP/TCP sockets of the existing server. It receives the sockets from the existing server and stops it after starting the new server. This may not be the primary use case assumed by ServerEngine, but we need this feature to replace both the server and the workers with a new process without downtime. Currently, ServerEngine does not provide this feature for network servers. At the moment, I assume that the application side uses this feature ad hoc, but, in the future, this could be used to support live reload for entire network servers. ref: https://github.com/fluent/fluentd/issues/4622 Signed-off-by: Daijiro Fukuda --- README.md | 8 +- lib/serverengine/socket_manager.rb | 15 ++- lib/serverengine/socket_manager_unix.rb | 126 ++++++++++++++++-------- lib/serverengine/socket_manager_win.rb | 40 ++++---- 4 files changed, 124 insertions(+), 65 deletions(-) diff --git a/README.md b/README.md index 3edf48b..40d943d 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, { se.run ``` -See also [examples](https://github.com/fluent/serverengine/tree/master/examples). +Other features: + +- `socket_manager_server = SocketManager::Server.take_over_another_server(path)` + - It starts a new manager server that has all UDP/TCP sockets of the existing manager. + - It receives the sockets and stops the existing manager after starting the new manager. + - It means that another process can take over UDP/TCP sockets without downtime. +See also [examples](https://github.com/fluent/serverengine/tree/master/examples). 
## Module API diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index fe0e484..f65c941 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -96,11 +96,18 @@ def self.open(path = nil) end end - def initialize(path) + def self.take_over_another_server(path) + raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows? + server = new(path, start: false) + server.take_over_another_server + server + end + + def initialize(path, start: true) @tcp_sockets = {} @udp_sockets = {} @mutex = Mutex.new - @path = start_server(path) + @path = start ? start_server(path) : path end attr_reader :path @@ -159,9 +166,9 @@ def process_peer(peer) res = SocketManager.recv_peer(peer) return if res.nil? - pid, method, bind, port = *res + pid, method, *opts = res begin - send_socket(peer, pid, method, bind, port) + send_socket(peer, pid, method, *opts) rescue => e SocketManager.send_peer(peer, e) end diff --git a/lib/serverengine/socket_manager_unix.rb b/lib/serverengine/socket_manager_unix.rb index 625a831..17e3ee1 100644 --- a/lib/serverengine/socket_manager_unix.rb +++ b/lib/serverengine/socket_manager_unix.rb @@ -47,6 +47,63 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(path) + # return absolute path so that client can connect to this path + # when client changed working directory + path = File.expand_path(path) + + begin + old_umask = File.umask(0077) # Protect unix socket from other users + @server = UNIXServer.new(path) + ensure + File.umask(old_umask) + end + + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? 
+ ServerEngine.dump_uncaught_error(e) + end + end + end + + return path + end + + def take_over_another_server + another_server = UNIXSocket.new(@path) + begin + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @tcp_sockets[key] = another_server.recv_io TCPServer + idx += 1 + end + + idx = 0 + while true + SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx]) + key = SocketManager.recv_peer(another_server) + break if key.nil? + @udp_sockets[key] = another_server.recv_io UDPSocket + idx += 1 + end + + FileUtils.rm_f(@path) + start_server(@path) + + SocketManager.send_peer(another_server, [Process.pid, :stop_with_socket_alive]) + ensure + another_server.close + end + end + private def listen_tcp_new(bind_ip, port) @@ -76,33 +133,6 @@ def listen_udp_new(bind_ip, port) UDPSocket.for_fd(usock.fileno) end - def start_server(path) - # return absolute path so that client can connect to this path - # when client changed working directory - path = File.expand_path(path) - - begin - old_umask = File.umask(0077) # Protect unix socket from other users - @server = UNIXServer.new(path) - ensure - File.umask(old_umask) - end - - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return path - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! 
{|key,usock| usock.close; true } @@ -111,19 +141,35 @@ def stop_server @thread.join if RUBY_VERSION >= "2.2" end - def send_socket(peer, pid, method, bind, port) - sock = case method - when :listen_tcp - listen_tcp(bind, port) - when :listen_udp - listen_udp(bind, port) - else - raise ArgumentError, "Unknown method: #{method.inspect}" - end - - SocketManager.send_peer(peer, nil) - - peer.send_io sock + def send_socket(peer, pid, method, *opts) + case method + when :listen_tcp + bind, port = opts + sock = listen_tcp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :listen_udp + bind, port = opts + sock = listen_udp(bind, port) + SocketManager.send_peer(peer, nil) + peer.send_io sock + when :get_listening_tcp + idx, = opts + key = @tcp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@tcp_sockets.values[idx]) if key + when :get_listening_udp + idx, = opts + key = @udp_sockets.keys[idx] + SocketManager.send_peer(peer, key) + peer.send_io(@udp_sockets.values[idx]) if key + when :stop_with_socket_alive + @tcp_sockets.clear + @udp_sockets.clear + stop_server + else + raise ArgumentError, "Unknown method: #{method.inspect}" + end end end diff --git a/lib/serverengine/socket_manager_win.rb b/lib/serverengine/socket_manager_win.rb index f7a7e26..42acaa6 100644 --- a/lib/serverengine/socket_manager_win.rb +++ b/lib/serverengine/socket_manager_win.rb @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent) end module ServerModule + def start_server(addr) + # We need to take care about selecting an available port. + # By passing `nil` or `0` as `addr`, an available port is automatically selected. + # However, we should consider using NamedPipe instead of TCPServer. + @server = TCPServer.new("127.0.0.1", addr) + @thread = Thread.new do + begin + while peer = @server.accept + Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket + end + rescue => e + unless @server.closed? 
+ ServerEngine.dump_uncaught_error(e) + end + end + end + + return @server.addr[1] + end + private TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true] @@ -107,26 +127,6 @@ def htons(h) [h].pack("S").unpack("n")[0] end - def start_server(addr) - # We need to take care about selecting an available port. - # By passing `nil` or `0` as `addr`, an available port is automatically selected. - # However, we should consider using NamedPipe instead of TCPServer. - @server = TCPServer.new("127.0.0.1", addr) - @thread = Thread.new do - begin - while peer = @server.accept - Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket - end - rescue => e - unless @server.closed? - ServerEngine.dump_uncaught_error(e) - end - end - end - - return @server.addr[1] - end - def stop_server @tcp_sockets.reject! {|key,lsock| lsock.close; true } @udp_sockets.reject! {|key,usock| usock.close; true } From 03bc03145655f4b3ca4751e96a2edca76f52ce71 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Tue, 15 Oct 2024 16:11:26 +0900 Subject: [PATCH 2/2] socket_manager_spec: add tests Signed-off-by: Shizuo Fujita --- lib/serverengine/socket_manager.rb | 1 + spec/socket_manager_spec.rb | 35 ++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/lib/serverengine/socket_manager.rb b/lib/serverengine/socket_manager.rb index f65c941..83d5f42 100644 --- a/lib/serverengine/socket_manager.rb +++ b/lib/serverengine/socket_manager.rb @@ -111,6 +111,7 @@ def initialize(path, start: true) end attr_reader :path + attr_reader :tcp_sockets, :udp_sockets # for tests def new_client Client.new(@path) diff --git a/spec/socket_manager_spec.rb b/spec/socket_manager_spec.rb index c74e877..b80232d 100644 --- a/spec/socket_manager_spec.rb +++ b/spec/socket_manager_spec.rb @@ -55,6 +55,15 @@ expect(server.path).to be_between(49152, 65535) end end + + context 'Server.take_over_another_server' do + it 'not supported' do + server = 
SocketManager::Server.open(server_path) + expect { SocketManager::Server.take_over_another_server(server_path) }.to raise_error(NotImplementedError) + ensure + server.close + end + end else context 'Server.generate_path' do it 'returns socket path under /tmp' do @@ -76,6 +85,32 @@ expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_') end end + + context 'Server.take_over_another_server' do + it 'should take over listen sockets to another server' do + server = SocketManager::Server.open(server_path) + + client = ServerEngine::SocketManager::Client.new(server_path) + tcp1 = client.listen_tcp('127.0.0.1', 55551) + udp1 = client.listen_udp('127.0.0.1', 55561) + udp2 = client.listen_udp('127.0.0.1', 55562) + + another_server = SocketManager::Server.take_over_another_server(server_path) + + expect(another_server.tcp_sockets.size).to eq(1) + expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1']) + + expect(another_server.udp_sockets.size).to eq(2) + expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1']) + expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1']) + ensure + tcp1.close + udp1.close + udp2.close + server.close + another_server.close + end + end end context 'with thread' do