Skip to content

Commit

Permalink
[FIX] Fix exit condition on psubscribe and subscribe, close socket un…
Browse files Browse the repository at this point in the history
…der the right conditions
  • Loading branch information
captchanjack committed Aug 24, 2022
1 parent 9421a30 commit ee1ba05
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jedis"
uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f"
authors = ["Jack Chan <[email protected]>"]
version = "0.3.1"
version = "0.3.2"

[deps]
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Expand Down
16 changes: 12 additions & 4 deletions src/pubsub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,14 @@ function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -
execute(["SUBSCRIBE", client.subscriptions...], client)
@lock client.lock set_subscribed!(client)
yield()
err = nothing

try
while true
type, chnl = msg = recv(client.socket)

msg = recv(client.socket)
isnothing(msg) && throw(_UVError("readline", UV_ECONNABORTED))
type, chnl = msg

if type == "message" && chnl in client.subscriptions
fn(msg)
stop_fn(msg) && break
Expand All @@ -105,6 +109,7 @@ function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -
@lock client.lock flush!(client)
end
@lock client.lock set_unsubscribed!(client)
@lock client.lock err isa Base.IOError || reconnect!(client)
end
end

Expand Down Expand Up @@ -188,11 +193,13 @@ function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg)
@lock client.lock client.psubscriptions = Set([pattern, patterns...])
execute(["PSUBSCRIBE", client.psubscriptions...], client)
@lock client.lock set_subscribed!(client)

yield()
err = nothing

try
while true
msg = recv(client.socket)
isnothing(msg) && continue
isnothing(msg) && throw(_UVError("readline", UV_ECONNABORTED))
type, pttrn = msg

if type == "pmessage" && pttrn in client.psubscriptions
Expand All @@ -218,6 +225,7 @@ function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg)
@lock client.lock flush!(client)
end
@lock client.lock set_unsubscribed!(client)
@lock client.lock err isa Base.IOError || reconnect!(client)
end
end

Expand Down
26 changes: 17 additions & 9 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ seconds for each keep-alive packet, ignored when `enable` is `0`. After delay ha
connection is still lost at the end of this procedure, then the handle is destroyed with a
UV_ETIMEDOUT error passed to the corresponding callback.
"""
keepalive!(socket::TCPSocket, enable::Cint, delay::Cint) = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), socket.handle, enable, delay)
keepalive!(socket::TCPSocket, enable::Cint, delay::Cint) = ccall(:uv_tcp_keepalive, Cint, (Ptr{Cvoid}, Cint, Cuint), socket.handle, enable, delay)

"""
isactive(socket::TCPSocket)
Expand Down Expand Up @@ -90,14 +90,23 @@ function peerhostport(socket::TCPSocket)
end

"""
tryclose(socket::TCPSocket)
tryclose(socket::TCPSocket, uv_code::Int, nbytes::Int)
Closes a TCP socket connection if the TCP state is `CLOSE_WAIT`.
Closes a TCP socket connection if all of the following conditions are satisfied:
- UV error code is < 0
- bytesavailable for read in buffer == 0
- socket is not active
- TCP state is `CLOSE_WAIT` or `CLOSED`
"""
function tryclose(socket::TCPSocket)
if isactive(socket) == 0 tcpstate(socket) == "CLOSE_WAIT"
function tryclose(socket::TCPSocket, uv_code::Int, nbytes::Int)
if uv_code >= 0 || nbytes != 0 || isactive(socket) == 1
return
end
state = tcpstate(socket)
if state == "CLOSE_WAIT" || state == "CLOSED"
close(socket)
end
return
end

"""
Expand Down Expand Up @@ -129,7 +138,7 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
elseif nread == UV_ETIMEDOUT
# elseif nread == UV_ETIMEDOUT
# TODO: put keepalive timeout callback here
else
stream.readerror = _UVError("read", nread)
Expand Down Expand Up @@ -161,10 +170,9 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})


# Line added to close socket when server connection breaks
# Closes the connection if the TCP state is CLOSE_WAIT
# Closes the connection if the TCP state is CLOSE_WAIT or CLOSED
# Must be called asynchronously
# TODO: Differentiate UV_ENOBUFS event with server close event
# @async tryclose(stream)
@async tryclose(stream, nread, bytesavailable(stream.buffer))
end
nothing
end
Expand Down
18 changes: 18 additions & 0 deletions test/test_pubsub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,24 @@ end
wait_until_unsubscribed(subscriber)
@test subscriber.is_subscribed == false
@test isempty(subscriber.subscriptions)

task = @async psubscribe(patterns...; client=subscriber) do msg
push!(messages, msg)
end

wait_until_subscribed(subscriber)
disconnect!(subscriber) # force close

@test subscriber.is_subscribed == false
@test isempty(subscriber.subscriptions)
@test istaskdone(task)
try
fetch(task)
catch err
@test err isa TaskFailedException
@test err.task.result isa Base.IOError
@test err.task.result.code == Base.UV_ECONNABORTED
end
end

flushall()

2 comments on commit ee1ba05

@captchanjack
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request updated: JuliaRegistries/General/67001

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.3.2 -m "<description of version>" ee1ba0580a518d8a05a416de99dde784d03a37fa
git push origin v0.3.2

Please sign in to comment.