-
Notifications
You must be signed in to change notification settings - Fork 0
/
io_daemonizer.rb
142 lines (118 loc) · 3.15 KB
/
io_daemonizer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# io_daemonizer v.7.2 https://github.com/schoblaska/io_daemonizer
require "json"
require "shellwords"
require "socket"
require "stringio"
class IODaemonizer
# Command-line entry point. Dispatches on ARGV[0]:
#
# * "start" - runs the daemon via Daemon.run (without the fork option).
# * "stop"  - forwards the stop request to the running daemon.
# * other   - forwards ARGV (and piped stdin) to the daemon; when the
#             connection is refused and +autostart+ is true, a daemon is
#             forked first and the request is retried.
#
# @param port TCP port on 127.0.0.1 used by both the daemon and the client
# @param setup [Proc] block handed to Daemon.run, which evaluates it once
#   before serving
# @param run [Proc] block handed to Daemon.run, which evaluates it for each
#   request
# @param autostart [Boolean] start a forked daemon when none is reachable
#
# A refused connection is never raised to the caller: it is reported as
# "server not running or not responding" on stdout instead.
def self.wrap(port:, setup:, run:, autostart: true)
  case ARGV[0]
  when "start"
    puts "starting server..."
    Daemon.run(port: port, setup: setup, run: run)
  when "stop"
    puts "stopping server..."
    send_request(port: port, args: ARGV)
  else
    begin
      send_request(port: port, args: ARGV)
    rescue Errno::ECONNREFUSED
      raise unless autostart

      Daemon.run(port: port, setup: setup, run: run, fork: true)

      # The forked daemon needs some time to bind its port. Poll with a
      # bounded number of short waits instead of betting on one fixed sleep.
      # Retrying is safe: a refused connection fails before any stdin is read.
      attempts_left = 20
      begin
        sleep 0.1
        send_request(port: port, args: ARGV)
      rescue Errno::ECONNREFUSED
        attempts_left -= 1
        retry if attempts_left.positive?
        raise
      end
    end
  end
rescue Errno::ECONNREFUSED
  puts "server not running or not responding"
end
# Sends one request to the daemon on 127.0.0.1 and replays its response.
#
# Request layout: the first line is the shell-joined +args+; everything after
# it is the caller's piped stdin (nothing when stdin is a terminal). The write
# half of the socket is then closed so the daemon's read reaches EOF, and the
# response is fed to a LabeledIOConsumer.
#
# @param port TCP port the daemon listens on
# @param args [Array<String>] command-line arguments to forward
def self.send_request(port:, args:)
  replayer = LabeledIOConsumer.new

  TCPSocket.open("127.0.0.1", port) do |conn|
    conn.puts(args.shelljoin)

    piped_input = $stdin.tty? ? "" : $stdin.read
    conn.write(piped_input)
    conn.close_write

    until conn.eof?
      replayer.write(conn.read)
    end
  end
end
# Swaps the global standard streams for the duration of the block and puts
# the previous ones back afterwards, even when the block raises.
#
# @param stdin replacement for $stdin (defaults to the current $stdin)
# @param stdout replacement for $stdout (defaults to the current $stdout)
# @param stderr replacement for $stderr (defaults to the current $stderr)
# @yield runs with the three globals pointing at the replacements
# @return the value of the block
def self.redirect(stdin: $stdin, stdout: $stdout, stderr: $stderr)
  # Keep references to the live stream objects instead of dup'ing them:
  # dup allocates three extra descriptors on every call and would make the
  # ensure clause install copies rather than the streams that were in place.
  saved_in, saved_out, saved_err = $stdin, $stdout, $stderr
  $stdin, $stdout, $stderr = stdin, stdout, stderr
  yield
ensure
  $stdin, $stdout, $stderr = saved_in, saved_out, saved_err
end
# Background server: listens on 127.0.0.1 and evaluates the +run+ block once
# per accepted connection, with the standard streams redirected so that output
# travels back over the socket.
class Daemon
  # Builds a daemon, evaluates its setup block in the calling process, then
  # starts serving.
  #
  # @param port TCP port to listen on
  # @param setup [Proc] evaluated once against the daemon's context object
  # @param run [Proc] evaluated per request with the parsed argument array
  # @param fork [Boolean] when true, serving happens in a forked child
  def self.run(port:, setup:, run:, fork: false)
    daemon = new(port: port, setup: setup, run: run)
    daemon.setup
    # The +fork+ keyword shadows Kernel#fork here, hence the explicit receiver.
    fork ? Kernel.fork { daemon.start } : daemon.start
  end

  def initialize(port:, setup:, run:)
    @port = port
    @setup = setup
    @run = run
    # Single receiver for both blocks, so state created by +setup+ (e.g.
    # instance variables) is visible to every +run+ invocation.
    @context = Object.new
  end

  # Evaluates the setup block with the context object as self.
  def setup
    @context.instance_exec(&@setup)
  end

  # Binds the listening socket, detaches the process, and serves connections
  # one at a time until a "stop" request closes the server.
  def start
    @server = TCPServer.open("127.0.0.1", @port)
    Process.daemon(true) # true = nochdir: keep the current working directory
    read_socket(@server.accept) until @server.closed?
  end

  private

  # Handles one connection. The first line of the payload is the shell-joined
  # argument list; the remaining lines become the request's stdin.
  #
  # Any StandardError is reported to the client and then re-raised, which
  # ends the serving loop. The socket is always closed.
  def read_socket(socket)
    raw_args, *body = socket.read.lines
    # A connection that sent no bytes at all is not a request; ignore it
    # instead of letting nil.shellsplit take the daemon down.
    return if raw_args.nil?

    args = raw_args.shellsplit
    if args[0] == "stop"
      @server.close
    else
      IODaemonizer.redirect(
        stdin: StringIO.new(body.join),
        stdout: IOLabeler.new(1, socket, "stdout"),
        stderr: IOLabeler.new(2, socket, "stderr")
      ) { @context.instance_exec(args, &@run) }
    end
  rescue StandardError => e
    # The client only understands {label => text} JSON frames; raw text would
    # be dropped on its side, so frame the error as a stderr message.
    socket.write({ "stderr" => "#{e.inspect}\n" }.to_json)
    raise
  ensure
    socket.close_write
    socket.close
  end
end
# IO subclass used as a stand-in for $stdout/$stderr inside the daemon.
# Instead of writing to its descriptor, every write is sent to the client
# socket as a one-key JSON object of the form {label => text}.
class IOLabeler < IO
  attr_reader :label

  # @param fd [Integer] descriptor passed to IO#initialize
  # @param socket [#write] destination for the JSON frames
  # @param label [String] key used in each frame
  def initialize(fd, socket, label)
    super(fd)
    @socket = socket
    @label = label
  end

  # Sends the given chunk(s) to the socket as one labeled JSON frame.
  # Like IO#write, several chunks may be passed; they are concatenated into a
  # single frame. A single chunk is forwarded unchanged.
  #
  # @return whatever the socket's #write returns
  def write(*chunks)
    payload = chunks.length == 1 ? chunks.first : chunks.join
    @socket.write({ @label => payload }.to_json)
  end

  # Adopts the label of another labeled stream (e.g. $stderr.reopen($stdout)).
  # Targets that carry no label - nil, a path, a plain IO - leave the current
  # label untouched instead of raising; any extra IO#reopen arguments such as
  # a mode are accepted and ignored.
  #
  # @return [String] the label in effect after the call
  def reopen(io, *_mode)
    adopted = io.label if io.respond_to?(:label)
    @label = adopted || @label
  end
end
# Client-side decoder for the daemon's response stream. The stream is a
# sequence of back-to-back JSON objects keyed by "stdout" or "stderr"; each
# decoded object is replayed onto the matching local stream.
class LabeledIOConsumer
  def initialize
    @buffer = +""
  end

  # Feeds response data into the decoder one character at a time. Objects are
  # not delimited, so every "}" is treated as a possible end of an object and
  # triggers a parse attempt on what has been collected so far.
  def write(chunk)
    chunk.chars.each do |char|
      @buffer << char
      next unless char == "}"

      process_buffer
    end
  end

  private

  # Tries to decode the collected text. On success the payload is written to
  # $stdout and/or $stderr and the buffer starts over; on a parse error the
  # buffer is kept so that later characters can complete the object.
  def process_buffer
    frame = JSON.parse(@buffer)
    out_text = frame["stdout"]
    err_text = frame["stderr"]
    $stdout.write(out_text) if out_text
    $stderr.write(err_text) if err_text
    @buffer = +""
  rescue JSON::ParserError
    # Not a complete object yet (e.g. the "}" was inside a string value).
    nil
  end
end
end