Skip to content

Commit

Permalink
Add client close timeout on term (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Oct 23, 2024
1 parent 6026688 commit 1d23535
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 40 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

- Added config option term_client_close_timeout and cli option --term-client-close-timeout to set how long to wait for clients to close their connections before sending Close when amqproxy receives a TERM signal.

## [v2.0.2] - 2024-08-25

- Compile with Crystal 1.13.2, fixes a memory leak in Hash.
Expand Down
28 changes: 27 additions & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
require "log"
require "spec"
require "uri"
require "../src/amqproxy/server"
require "../src/amqproxy/version"
require "amqp-client"

Log.setup_from_env(default_level: :error)

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

UPSTREAM_URL = begin
URI.parse ENV.fetch("UPSTREAM_URL", "amqp://127.0.0.1:5672?idle_connection_timeout=5")
rescue e : URI::Error
puts "Invalid UPSTREAM_URL: #{e}"
exit 1
end

def with_server(idle_connection_timeout = 5, &)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
server = AMQProxy::Server.new(UPSTREAM_URL)
tcp_server = TCPServer.new("127.0.0.1", 0)
amqp_url = "amqp://#{tcp_server.local_address}"
spawn { server.listen(tcp_server) }
Expand All @@ -27,3 +38,18 @@ def with_http_server(idle_connection_timeout = 5, &)
end
end
end

def verify_running_amqp!
tls = UPSTREAM_URL.scheme == "amqps"
host = UPSTREAM_URL.host || "127.0.0.1"
port = UPSTREAM_URL.port || 5762
port = 5671 if tls && UPSTREAM_URL.port.nil?
TCPSocket.new(host, port, connect_timeout: 3.seconds).close
rescue Socket::ConnectError
STDERR.puts "[ERROR] Specs require a running rabbitmq server on #{host}:#{port}"
exit 1
end

Spec.before_suite do
verify_running_amqp!
end
2 changes: 1 addition & 1 deletion src/amqproxy.cr
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
require "./amqproxy/cli"
AMQProxy::CLI.new.run
AMQProxy::CLI.new.run(ARGV)
131 changes: 93 additions & 38 deletions src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,38 @@ require "ini"
require "log"

class AMQProxy::CLI
Log = ::Log.for(self)

@listen_address = ENV["LISTEN_ADDRESS"]? || "localhost"
@listen_port = ENV["LISTEN_PORT"]? || 5673
@http_port = ENV["HTTP_PORT"]? || 15673
@log_level : Log::Severity = Log::Severity::Info
@log_level : ::Log::Severity = ::Log::Severity::Info
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@term_timeout = -1
@term_client_close_timeout = 0
@upstream = ENV["AMQP_URL"]?
@server : AMQProxy::Server? = nil

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
INI.parse(File.read(path)).each do |name, section|
case name
when "main", ""
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "term_timeout" then @term_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
when "upstream" then @upstream = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "term_timeout" then @term_timeout = value.to_i
when "term_client_close_timeout" then @term_client_close_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
end
when "listen"
section.each do |key, value|
case key
when "port" then @listen_port = value
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = Log::Severity.parse(value)
when "log_level" then @log_level = ::Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
Expand All @@ -44,8 +49,10 @@ class AMQProxy::CLI
abort ex.message
end

def run
p = OptionParser.parse do |parser|
def run(argv)
raise "run cant be called multiple times" unless @server.nil?

p = OptionParser.parse(argv) do |parser|
parser.banner = "Usage: amqproxy [options] [amqp upstream url]"
parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v|
@listen_address = v
Expand All @@ -55,17 +62,20 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("--term-timeout=SECONDS", "At TERM the server will wait this many seconds for clients to gracefully close their sockets (default: infinite)") do |v|
parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v|
@term_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug }
parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v|
@term_client_close_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug }
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
parser.invalid_option { |arg| abort "Invalid argument: #{arg}" }
end

@upstream ||= ARGV.shift?
@upstream ||= argv.shift?
upstream_url = @upstream || abort p.to_s

u = URI.parse upstream_url
Expand All @@ -80,43 +90,88 @@ class AMQProxy::CLI
tls = u.scheme == "amqps"

log_backend = if ENV.has_key?("JOURNAL_STREAM")
Log::IOBackend.new(formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher)
::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher)
else
Log::IOBackend.new(formatter: StdoutLogFormat, dispatcher: ::Log::DirectDispatcher)
::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher)
end
Log.setup_from_env(default_level: @log_level, backend: log_backend)

server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)

first_shutdown = true
shutdown = ->(_s : Signal) do
if first_shutdown
first_shutdown = false
server.stop_accepting_clients
server.disconnect_clients
if @term_timeout >= 0
spawn do
sleep @term_timeout
abort "Exiting with #{server.client_connections} client connections still open"
end
end
else
abort "Exiting with #{server.client_connections} client connections still open"
end
end
Signal::INT.trap &shutdown
Signal::TERM.trap &shutdown
::Log.setup_from_env(default_level: @log_level, backend: log_backend)

Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)

HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)

shutdown

# wait until all client connections are closed
until server.client_connections.zero?
sleep 0.2
end
Log.info { "No clients left. Exiting." }
end

@first_shutdown = true

def initiate_shutdown(_s : Signal)
unless server = @server
exit 0
end
if @first_shutdown
@first_shutdown = false
server.stop_accepting_clients
else
abort "Exiting with #{server.client_connections} client connections still open"
end
end

def shutdown
unless server = @server
raise "Can't call shutdown before run"
end
if server.client_connections > 0
if @term_client_close_timeout > 0
wait_for_clients_to_close @term_client_close_timeout.seconds
end
server.disconnect_clients
end

if server.client_connections > 0
if @term_timeout >= 0
spawn do
sleep @term_timeout
abort "Exiting with #{server.client_connections} client connections still open"
end
end
end
end

def wait_for_clients_to_close(close_timeout)
unless server = @server
raise "Can't call shutdown before run"
end
Log.info { "Waiting for clients to close their connections." }
ch = Channel(Bool).new
spawn do
loop do
ch.send true if server.client_connections.zero?
sleep 0.1.seconds
end
rescue Channel::ClosedError
end

select
when ch.receive?
Log.info { "All clients has closed their connections." }
when timeout close_timeout
ch.close
Log.info { "Timeout waiting for clients to close their connections." }
end
end

struct JournalLogFormat < Log::StaticFormatter
struct Journal::LogFormat < ::Log::StaticFormatter
def run
source
context(before: '[', after: ']')
Expand All @@ -126,7 +181,7 @@ class AMQProxy::CLI
end
end

struct StdoutLogFormat < Log::StaticFormatter
struct Stdout::LogFormat < ::Log::StaticFormatter
def run
timestamp
severity
Expand Down
10 changes: 10 additions & 0 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "socket"
require "log"
require "amq-protocol"
require "uri"
require "./channel_pool"
require "./client"
require "./upstream"
Expand All @@ -11,6 +12,15 @@ module AMQProxy
@clients_lock = Mutex.new
@clients = Array(Client).new

def self.new(url : URI)
tls = url.scheme == "amqps"
host = url.host || "127.0.0.1"
port = url.port || 5762
port = 5671 if tls && url.port.nil?
idle_connection_timeout = url.query_params.fetch("idle_connection_timeout", 5).to_i
new(host, port, tls, idle_connection_timeout)
end

def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5)
tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls
@channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|
Expand Down

0 comments on commit 1d23535

Please sign in to comment.