Skip to content

Commit

Permalink
Use system assigned port for amqproxy listener in specs
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Oct 8, 2024
1 parent 5f0c203 commit 918e6e2
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 131 deletions.
218 changes: 88 additions & 130 deletions spec/amqproxy/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,45 @@ require "../spec_helper"

describe AMQProxy::Server do
it "dont reuse channels closed by upstream" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |server, proxy_url|
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
ch = conn.channel
ch.basic_publish "foobar", "non-existing"
end
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
ch = conn.channel
ch.basic_publish_confirm "foobar", "amq.fanout"
end
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
ch = conn.channel
expect_raises(AMQP::Client::Channel::ClosedException) do
ch.basic_publish_confirm "foobar", "non-existing"
end
end
sleep 0.1
s.upstream_connections.should eq 1
ensure
s.stop_accepting_clients
server.upstream_connections.should eq 1
end
end

it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |server, proxy_url|
Fiber.yield
10.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
ch = conn.channel
ch.basic_publish "foobar", "amq.fanout", ""
s.client_connections.should eq 1
s.upstream_connections.should eq 1
server.client_connections.should eq 1
server.upstream_connections.should eq 1
end
end
s.client_connections.should eq 0
s.upstream_connections.should eq 1
ensure
s.stop_accepting_clients
server.client_connections.should eq 0
server.upstream_connections.should eq 1
end
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { server.listen("127.0.0.1", 5673) }
with_server do |_server, proxy_url|
Fiber.yield

queue_name = "amqproxy-test-queue"
Expand All @@ -59,15 +49,15 @@ describe AMQProxy::Server do
num_messages_to_publish = 5

num_messages_to_publish.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
channel = conn.channel
queue = channel.queue(queue_name)
queue.publish_confirm(message_payload)
end
end
sleep 0.1

AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
channel = conn.channel
channel.basic_consume(queue_name, no_ack: false, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
Expand All @@ -80,140 +70,122 @@ describe AMQProxy::Server do
end

num_received_messages.should eq num_messages_to_publish
ensure
server.stop_accepting_clients
end
end

it "a client can open all channels" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
with_server do |server, proxy_url|
max = 4000
AMQP::Client.start("amqp://localhost:5673?channel_max=#{max}") do |conn|
AMQP::Client.start("#{proxy_url}?channel_max=#{max}") do |conn|
conn.channel_max.should eq max
conn.channel_max.times do
conn.channel
end
s.client_connections.should eq 1
s.upstream_connections.should eq 2
server.client_connections.should eq 1
server.upstream_connections.should eq 2
end
sleep 0.1
s.client_connections.should eq 0
s.upstream_connections.should eq 2
ensure
s.stop_accepting_clients
server.client_connections.should eq 0
server.upstream_connections.should eq 2
end
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |server, proxy_url|
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
conn.channel
system("#{MAYBE_SUDO}rabbitmqctl stop_app > /dev/null").should be_true
end
system("#{MAYBE_SUDO}rabbitmqctl start_app > /dev/null").should be_true
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
conn.channel
s.client_connections.should eq(1)
s.upstream_connections.should eq(1)
server.client_connections.should eq(1)
server.upstream_connections.should eq(1)
end
sleep 0.1
s.client_connections.should eq(0)
s.upstream_connections.should eq(1)
ensure
s.stop_accepting_clients
server.client_connections.should eq(0)
server.upstream_connections.should eq(1)
end
end

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |server, proxy_url|
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
conn.channel
end
sleep 2
s.client_connections.should eq(0)
s.upstream_connections.should eq(1)
server.client_connections.should eq(0)
server.upstream_connections.should eq(1)
ensure
s.stop_accepting_clients
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 60).' > /dev/null").should be_true
end
end

it "supports waiting for client connections on graceful shutdown" do
started = Time.utc.to_unix

s = AMQProxy::Server.new("127.0.0.1", 5672, false, 5)
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
spawn do
s.listen("127.0.0.1", 5673)
end
Fiber.yield
spawn do
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel
wait_for_channel.send(0) # send 0
10.times do
s.client_connections.should be >= 1
s.upstream_connections.should be >= 1
sleep 1
with_server(idle_connection_timeout: 5) do |server, proxy_url|
wait_for_channel = Channel(Int32).new # channel used to wait for certain calls, to test certain behaviour
Fiber.yield
spawn do
AMQP::Client.start(proxy_url) do |conn|
conn.channel
wait_for_channel.send(0) # send 0
10.times do
server.client_connections.should be >= 1
server.upstream_connections.should be >= 1
sleep 1
end
end
wait_for_channel.send(5) # send 5
end
wait_for_channel.send(5) # send 5
end
wait_for_channel.receive.should eq 0 # wait 0
s.client_connections.should eq 1
s.upstream_connections.should eq 1
spawn do
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel
wait_for_channel.send(2) # send 2
sleep 2
end
wait_for_channel.send(3) # send 3
end
wait_for_channel.receive.should eq 2 # wait 2
s.client_connections.should eq 2
s.upstream_connections.should eq 1
spawn s.stop_accepting_clients
wait_for_channel.receive.should eq 3 # wait 3
s.client_connections.should eq 1
s.upstream_connections.should eq 1 # since connection stays open
spawn do
begin
AMQP::Client.start("amqp://localhost:5673") do |conn|
wait_for_channel.receive.should eq 0 # wait 0
server.client_connections.should eq 1
server.upstream_connections.should eq 1
spawn do
AMQP::Client.start(proxy_url) do |conn|
conn.channel
wait_for_channel.send(-1) # send 4 (this should not happen)
sleep 1
wait_for_channel.send(2) # send 2
sleep 2
end
wait_for_channel.send(3) # send 3
end
wait_for_channel.receive.should eq 2 # wait 2
server.client_connections.should eq 2
server.upstream_connections.should eq 1
spawn server.stop_accepting_clients
wait_for_channel.receive.should eq 3 # wait 3
server.client_connections.should eq 1
server.upstream_connections.should eq 1 # since connection stays open
spawn do
begin
AMQP::Client.start(proxy_url) do |conn|
conn.channel
wait_for_channel.send(-1) # send 4 (this should not happen)
sleep 1
end
rescue ex
# ex.message.should be "Error reading socket: Connection reset by peer"
wait_for_channel.send(4) # send 4
end
rescue ex
# ex.message.should be "Error reading socket: Connection reset by peer"
wait_for_channel.send(4) # send 4
end
wait_for_channel.receive.should eq 4 # wait 4
server.client_connections.should eq 1 # since the new connection should not have worked
server.upstream_connections.should eq 1 # since connections stay open
wait_for_channel.receive.should eq 5 # wait 5
server.client_connections.should eq 0 # since now the server should be closed
server.upstream_connections.should eq 1
(Time.utc.to_unix - started).should be < 30
end
wait_for_channel.receive.should eq 4 # wait 4
s.client_connections.should eq 1 # since the new connection should not have worked
s.upstream_connections.should eq 1 # since connections stay open
wait_for_channel.receive.should eq 5 # wait 5
s.client_connections.should eq 0 # since now the server should be closed
s.upstream_connections.should eq 1
(Time.utc.to_unix - started).should be < 30
end

it "works after server closes channel" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |_server, proxy_url|
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
qname = "test#{rand}"
3.times do
expect_raises(AMQP::Client::Channel::ClosedException) do
Expand All @@ -222,18 +194,14 @@ describe AMQProxy::Server do
end
end
end
ensure
s.stop_accepting_clients
end
end

it "passes connection blocked frames to clients" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
done = Channel(Nil).new
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |_server, proxy_url|
done = Channel(Nil).new
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
conn.on_blocked do
done.send nil
system("#{MAYBE_SUDO}rabbitmqctl set_vm_memory_high_watermark 0.8 > /dev/null").should be_true
Expand All @@ -246,42 +214,32 @@ describe AMQProxy::Server do
ch.basic_publish "foobar", "amq.fanout"
2.times { done.receive }
end
ensure
s.stop_accepting_clients
end
end

it "supports publishing large messages" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |_server, proxy_url|
Fiber.yield
AMQP::Client.start("amqp://localhost:5673") do |conn|
AMQP::Client.start(proxy_url) do |conn|
ch = conn.channel
q = ch.queue
q.publish_confirm Bytes.new(10240)
msg = q.get.not_nil!("should not be nil")
msg.body_io.bytesize.should eq 10240
end
ensure
s.stop_accepting_clients
end
end

it "supports publishing large messages when frame_max is small" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false)
begin
spawn { s.listen("127.0.0.1", 5673) }
with_server do |_server, proxy_url|
Fiber.yield
AMQP::Client.start("amqp://localhost:5673?frame_max=4096") do |conn|
AMQP::Client.start("#{proxy_url}?frame_max=4096") do |conn|
ch = conn.channel
q = ch.queue
q.publish_confirm Bytes.new(200_000)
msg = q.get.not_nil!("should not be nil")
msg.body_io.bytesize.should eq 200_000
end
ensure
s.stop_accepting_clients
end
end
end
12 changes: 12 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,15 @@ require "../src/amqproxy/version"
require "amqp-client"

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

def with_server(idle_connection_timeout = 5, &)
server = AMQProxy::Server.new("127.0.0.1", 5672, false)
tcp_server = TCPServer.new("127.0.0.1", 0)
amqp_url = "amqp://#{tcp_server.local_address}"
spawn { server.listen(tcp_server) }
yield server, amqp_url
ensure
if s = server
s.stop_accepting_clients
end
end
5 changes: 4 additions & 1 deletion src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ module AMQProxy
end

def listen(address, port)
@server = server = TCPServer.new(address, port)
listen(TCPServer.new(address, port))
end

def listen(@server : TCPServer)
Log.info { "Proxy listening on #{server.local_address}" }
while socket = server.accept?
begin
Expand Down

0 comments on commit 918e6e2

Please sign in to comment.