diff --git a/Project.toml b/Project.toml index fff52af..e97aa38 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Jedis" uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f" authors = ["Jack Chan "] -version = "0.3.0" +version = "0.3.1" [deps] MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d" diff --git a/src/client.jl b/src/client.jl index 82f9f3b..423cd99 100644 --- a/src/client.jl +++ b/src/client.jl @@ -87,12 +87,12 @@ function Client(; host="127.0.0.1", port=6379, database=0, password="", username keepalive_delay ) - prepare(client) + prepare!(client) return client end """ - prepare(client::Client) + prepare!(client::Client) Prepares a new client, involves pre-pinging the server, logging in with the correct username and password, selecting the chosen database, and setting keepalive if applicable. @@ -100,7 +100,7 @@ and password, selecting the chosen database, and setting keepalive if applicable Pinging the server is to test connection and set socket status to Base.StatusPaused (i.e. a ready state). Raw execution is used to bypass locks and retries """ -function prepare(client::Client) +function prepare!(client::Client) write(client.socket, resp(["PING"])) recv(client.socket) @@ -238,7 +238,7 @@ Reconnects the input client socket connection. function reconnect!(client::Client) disconnect!(client) client.socket = isnothing(client.ssl_config) ? connect(client.host, client.port) : ssl_connect(connect(client.host, client.port), client.host, client.ssl_config) - prepare(client) + prepare!(client) return client end diff --git a/src/stream.jl b/src/stream.jl index 5adbdab..68fc4c9 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -21,7 +21,85 @@ Returns non-zero if the handle is active, zero if it’s inactive. Is active whe something that involves i/o, like reading, writing, connecting, accepting new connections, etc. """ isactive(socket::TCPSocket) = ccall(:uv_is_active, Cint, (Ptr{Cvoid},), socket.handle) - + +""" + netstat(port::Int)::Vector + +Calls CLI tool `netstat` to get TCP connection statistics over a specific port. +Returns a vector os lines, each line represents (Proto, Recv-Q, Send-Q, Local Address, Foreign Address, state). +""" +function netstat(port::Int)::Vector + try + return [split(line) for line in readlines(pipeline(`netstat -na`, `grep $port`))] + catch + # Catches null pipeline + end + return +end + +""" + tcpstate(src_host::String, src_port::Int, dst_host::String, dst_port::Int) + +Returns the TCP state given source and destination host and ports. +""" +function tcpstate(src_host::String, src_port::Int, dst_host::String, dst_port::Int) + stats = netstat(src_port) + isempty(stats) && return + for (_, _, _, src_addr, dst_addr, state) in stats + if ( + occursin(src_host, src_addr) && + occursin(string(src_port), src_addr) && + occursin(dst_host, dst_addr) && + occursin(string(dst_port), dst_addr) + ) + return state + end + end + return +end + +""" + tcpstate(socket::TCPSocket) + +Returns TCP state given a socket object. +""" +function tcpstate(socket::TCPSocket) + src_host, src_port = hostport(socket) + dst_host, dst_port = peerhostport(socket) + return tcpstate(src_host, src_port, dst_host, dst_port) +end + +""" + hostport(socket::TCPSocket) + +Returns host and port given a socket object. +""" +function hostport(socket::TCPSocket) + host, port = getsockname(socket) + return string(host), Int(port) +end + +""" + hostport(socket::TCPSocket) + +Returns host and port for a peer given a socket object. +""" +function peerhostport(socket::TCPSocket) + host, port = getpeername(socket) + return string(host), Int(port) +end + +""" + tryclose(socket::TCPSocket) + +Closes a TCP socket connection if the TCP state is `CLOSE_WAIT`. +""" +function tryclose(socket::TCPSocket) + if isactive(socket) == 0 tcpstate(socket) == "CLOSE_WAIT" + close(socket) + end +end + """ _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) @@ -83,8 +161,10 @@ function _uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid}) # Line added to close socket when server connection breaks + # Closes the connection if the TCP state is CLOSE_WAIT # Must be called asynchronously - @async begin isactive(stream) == 0 && close(stream) end + # TODO: Differentiate UV_ENOBUFS event with server close event + # @async tryclose(stream) end nothing end