diff --git a/Project.toml b/Project.toml index e97aa38..577f360 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Jedis" uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f" authors = ["Jack Chan "] -version = "0.3.1" +version = "0.3.2" [deps] MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d" diff --git a/src/pubsub.jl b/src/pubsub.jl index 9b5754d..60e9ad8 100644 --- a/src/pubsub.jl +++ b/src/pubsub.jl @@ -78,10 +78,14 @@ function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) - execute(["SUBSCRIBE", client.subscriptions...], client) @lock client.lock set_subscribed!(client) yield() + err = nothing + try while true - type, chnl = msg = recv(client.socket) - + msg = recv(client.socket) + isnothing(msg) && throw(_UVError("readline", UV_ECONNABORTED)) + type, chnl = msg + if type == "message" && chnl in client.subscriptions fn(msg) stop_fn(msg) && break @@ -105,6 +109,7 @@ function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) - @lock client.lock flush!(client) end @lock client.lock set_unsubscribed!(client) + @lock client.lock err isa Base.IOError || reconnect!(client) end end @@ -188,11 +193,13 @@ function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg) @lock client.lock client.psubscriptions = Set([pattern, patterns...]) execute(["PSUBSCRIBE", client.psubscriptions...], client) @lock client.lock set_subscribed!(client) - + yield() + err = nothing + try while true msg = recv(client.socket) - isnothing(msg) && continue + isnothing(msg) && throw(_UVError("readline", UV_ECONNABORTED)) type, pttrn = msg if type == "pmessage" && pttrn in client.psubscriptions @@ -218,6 +225,7 @@ function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg) @lock client.lock flush!(client) end @lock client.lock set_unsubscribed!(client) + @lock client.lock err isa Base.IOError || reconnect!(client) end end diff --git a/src/stream.jl b/src/stream.jl index 68fc4c9..2ab0ef5 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -12,7 +12,7 @@ seconds for each keep-alive packet, ignored when `enable` is `0`. After delay ha connection is still lost at the end of this procedure, then the handle is destroyed with a UV_ETIMEDOUT error passed to the corresponding callback. """ -keepalive!(socket::TCPSocket, enable::Cint, delay::Cint) = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), socket.handle, enable, delay) +keepalive!(socket::TCPSocket, enable::Cint, delay::Cint) = ccall(:uv_tcp_keepalive, Cint, (Ptr{Cvoid}, Cint, Cuint), socket.handle, enable, delay) """ isactive(socket::TCPSocket) @@ -90,14 +90,23 @@ function peerhostport(socket::TCPSocket) end """ - tryclose(socket::TCPSocket) + tryclose(socket::TCPSocket, uv_code::Int, nbytes::Int) -Closes a TCP socket connection if the TCP state is `CLOSE_WAIT`. +Closes a TCP socket connection if all of the following conditions are satisfied: +- UV error code is < 0 +- bytesavailable for read in buffer == 0 +- socket is not active +- TCP state is `CLOSE_WAIT` or `CLOSED` """ -function tryclose(socket::TCPSocket) - if isactive(socket) == 0 tcpstate(socket) == "CLOSE_WAIT" +function tryclose(socket::TCPSocket, uv_code::Int, nbytes::Int) + if uv_code >= 0 || nbytes != 0 || isactive(socket) == 1 + return + end + state = tcpstate(socket) + if state == "CLOSE_WAIT" || state == "CLOSED" close(socket) end + return end """ @@ -129,7 +138,7 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle) stream.status = StatusClosing end - elseif nread == UV_ETIMEDOUT + # elseif nread == UV_ETIMEDOUT # TODO: put keepalive timeout callback here else stream.readerror = _UVError("read", nread) @@ -161,10 +170,9 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) # Line added to close socket when server connection breaks - # Closes the connection if the TCP state is CLOSE_WAIT + # Closes the connection if the TCP state is CLOSE_WAIT or CLOSED # Must be called asynchronously - # TODO: Differentiate UV_ENOBUFS event with server close event - # @async tryclose(stream) + @async tryclose(stream, nread, bytesavailable(stream.buffer)) end nothing end diff --git a/test/test_pubsub.jl b/test/test_pubsub.jl index 6cacf95..13ea48d 100644 --- a/test/test_pubsub.jl +++ b/test/test_pubsub.jl @@ -112,6 +112,24 @@ end wait_until_unsubscribed(subscriber) @test subscriber.is_subscribed == false @test isempty(subscriber.subscriptions) + + task = @async psubscribe(patterns...; client=subscriber) do msg + push!(messages, msg) + end + + wait_until_subscribed(subscriber) + disconnect!(subscriber) # force close + + @test subscriber.is_subscribed == false + @test isempty(subscriber.subscriptions) + @test istaskdone(task) + try + fetch(task) + catch err + @test err isa TaskFailedException + @test err.task.result isa Base.IOError + @test err.task.result.code == Base.UV_ECONNABORTED + end end flushall() \ No newline at end of file