Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socket_manager: add feature to take over another server #150

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,14 @@ se = ServerEngine.create(MyServer, MyWorker, {
se.run
```

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).
Other features:

- `socket_manager_server = SocketManager::Server.take_over_another_server(path)`
- It starts a new manager server that has all UDP/TCP sockets of the existing manager.
- It receives the sockets and stops the existing manager after starts a new manager.
- It means that another process can take over UDP/TCP sockets without downtime.

See also [examples](https://github.com/fluent/serverengine/tree/master/examples).

## Module API

Expand Down
16 changes: 12 additions & 4 deletions lib/serverengine/socket_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,22 @@ def self.open(path = nil)
end
end

def initialize(path)
def self.take_over_another_server(path)
raise NotImplementedError, "Not supported on Windows." if ServerEngine.windows?
server = new(path, start: false)
server.take_over_another_server
server
end

def initialize(path, start: true)
@tcp_sockets = {}
@udp_sockets = {}
@mutex = Mutex.new
@path = start_server(path)
@path = start ? start_server(path) : path
end

attr_reader :path
attr_reader :tcp_sockets, :udp_sockets # for tests

def new_client
Client.new(@path)
Expand Down Expand Up @@ -159,9 +167,9 @@ def process_peer(peer)
res = SocketManager.recv_peer(peer)
return if res.nil?

pid, method, bind, port = *res
pid, method, *opts = res
begin
send_socket(peer, pid, method, bind, port)
send_socket(peer, pid, method, *opts)
rescue => e
SocketManager.send_peer(peer, e)
end
Expand Down
126 changes: 86 additions & 40 deletions lib/serverengine/socket_manager_unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,63 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def take_over_another_server
another_server = UNIXSocket.new(@path)
begin
idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_tcp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@tcp_sockets[key] = another_server.recv_io TCPServer
idx += 1
end

idx = 0
while true
SocketManager.send_peer(another_server, [Process.pid, :get_listening_udp, idx])
key = SocketManager.recv_peer(another_server)
break if key.nil?
@udp_sockets[key] = another_server.recv_io UDPSocket
idx += 1
end

FileUtils.rm_f(@path)
start_server(@path)

SocketManager.send_peer(another_server, [Process.pid, :stop_with_socket_alive])
ensure
another_server.close
end
end

private

def listen_tcp_new(bind_ip, port)
Expand Down Expand Up @@ -76,33 +133,6 @@ def listen_udp_new(bind_ip, port)
UDPSocket.for_fd(usock.fileno)
end

def start_server(path)
# return absolute path so that client can connect to this path
# when client changed working directory
path = File.expand_path(path)

begin
old_umask = File.umask(0077) # Protect unix socket from other users
@server = UNIXServer.new(path)
ensure
File.umask(old_umask)
end

@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return path
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand All @@ -111,19 +141,35 @@ def stop_server
@thread.join if RUBY_VERSION >= "2.2"
end

def send_socket(peer, pid, method, bind, port)
sock = case method
when :listen_tcp
listen_tcp(bind, port)
when :listen_udp
listen_udp(bind, port)
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end

SocketManager.send_peer(peer, nil)

peer.send_io sock
def send_socket(peer, pid, method, *opts)
case method
when :listen_tcp
bind, port = opts
sock = listen_tcp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :listen_udp
bind, port = opts
sock = listen_udp(bind, port)
SocketManager.send_peer(peer, nil)
peer.send_io sock
when :get_listening_tcp
idx, = opts
key = @tcp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@tcp_sockets.values[idx]) if key
when :get_listening_udp
idx, = opts
key = @udp_sockets.keys[idx]
SocketManager.send_peer(peer, key)
peer.send_io(@udp_sockets.values[idx]) if key
when :stop_with_socket_alive
@tcp_sockets.clear
@udp_sockets.clear
stop_server
else
raise ArgumentError, "Unknown method: #{method.inspect}"
end
end
end

Expand Down
40 changes: 20 additions & 20 deletions lib/serverengine/socket_manager_win.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ def recv_udp(family, peer, sent)
end

module ServerModule
def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

private

TCP_OPTIONS = [Socket::SOCK_STREAM, Socket::IPPROTO_TCP, TCPServer, true]
Expand Down Expand Up @@ -107,26 +127,6 @@ def htons(h)
[h].pack("S").unpack("n")[0]
end

def start_server(addr)
# We need to take care about selecting an available port.
# By passing `nil` or `0` as `addr`, an available port is automatically selected.
# However, we should consider using NamedPipe instead of TCPServer.
@server = TCPServer.new("127.0.0.1", addr)
@thread = Thread.new do
begin
while peer = @server.accept
Thread.new(peer, &method(:process_peer)) # process_peer calls send_socket
end
rescue => e
unless @server.closed?
ServerEngine.dump_uncaught_error(e)
end
end
end

return @server.addr[1]
end

def stop_server
@tcp_sockets.reject! {|key,lsock| lsock.close; true }
@udp_sockets.reject! {|key,usock| usock.close; true }
Expand Down
35 changes: 35 additions & 0 deletions spec/socket_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
expect(server.path).to be_between(49152, 65535)
end
end

context 'Server.take_over_another_server' do
it 'not supported' do
server = SocketManager::Server.open(server_path)
expect { SocketManager::Server.take_over_another_server(server_path) }.to raise_error(NotImplementedError)
ensure
server.close
end
end
else
context 'Server.generate_path' do
it 'returns socket path under /tmp' do
Expand All @@ -76,6 +85,32 @@
expect(server.path).to include('/tmp/SERVERENGINE_SOCKETMANAGER_')
end
end

context 'Server.take_over_another_server' do
it 'should take over listen sockets to another server' do
server = SocketManager::Server.open(server_path)

client = ServerEngine::SocketManager::Client.new(server_path)
tcp1 = client.listen_tcp('127.0.0.1', 55551)
udp1 = client.listen_udp('127.0.0.1', 55561)
udp2 = client.listen_udp('127.0.0.1', 55562)

another_server = SocketManager::Server.take_over_another_server(server_path)

expect(another_server.tcp_sockets.size).to eq(1)
expect(another_server.tcp_sockets['localhost:55551'].addr).to eq(['AF_INET', 55551, '127.0.0.1', '127.0.0.1'])

expect(another_server.udp_sockets.size).to eq(2)
expect(another_server.udp_sockets['localhost:55561'].addr).to eq(['AF_INET', 55561, '127.0.0.1', '127.0.0.1'])
expect(another_server.udp_sockets['localhost:55562'].addr).to eq(['AF_INET', 55562, '127.0.0.1', '127.0.0.1'])
ensure
tcp1.close
udp1.close
udp2.close
server.close
another_server.close
end
end
end

context 'with thread' do
Expand Down
Loading