diff --git a/CHANGELOG.md b/CHANGELOG.md index 50c50c4..9705761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Added config option term_client_close_timeout and cli option --term-client-close-timeout to set how long to wait for clients to close their connections before sending Close when amqproxy receives a TERM signal. + ## [v2.0.2] - 2024-08-25 - Compile with Crystal 1.13.2, fixes a memory leak in Hash. diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 56a9db3..b0036bf 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,12 +1,23 @@ +require "log" require "spec" +require "uri" require "../src/amqproxy/server" require "../src/amqproxy/version" require "amqp-client" +Log.setup_from_env(default_level: :error) + MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo " +UPSTREAM_URL = begin + URI.parse ENV.fetch("UPSTREAM_URL", "amqp://127.0.0.1:5672?idle_connection_timeout=5") +rescue e : URI::Error + puts "Invalid UPSTREAM_URL: #{e}" + exit 1 +end + def with_server(idle_connection_timeout = 5, &) - server = AMQProxy::Server.new("127.0.0.1", 5672, false) + server = AMQProxy::Server.new(UPSTREAM_URL) tcp_server = TCPServer.new("127.0.0.1", 0) amqp_url = "amqp://#{tcp_server.local_address}" spawn { server.listen(tcp_server) } @@ -27,3 +38,18 @@ def with_http_server(idle_connection_timeout = 5, &) end end end + +def verify_running_amqp! + tls = UPSTREAM_URL.scheme == "amqps" + host = UPSTREAM_URL.host || "127.0.0.1" + port = UPSTREAM_URL.port || 5762 + port = 5671 if tls && UPSTREAM_URL.port.nil? + TCPSocket.new(host, port, connect_timeout: 3.seconds).close +rescue Socket::ConnectError + STDERR.puts "[ERROR] Specs require a running rabbitmq server on #{host}:#{port}" + exit 1 +end + +Spec.before_suite do + verify_running_amqp! +end diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 2ad4e35..e3b019a 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -1,2 +1,2 @@ require "./amqproxy/cli" -AMQProxy::CLI.new.run +AMQProxy::CLI.new.run(ARGV) diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 65721b4..18dc78f 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -7,13 +7,17 @@ require "ini" require "log" class AMQProxy::CLI + Log = ::Log.for(self) + @listen_address = ENV["LISTEN_ADDRESS"]? || "localhost" @listen_port = ENV["LISTEN_PORT"]? || 5673 @http_port = ENV["HTTP_PORT"]? || 15673 - @log_level : Log::Severity = Log::Severity::Info + @log_level : ::Log::Severity = ::Log::Severity::Info @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i @term_timeout = -1 + @term_client_close_timeout = 0 @upstream = ENV["AMQP_URL"]? + @server : AMQProxy::Server? = nil def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity INI.parse(File.read(path)).each do |name, section| @@ -21,11 +25,12 @@ class AMQProxy::CLI when "main", "" section.each do |key, value| case key - when "upstream" then @upstream = value - when "log_level" then @log_level = Log::Severity.parse(value) - when "idle_connection_timeout" then @idle_connection_timeout = value.to_i - when "term_timeout" then @term_timeout = value.to_i - else raise "Unsupported config #{name}/#{key}" + when "upstream" then @upstream = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "term_timeout" then @term_timeout = value.to_i + when "term_client_close_timeout" then @term_client_close_timeout = value.to_i + else raise "Unsupported config #{name}/#{key}" end end when "listen" @@ -33,7 +38,7 @@ class AMQProxy::CLI case key when "port" then @listen_port = value when "bind", "address" then @listen_address = value - when "log_level" then @log_level = Log::Severity.parse(value) + when "log_level" then @log_level = ::Log::Severity.parse(value) else raise "Unsupported config #{name}/#{key}" end end @@ -44,8 +49,10 @@ class AMQProxy::CLI abort ex.message end - def run - p = OptionParser.parse do |parser| + def run(argv) + raise "run cant be called multiple times" unless @server.nil? + + p = OptionParser.parse(argv) do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @listen_address = v @@ -55,17 +62,20 @@ class AMQProxy::CLI parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maxiumum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i end - parser.on("--term-timeout=SECONDS", "At TERM the server will wait this many seconds for clients to gracefully close their sockets (default: infinite)") do |v| + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| @term_timeout = v.to_i end - parser.on("-d", "--debug", "Verbose logging") { @log_level = Log::Severity::Debug } + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + @term_client_close_timeout = v.to_i + end + parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug } parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end - @upstream ||= ARGV.shift? + @upstream ||= argv.shift? upstream_url = @upstream || abort p.to_s u = URI.parse upstream_url @@ -80,43 +90,88 @@ class AMQProxy::CLI tls = u.scheme == "amqps" log_backend = if ENV.has_key?("JOURNAL_STREAM") - Log::IOBackend.new(formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher) + ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) else - Log::IOBackend.new(formatter: StdoutLogFormat, dispatcher: ::Log::DirectDispatcher) + ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) end - Log.setup_from_env(default_level: @log_level, backend: log_backend) - - server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) - - first_shutdown = true - shutdown = ->(_s : Signal) do - if first_shutdown - first_shutdown = false - server.stop_accepting_clients - server.disconnect_clients - if @term_timeout >= 0 - spawn do - sleep @term_timeout - abort "Exiting with #{server.client_connections} client connections still open" - end - end - else - abort "Exiting with #{server.client_connections} client connections still open" - end - end - Signal::INT.trap &shutdown - Signal::TERM.trap &shutdown + ::Log.setup_from_env(default_level: @log_level, backend: log_backend) + + Signal::INT.trap &->self.initiate_shutdown(Signal) + Signal::TERM.trap &->self.initiate_shutdown(Signal) + + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) + shutdown + # wait until all client connections are closed until server.client_connections.zero? sleep 0.2 end + Log.info { "No clients left. Exiting." } + end + + @first_shutdown = true + + def initiate_shutdown(_s : Signal) + unless server = @server + exit 0 + end + if @first_shutdown + @first_shutdown = false + server.stop_accepting_clients + else + abort "Exiting with #{server.client_connections} client connections still open" + end + end + + def shutdown + unless server = @server + raise "Can't call shutdown before run" + end + if server.client_connections > 0 + if @term_client_close_timeout > 0 + wait_for_clients_to_close @term_client_close_timeout.seconds + end + server.disconnect_clients + end + + if server.client_connections > 0 + if @term_timeout >= 0 + spawn do + sleep @term_timeout + abort "Exiting with #{server.client_connections} client connections still open" + end + end + end + end + + def wait_for_clients_to_close(close_timeout) + unless server = @server + raise "Can't call shutdown before run" + end + Log.info { "Waiting for clients to close their connections." } + ch = Channel(Bool).new + spawn do + loop do + ch.send true if server.client_connections.zero? + sleep 0.1.seconds + end + rescue Channel::ClosedError + end + + select + when ch.receive? + Log.info { "All clients has closed their connections." } + when timeout close_timeout + ch.close + Log.info { "Timeout waiting for clients to close their connections." } + end end - struct JournalLogFormat < Log::StaticFormatter + struct Journal::LogFormat < ::Log::StaticFormatter def run source context(before: '[', after: ']') @@ -126,7 +181,7 @@ class AMQProxy::CLI end end - struct StdoutLogFormat < Log::StaticFormatter + struct Stdout::LogFormat < ::Log::StaticFormatter def run timestamp severity diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index ae9512f..92af3e9 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -1,6 +1,7 @@ require "socket" require "log" require "amq-protocol" +require "uri" require "./channel_pool" require "./client" require "./upstream" @@ -11,6 +12,15 @@ module AMQProxy @clients_lock = Mutex.new @clients = Array(Client).new + def self.new(url : URI) + tls = url.scheme == "amqps" + host = url.host || "127.0.0.1" + port = url.port || 5762 + port = 5671 if tls && url.port.nil? + idle_connection_timeout = url.query_params.fetch("idle_connection_timeout", 5).to_i + new(host, port, tls, idle_connection_timeout) + end + def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5) tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls @channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|