Skip to content

Commit

Permalink
[FIX] Remove premature call on socket close
Browse files Browse the repository at this point in the history
  • Loading branch information
captchanjack committed Aug 24, 2022
1 parent 07cb0e0 commit 9421a30
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jedis"
uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f"
authors = ["Jack Chan <[email protected]>"]
version = "0.3.0"
version = "0.3.1"

[deps]
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Expand Down
8 changes: 4 additions & 4 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ function Client(; host="127.0.0.1", port=6379, database=0, password="", username
keepalive_delay
)

prepare(client)
prepare!(client)
return client
end

"""
prepare(client::Client)
prepare!(client::Client)
Prepares a new client, involves pre-pinging the server, logging in with the correct username
and password, selecting the chosen database, and setting keepalive if applicable.
Pinging the server is to test connection and set socket status to Base.StatusPaused (i.e. a ready state).
Raw execution is used to bypass locks and retries
"""
function prepare(client::Client)
function prepare!(client::Client)
write(client.socket, resp(["PING"]))
recv(client.socket)

Expand Down Expand Up @@ -238,7 +238,7 @@ Reconnects the input client socket connection.
function reconnect!(client::Client)
disconnect!(client)
client.socket = isnothing(client.ssl_config) ? connect(client.host, client.port) : ssl_connect(connect(client.host, client.port), client.host, client.ssl_config)
prepare(client)
prepare!(client)
return client
end

Expand Down
84 changes: 82 additions & 2 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,85 @@ Returns non-zero if the handle is active, zero if it’s inactive. Is active whe
something that involves i/o, like reading, writing, connecting, accepting new connections, etc.
"""
isactive(socket::TCPSocket) = ccall(:uv_is_active, Cint, (Ptr{Cvoid},), socket.handle)


"""
netstat(port::Int)::Vector
Calls CLI tool `netstat` to get TCP connection statistics over a specific port.
Returns a vector os lines, each line represents (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, state).
"""
function netstat(port::Int)::Vector
try
return [split(line) for line in readlines(pipeline(`netstat -na`, `grep $port`))]
catch
# Catches null pipeline
end
return
end

"""
tcpstate(src_host::String, src_port::Int, dst_host::String, dst_port::Int)
Returns the TCP state given source and destination host and ports.
"""
function tcpstate(src_host::String, src_port::Int, dst_host::String, dst_port::Int)
stats = netstat(src_port)
isempty(stats) && return
for (_, _, _, src_addr, dst_addr, state) in stats
if (
occursin(src_host, src_addr) &&
occursin(string(src_port), src_addr) &&
occursin(dst_host, dst_addr) &&
occursin(string(dst_port), dst_addr)
)
return state
end
end
return
end

"""
tcpstate(socket::TCPSocket)
Returns TCP state given a socket object.
"""
function tcpstate(socket::TCPSocket)
src_host, src_port = hostport(socket)
dst_host, dst_port = peerhostport(socket)
return tcpstate(src_host, src_port, dst_host, dst_port)
end

"""
hostport(socket::TCPSocket)
Returns host and port given a socket object.
"""
function hostport(socket::TCPSocket)
host, port = getsockname(socket)
return string(host), Int(port)
end

"""
hostport(socket::TCPSocket)
Returns host and port for a peer given a socket object.
"""
function peerhostport(socket::TCPSocket)
host, port = getpeername(socket)
return string(host), Int(port)
end

"""
tryclose(socket::TCPSocket)
Closes a TCP socket connection if the TCP state is `CLOSE_WAIT`.
"""
function tryclose(socket::TCPSocket)
if isactive(socket) == 0 tcpstate(socket) == "CLOSE_WAIT"
close(socket)
end
end

"""
_uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
Expand Down Expand Up @@ -83,8 +161,10 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})


# Line added to close socket when server connection breaks
# Closes the connection if the TCP state is CLOSE_WAIT
# Must be called asynchronously
@async begin isactive(stream) == 0 && close(stream) end
# TODO: Differentiate UV_ENOBUFS event with server close event
# @async tryclose(stream)
end
nothing
end
Expand Down

2 comments on commit 9421a30

@captchanjack
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/66922

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.3.1 -m "<description of version>" 9421a307eae3d1be995eb817f23c05b6a074081d
git push origin v0.3.1

Please sign in to comment.