Skip to content

Commit

Permalink
Merge pull request #1 from zvkemp/better_threadsafety
Browse files Browse the repository at this point in the history
better threadsafety
  • Loading branch information
zvkemp authored Sep 23, 2017
2 parents 64021bc + 1589800 commit 1fbe493
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 13 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### 0.1.1

Provide better threadsafety. This now works correctly:

```ruby
require 'phoenix/socket'
ps = Phoenix::Socket.new('stat:default')
(1..100).map do |x|
Thread.new do
ps.request_reply(event: 'word_count', payload: { user_id: x })
end
end.map(&:value)
```

- EventMachine thread spawning is now limited to a single concurrent instance
- Requests wait until the channel has received join confirmation (no more 'Unknown Topic' errors)
- Certain conditions are now broadcasts instead of signals (prevent threads from sleeping forever when their replies come out-of-order)
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ source 'https://rubygems.org'

# Specify your gem's dependencies in rb-phoenix-socket.gemspec
gemspec

gem 'pry-byebug'
43 changes: 32 additions & 11 deletions lib/phoenix/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ def initialize(topic, join_options: {}, path: 'ws://localhost:4000/socket/websoc
super() # MonitorMixin
@inbox_cond = new_cond
@thread_ready = new_cond
@topic_cond = new_cond
@join_ref = SecureRandom.uuid
end

# Simulate a synchronous call over the websocket
def request_reply(event:, payload: {})
ref = SecureRandom.uuid
ensure_thread
EM.next_tick { socket.send({ topic: topic, event: event, payload: payload, ref: ref }.to_json) }
synchronize do
@topic_cond.wait_until { @topic_joined }
EM.next_tick { socket.send({ topic: topic, event: event, payload: payload, ref: ref }.to_json) }
inbox_cond.wait_until { inbox.key?(ref) || @dead }
inbox.delete(ref) or raise "reply #{ref} not found"
inbox.delete(ref) { raise "reply #{ref} not found" }
end
end

Expand All @@ -34,38 +37,56 @@ def request_reply(event:, payload: {})
attr_reader :inbox_cond, :thread_ready

def ensure_thread
@ws_thread&.alive? or synchronize do
connection_alive? or synchronize do
spawn_thread
thread_ready.wait(3)
raise 'dead connection timeout' if @dead
if @dead
@spawning = false
raise 'dead connection timeout'
end
end
end

def connection_alive?
@ws_thread&.alive? && !@dead
end

def spawn_thread
@dead = false
return if @spawning
puts 'spawn_thread'
@spawning = true
@ws_thread = Thread.new do
EM.run do
synchronize do
@socket = Faye::WebSocket::Client.new(path)
socket.on :open do |event|
p [:open]
socket.send({ topic: topic, event: "phx_join", payload: join_options, ref: 1 }.to_json)
synchronize { thread_ready.signal }
socket.send({ topic: topic, event: "phx_join", payload: join_options, ref: @join_ref }.to_json)
synchronize do
@dead = false
@spawning = false
thread_ready.broadcast
end
end

socket.on :message do |event|
data = JSON.parse(event.data)
synchronize do
inbox[data['ref']] = data
inbox_cond.signal
if data['ref'] == @join_ref
@topic_joined = true
@topic_cond.broadcast
else
inbox[data['ref']] = data
inbox_cond.broadcast
end
end
end

socket.on :close do |event|
p [:close, event.code, event.reason]
@socket = nil
@dead = true
synchronize do
@socket = nil
@dead = true
inbox_cond.signal
thread_ready.signal
end
Expand Down
2 changes: 1 addition & 1 deletion lib/phoenix/socket/version.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Rb
module Phoenix
module Socket
VERSION = "0.1.0"
VERSION = "0.1.1"
end
end
end
1 change: 0 additions & 1 deletion rb-phoenix-socket.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "bundler", "~> 1.14"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rspec", "~> 3.0"
spec.add_development_dependency "pry-byebug"
end

0 comments on commit 1fbe493

Please sign in to comment.