Skip to content

Commit

Permalink
timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
zvkemp committed Sep 23, 2017
1 parent a9ba655 commit be5ea2a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
22 changes: 18 additions & 4 deletions lib/phoenix/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@ def initialize(topic, join_options: {}, path: 'ws://localhost:4000/socket/websoc
@inbox_cond = new_cond
@thread_ready = new_cond
@topic_cond = new_cond
@join_ref = SecureRandom.uuid
reset_state_conditions
end

# Simulate a synchronous call over the websocket
# TODO: use a queue/inbox/outbox here instead
def request_reply(event:, payload: {})
def request_reply(event:, payload: {}, timeout: 5) # timeout in seconds
ref = SecureRandom.uuid
synchronize do
ensure_connection
@topic_cond.wait_until { @topic_joined }
EM.next_tick { socket.send({ topic: topic, event: event, payload: payload, ref: ref }.to_json) }
log [event, ref].inspect
inbox_cond.wait_until { inbox.key?(ref) || @dead }
log [event, ref]

# Ruby's condition variables only support timeout on the basic 'wait' method;
# This should behave roughly as if wait_until also support a timeout:
# `inbox_cond.wait_until(timeout) { inbox.key?(ref) || @dead }
#
# Note that this serves only to unblock the main thread, and should not halt execution of the
# socket connection. Therefore, there is a possibility that the inbox may pile up with
# unread messages if a lot of timeouts are encountered. A self-sweeping inbox will
# be implemented to prevent this.
ts = Time.now
loop do
inbox_cond.wait(timeout) # waits until time expires or signaled
break if inbox.key?(ref) || @dead
raise 'timeout' if timeout && Time.now > (ts + timeout)
end
inbox.delete(ref) { raise "reply #{ref} not found" }
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/phoenix/socket/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Rb
module Phoenix
module Socket
VERSION = "0.1.1"
VERSION = "0.2.0"
end
end
end
28 changes: 25 additions & 3 deletions spec/phoenix/socket_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
it 'handles concurrent threads' do
# NOTE: This is a proof of concept, and is WAY more than anyone would ever want/need
# to spawn in a runtime process. I.e. don't do this. If one at a time isn't enough,
# do it in Elixir.
# do it in Elixir. Although you should probably also ask yourself why you need 500 processes
# to share a single websocket.
responses = (0..500).map do |n|
Thread.new do
Thread.current[:id] = n
socket_handler.request_reply(event: :echo, payload: { n: n })
socket_handler.request_reply(event: :echo, payload: { n: n }, timeout: nil)
end
end.map(&:value)

Expand All @@ -39,9 +40,30 @@
# behavior that came up during development. Generally came up at least 1 / 5 times;
# running 20 for safety.
it "handles termination but respawns the connection handler" do
expect { socket_handler.request_reply(event: :unsupported) }.to raise_error(RuntimeError)
expect { socket_handler.request_reply(event: :unsupported) }.to raise_error(RuntimeError, /reply .* not found/)
expect(socket_handler.request_reply(event: :echo)['payload']['status']).to eq('ok')

# Ensure dead handler threads have been cleaned up; we should have at most
# the live main thread and a live respawned handler
expect(Thread.list.count).to be < 3
end
end
end

describe 'timeout handling' do
specify 'small sleep' do
response = socket_handler.request_reply(event: :sleep, payload: { ms: 50 })
expect(response.dig('payload', 'status')).to eq('ok')
end

specify 'long sleep' do
response = socket_handler.request_reply(event: :sleep, payload: { ms: 1000 })
expect(response.dig('payload', 'status')).to eq('ok')
end

specify 'sleep exceeding timeout' do
expect { socket_handler.request_reply(timeout: 0.5, event: :sleep, payload: { ms: 1000 }) }.to raise_error(RuntimeError, /timeout/)
expect { socket_handler.request_reply(timeout: 0.5, event: :sleep, payload: { ms: 10 }) }.not_to raise_error(RuntimeError, /timeout/)
end
end
end
6 changes: 6 additions & 0 deletions spec_example/lib/spec_example_web/channels/rspec_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ defmodule SpecExampleWeb.RSpecChannel do
{:reply, {:ok, payload}, socket}
end

# Simulate a long-running request
def handle_in("sleep", %{ "ms" => ms } = payload, socket) do
Process.sleep(ms)
{:reply, {:ok, payload}, socket}
end

def handle_in(_, _, socket) do
{:stop, :shutdown, socket}
end
Expand Down

0 comments on commit be5ea2a

Please sign in to comment.