Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unix Domain Socket Listeners #109

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
*.gem
*.rbc
*.swp
.bundle
.config
.yardoc
Expand Down
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
source "https://rubygems.org/"

# Only needed for examples
#gem "thin-attach_socket"
# gem "thin-attach_socket"

# Specify your gem's dependencies in einhorn.gemspec
gemspec
Expand Down
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ arguments:

Each address is specified as an ip/port pair, possibly accompanied by options:

ADDR := (IP:PORT)[<,OPT>...]
IP_ADDR := (IP:PORT)[<,OPT>...]

or as a path to a UNIX domain socket, also possibly accompanied by options:

UNIX_ADDR := /path/to/socket[<,OPT>...]

In the worker process, the opened file descriptors will be represented
as file descriptor numbers in a series of environment variables named
Expand Down
37 changes: 30 additions & 7 deletions bin/einhorn
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ arguments:

Each address is specified as an ip/port pair, possibly accompanied by options:

ADDR := (IP:PORT)[<,OPT>...]
IP_ADDR := (IP:PORT)[<,OPT>...]

or as a path to a UNIX domain socket, also possibly accompanied by options:

UNIX_ADDR := /path/to/socket[<,OPT>...]

In the worker process, the opened file descriptors will be represented
as file descriptor numbers in a series of environment variables named
Expand Down Expand Up @@ -181,6 +185,20 @@ EOF
end
end


BIND_PATTERN = /\A
# we accept two types of socket address
(?:
# ip, host:port
(?:(?<host>[^:]+):(?<port>\d+)) |
# unix socket path
(?<path>[^,:]+)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tested this against abstract namespace sockets, but my wetware suggests it should work

)

# flags are optional, comma delimited, and come at the end
(?<flags>(?:,\w+)*)
\Z/x

# Would be nice if this could be loadable rather than always
# executing, but when run under gem it's a bit hard to do so.
if true # $0 == __FILE__
Expand All @@ -190,14 +208,19 @@ if true # $0 == __FILE__

optparse = OptionParser.new do |opts|
opts.on('-b ADDR', '--bind ADDR', 'Bind an address and add the corresponding FD via the environment') do |addr|
unless addr =~ /\A([^:]+):(\d+)((?:,\w+)*)\Z/
raise "Invalid value for #{addr.inspect}: bind address must be of the form address:port[,flags...]"
unless addr =~ BIND_PATTERN
raise "Invalid value for #{addr.inspect}: bind address must be of the form address:port[,flags...] or /path/to/unix/socket[,flags...]"
end

flags = $~["flags"].split(",").reject(&:empty?).map(&:downcase)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can bind the match to a variable, instead of $~ but this was the original patch


bind = if $~["path"]
Einhorn::Bind::Unix.new($~["path"], flags)
else
Einhorn::Bind::Inet.new($~["host"], Integer($~["port"]), flags)
end

host = $1
port = Integer($2)
flags = $3.split(',').select {|flag| flag.length > 0}.map {|flag| flag.downcase}
Einhorn::State.bind << [host, port, flags]
Einhorn::State.bind << bind
end

opts.on('-c CMD_NAME', '--command-name CMD_NAME', 'Set the command name in ps to this value') do |cmd_name|
Expand Down
3 changes: 2 additions & 1 deletion einhorn.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ Gem::Specification.new do |gem|

gem.files = ["einhorn.gemspec", "README.md", "Changes.md", "LICENSE.txt"] + `git ls-files bin lib example`.split("\n")
gem.executables = %w[einhorn einhornsh]
gem.test_files = []
gem.name = "einhorn"
gem.require_paths = ["lib"]
gem.required_ruby_version = ">= 2.5.0"
Expand All @@ -26,4 +25,6 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "minitest", "~> 5"
gem.add_development_dependency "mocha", "~> 1"
gem.add_development_dependency "subprocess", "~> 1"
gem.add_development_dependency "pry"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debugging interprocess communication is still hard

gem.add_development_dependency "pry-byebug"
end
54 changes: 41 additions & 13 deletions lib/einhorn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ def self.update_state(store, store_name, old_state)
Einhorn::Event::Timer.open(0) do
dead.each { |pid| Einhorn::Command.cleanup(pid) }
end

updated_state[:bind]&.map! do |binding|
# bindings used to just be arrays of [host,port,flags]
if binding.is_a? Array
Bind::Inet.new(*binding)
else
binding
end
end
end

default = store.default_state
Expand All @@ -161,25 +170,24 @@ def self.print_state
log_info(Einhorn::State.state.pretty_inspect)
end

def self.bind(addr, port, flags)
log_info("Binding to #{addr}:#{port} with flags #{flags.inspect}")
sd = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
Einhorn::Compat.cloexec!(sd, false)
def self.bind(binding)
log_info("Binding to #{binding.address} with flags #{binding.flags.inspect}")

if flags.include?("r") || flags.include?("so_reuseaddr")
sd.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
end
sd = binding.make_socket
Einhorn::Compat.cloexec!(sd, false)

sd.bind(Socket.pack_sockaddr_in(port, addr))
binding.bind(sd)
sd.listen(Einhorn::State.config[:backlog])

if flags.include?("n") || flags.include?("o_nonblock")
if binding.flags.include?("n") || binding.flags.include?("o_nonblock")
fl = sd.fcntl(Fcntl::F_GETFL)
sd.fcntl(Fcntl::F_SETFL, fl | Fcntl::O_NONBLOCK)
end

Einhorn::TransientState.socket_handles << sd
[sd.fileno, sd.local_address.ip_port]

sd_port = binding.family == "AF_INET" ? sd.local_address.ip_port : nil
[sd.fileno, sd_port]
end

# Implement these ourselves so it plays nicely with state persistence
Expand Down Expand Up @@ -342,10 +350,29 @@ def self.renice_self
end

def self.socketify_env!
Einhorn::State.bind.each do |host, port, flags|
fd, actual_port = bind(host, port, flags)
Einhorn::State.bind.each do |binding|
fd, actual_port = bind(binding)
Einhorn::State.bind_fds << fd
Einhorn::State.bound_ports << actual_port
Einhorn::State.bound_ports << actual_port if actual_port
end
end

# This duplicates some code from the environment path, but is
# deprecated so that's ok.
def self.socketify!(cmd)
cmd.map! do |arg|
if arg =~ /^(.*=|)srv:([^:]+):(\d+)((?:,\w+)*)$/
log_error("Using deprecated command-line configuration for Einhorn; should upgrade to the environment variable interface.")
opt = $1
host = $2
port = $3
flags = $4.split(",").select { |flag| flag.length > 0 }.map { |flag| flag.downcase }
binding = Bind::Inet.new(host, port, flags)
fd = (Einhorn::State.sockets[[host, port]] ||= bind(binding))
"#{opt}#{fd}"
else
arg
end
end
end

Expand Down Expand Up @@ -454,6 +481,7 @@ def self.run
end
end

require "einhorn/bind"
require "einhorn/command"
require "einhorn/compat"
require "einhorn/client"
Expand Down
97 changes: 97 additions & 0 deletions lib/einhorn/bind.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
require "socket"

module Einhorn::Bind
class Bind
attr_reader :flags

def ==(other)
other.class == self.class && other.state == state
end
end

class Inet < Bind
def initialize(host, port, flags)
@host = host
@port = port
@flags = flags
end

def state
[@host, @port, @flags]
end

def family
"AF_INET"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I tried making this Socket::AF_INET but there are some gotchyas. This eventually maps to socket.h and I vaguely recall these differing between systems (specifically OSX and Linux)

When dumping this state, the string "AF_INET" was more helpful than 2

require 'socket'
irb(main):002:0> Socket::AF_INET
=> 2

This also lets one import state between platforms (probably only helpful for debugging/dev)

(likewise for Unix#family below)

end

def address
"#{@host}:#{@port}"
end

def make_socket
sd = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)

if @flags.include?("r") || @flags.include?("so_reuseaddr")
sd.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
end

sd
end

def bind(sock)
sock.bind(Socket.pack_sockaddr_in(@port, @host))
end
end

class Unix < Bind
def initialize(path, flags)
@path = path
@flags = flags
end

def state
[@path, @flags]
end

def family
"AF_UNIX"
end

def address
@path.to_s
end

def make_socket
Socket.new(Socket::AF_UNIX, Socket::SOCK_STREAM, 0)
end

def clean_old_unix_socket
begin
sock = UNIXSocket.new(@path)
rescue Errno::ECONNREFUSED
# This happens with non-socket files and when the listening
# end of a socket has exited.
rescue Errno::ENOENT
# Socket doesn't exist
return
else
# Rats, it's still active
sock.close
raise Errno::EADDRINUSE.new("Another process is listening on the UNIX socket at #{@path}. If you'd like to run this Einhorn as well, pass a `-b PATH_TO_SOCKET` to change the socket location.")
end

stat = File.stat(@path)
unless stat.socket?
raise Errno::EADDRINUSE.new("Non-socket file present at UNIX socket path #{@path}. Either remove that file and restart Einhorn, or pass a different `-b PATH_TO_SOCKET` to change where you are binding.")
end

Einhorn.log_info("Blowing away old UNIX socket at #{@path}. This likely indicates a previous Einhorn master which exited uncleanly.")
File.unlink(@path)
end

def bind(sock)
clean_old_unix_socket
sock.bind(Socket.pack_sockaddr_un(@path))
end
end
end
1 change: 1 addition & 0 deletions lib/einhorn/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ def self.prepare_child_environment(index)

ENV["EINHORN_FD_COUNT"] = Einhorn::State.bind_fds.length.to_s
Einhorn::State.bind_fds.each_with_index { |fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s }
Einhorn::State.bind.each_with_index { |bind, i| ENV["EINHORN_FD_FAMILY_#{i}"] = bind.family.to_s }
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to_s here to match the line above, and because of the Socket::AF_INET thing above.

It is currently silly, and boils down to: "AF_UNIX".to_s


ENV["EINHORN_CHILD_INDEX"] = index.to_s
end
Expand Down
4 changes: 2 additions & 2 deletions lib/einhorn/safe_yaml.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ module SafeYAML
YAML.safe_load("---", permitted_classes: [])
rescue ArgumentError
def self.load(payload)
YAML.safe_load(payload, [Set, Symbol, Time], [], true)
YAML.safe_load(payload, [Set, Symbol, Time, Einhorn::Bind::Inet, Einhorn::Bind::Unix], [], true)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the line below feels bad. I can parse out the struct / array, but it may get dicey compatibility wise?
Granted the "downgrade case" here isn't possible.
(if I have a running Einhorn that has Einhorn::Bind:Unix, a downgrade of einhorn can't happen, without the class definition)

end
else
def self.load(payload) # rubocop:disable Lint/DuplicateMethods
YAML.safe_load(payload, permitted_classes: [Set, Symbol, Time], aliases: true)
YAML.safe_load(payload, permitted_classes: [Set, Symbol, Time, Einhorn::Bind::Inet, Einhorn::Bind::Unix], aliases: true)
end
end
end
Expand Down
29 changes: 26 additions & 3 deletions test/integration/_lib/helpers/einhorn_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def prepare_fixture_directory(name)

def cleanup_fixtured_directories
(@fixtured_dirs || []).each { |dir| FileUtils.rm_rf(dir) }
FileUtils.rm_rf(@unix_listener_socket_path)
end

def find_free_port(host = "127.0.0.1")
Expand All @@ -109,11 +110,12 @@ def get_state(client)
Einhorn::SafeYAML.load(client.receive_message["message"])[:state]
end

def wait_for_open_port

def mash_retry(&block)
max_retries = 50
begin
read_from_port
rescue Errno::ECONNREFUSED
yield
rescue Errno::ECONNREFUSED, Errno::ENOENT
max_retries -= 1
if max_retries <= 0
raise
Expand All @@ -124,6 +126,27 @@ def wait_for_open_port
end
end

def wait_for_open_socket
mash_retry do
read_from_socket
end
end

def wait_for_open_port
mash_retry do
read_from_port
end
end

def read_from_socket
begin
socket = UNIXSocket.new(@unix_listener_socket_path)
socket.read.chomp
ensure
socket&.close
end
end

def read_from_port
ewouldblock = RUBY_VERSION >= "1.9.0" ? IO::WaitWritable : Errno::EINPROGRESS
socket = Socket.new(Socket::PF_INET, Socket::SOCK_STREAM, 0)
Expand Down
Loading