Skip to content

Commit

Permalink
fix (Redis): spawn unblocking routine
Browse files Browse the repository at this point in the history
  • Loading branch information
vladfaust committed Apr 20, 2019
1 parent 093b446 commit 0d1441b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
1 change: 1 addition & 0 deletions spec/onyx-eda/channel/redis_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Onyx::EDA::Channel::Redis
buffer["c"] = event.payload
end

sleep(0.1)
channel.emit(TestEvent::A.new("foo"), TestEvent::B.new(42))
sleep(0.1)

Expand Down
23 changes: 18 additions & 5 deletions src/onyx-eda/channel/redis.cr
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ module Onyx::EDA
)
@client_id = @redis.send("CLIENT", "ID").raw.as(Int64)
spawn routine
spawn unblocking_routine
end

# Emit *events*, sending them to an appropriate stream. See `Channel#emit`.
Expand Down Expand Up @@ -231,7 +232,7 @@ module Onyx::EDA
before = (@subscriptions.keys + @consumers.keys).uniq!

yield.tap do
unblock_client if before != (@subscriptions.keys + @consumers.keys).uniq!
want_unblock if before != (@subscriptions.keys + @consumers.keys).uniq!
end
end

Expand Down Expand Up @@ -316,11 +317,23 @@ module Onyx::EDA
end
end

@unblock_channel = ::Channel(Nil).new(1)

protected def unblocking_routine
loop do
@unblock_channel.receive

if @blocked
@sidekick.send("CLIENT", "UNBLOCK", @client_id, "ERROR")
@blocked = false
end
end
end

# Unblock the subscribed client.
protected def unblock_client
if @blocked
@sidekick.send("CLIENT", "UNBLOCK", @client_id, "ERROR")
@blocked = false
protected def want_unblock
spawn do
@unblock_channel.send(nil) unless @unblock_channel.full?
end
end
end
Expand Down

0 comments on commit 0d1441b

Please sign in to comment.