Skip to content

Commit

Permalink
[FEAT] Add toggle for tcp socket keep-alive, handle server connection…
Browse files Browse the repository at this point in the history
… closure properly
  • Loading branch information
captchanjack committed Aug 22, 2022
1 parent d4c8566 commit 07cb0e0
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 44 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jedis"
uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f"
authors = ["Jack Chan <[email protected]>"]
version = "0.2.15"
version = "0.3.0"

[deps]
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Expand Down
1 change: 1 addition & 0 deletions docs/src/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Jedis.GLOBAL_CLIENT
set_global_client
get_global_client
Jedis.copy
isclosed
disconnect!
reconnect!
wait_until_subscribed
Expand Down
3 changes: 2 additions & 1 deletion src/Jedis.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export Client, Pipeline, RedisError, get_global_client, set_global_client, get_s
expire, ttl, multi, exec, multi_exec, pipeline, hset, hget, hgetall, hmget, hdel, rpush,
lpush, lpos, lrem, lpop, rpop, blpop, brpop, llen, lrange, publish, subscribe, unsubscribe,
psubscribe, punsubscribe, incr, incrby, incrbyfloat, hincrby, hincrbyfloat, zincrby, zadd,
zrange, zrangebyscore, zrem, acquire_lock, release_lock, redis_lock, isredislocked
zrange, zrangebyscore, zrem, acquire_lock, release_lock, redis_lock, isredislocked, isclosed

using Sockets
using MbedTLS
Expand All @@ -16,6 +16,7 @@ import Base: copy, showerror, get, pipeline

include("exceptions.jl")
include("utilities.jl")
include("stream.jl")
include("client.jl")
include("pipeline.jl")
include("protocol.jl")
Expand Down
71 changes: 41 additions & 30 deletions src/client.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x]) -> Client
Client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x, keepalive_enable=false, keepalive_delay=60]) -> Client
Creates a Client instance connecting and authenticating to a Redis host, provide an `MbedTLS.SSLConfig`
(see `get_ssl_config`) for a secured Redis connection (SSL/TLS).
Expand All @@ -19,6 +19,8 @@ Creates a Client instance connecting and authenticating to a Redis host, provide
- `retry_when_closed::Bool`: Set `true` to try and reconnect when client socket status is closed, defaults to `true`.
- `retry_max_attempts`::Int`: Maximum number of retries for reconnection, defaults to `1`.
- `retry_backoff::Function`: Retry backoff function, called after each retry and must return a single number, where that number is the sleep time (in seconds) until the next retry, accepts a single argument, the number of retries attempted.
- `keepalive_enable::Bool=false`: Set `true` to enable TCP keep-alive.
- `keepalive_delay::Int=60`: Initial delay in seconds, defaults to 60s, ignored when `keepalive_enable` is `false`. After delay has been reached, 10 successive probes, each spaced 1 second from the previous one, will still happen. If the connection is still lost at the end of this procedure, then the handle is destroyed with a UV_ETIMEDOUT error passed to the corresponding callback.
# Note
- Connection parameters `host`, `port`, `database`, `password`, `username` will not change after
Expand Down Expand Up @@ -61,41 +63,54 @@ mutable struct Client
retry_when_closed::Bool
retry_max_attempts::Int
retry_backoff::Function
keepalive_enable::Bool
keepalive_delay::Int
end

function Client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x)
if isnothing(ssl_config)
socket = connect(host, port)
else
socket = ssl_connect(host, port, ssl_config)
end

function Client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x, keepalive_enable=false, keepalive_delay=60)
client = Client(
host,
port,
database,
password,
username,
socket,
isnothing(ssl_config) ? connect(host, port) : ssl_connect(host, port, ssl_config),
ReentrantLock(),
ssl_config,
false,
Set{String}(),
Set{String}(),
retry_when_closed,
retry_max_attemps,
retry_backoff
retry_backoff,
keepalive_enable,
keepalive_delay
)

# Ping server to test connection and set socket status to Base.StatusPaused (i.e. a ready state)
# Raw execution is used to bypass locks and retries
prepare(client)
return client
end

"""
prepare(client::Client)
Prepares a new client, involves pre-pinging the server, logging in with the correct username
and password, selecting the chosen database, and setting keepalive if applicable.
Pinging the server is to test connection and set socket status to Base.StatusPaused (i.e. a ready state).
Raw execution is used to bypass locks and retries
"""
function prepare(client::Client)
write(client.socket, resp(["PING"]))
recv(client.socket)

!isempty(client.password * client.username) && auth(client.password, client.username; client=client)
client.database != 0 && select(client.database; client=client)

!isempty(password * username) && auth(password, username; client=client)
database != 0 && select(database; client=client)
client.keepalive_enable && keepalive!(client.socket, Cint(1), Cint(client.keepalive_delay))

return client
# Async garbage collect is needed to clear any stale clients
@async GC.gc()
end

"""
Expand Down Expand Up @@ -149,16 +164,16 @@ const GLOBAL_CLIENT = Ref{Client}()

"""
set_global_client(client::Client)
set_global_client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x])
set_global_client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x, keepalive_enable=false, keepalive_delay=60])
Sets a Client object as the `GLOBAL_CLIENT[]` instance.
"""
function set_global_client(client::Client)
GLOBAL_CLIENT[] = client
end

function set_global_client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x)
client = Client(; host=host, port=port, database=database, password=password, username=username, ssl_config=ssl_config, retry_when_closed=retry_when_closed, retry_max_attemps=retry_max_attemps, retry_backoff=retry_backoff)
function set_global_client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x, keepalive_enable=false, keepalive_delay=60)
client = Client(; host=host, port=port, database=database, password=password, username=username, ssl_config=ssl_config, retry_when_closed=retry_when_closed, retry_max_attemps=retry_max_attemps, retry_backoff=retry_backoff, keepalive_enable=keepalive_enable, keepalive_delay=keepalive_delay)
set_global_client(client)
end

Expand Down Expand Up @@ -197,7 +212,12 @@ function Base.copy(client::Client)
database=client.database,
password=client.password,
username=client.username,
ssl_config=client.ssl_config
ssl_config=client.ssl_config,
retry_when_closed=client.retry_when_closed,
retry_max_attempts=client.retry_max_attempts,
retry_backoff=client.retry_backoff,
keepalive_enable=client.keepalive_enable,
keepalive_delay=client.keepalive_delay
)
end

Expand All @@ -217,17 +237,8 @@ Reconnects the input client socket connection.
"""
function reconnect!(client::Client)
disconnect!(client)

if isnothing(client.ssl_config)
new_socket = connect(client.host, client.port)
else
new_socket = ssl_connect(connect(client.host, client.port), client.host, client.ssl_config)
end

client.socket = new_socket
!isempty(client.password * client.username) && auth(client.password, client.username; client=client)
client.database != 0 && select(client.database; client=client)

client.socket = isnothing(client.ssl_config) ? connect(client.host, client.port) : ssl_connect(connect(client.host, client.port), client.host, client.ssl_config)
prepare(client)
return client
end

Expand Down
2 changes: 1 addition & 1 deletion src/protocol.jl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ end
Reads any bytes before next CRLF (\r\n) in a TCPScoket or IOBuffer, blocks if no bytes available.
"""
function recv(io::Union{TCPSocket,Base.GenericIOBuffer})
line = readline(io)
line = _readline(io)

if isempty(line)
return nothing
Expand Down
12 changes: 6 additions & 6 deletions src/pubsub.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ publish(channel, message; client=get_global_client()) = execute(["PUBLISH", chan
channel,
channels...;
stop_fn::Function=(msg) -> false,
err_cb::Function=(err) -> log_error(err))
err_cb::Function=(err) -> rethrow(err))
Listen for messages published to the given channels in a do block. Optionally provide a stop
function `stop_fn(msg)` which gets run as a callback everytime a subscription message is received,
Expand Down Expand Up @@ -69,7 +69,7 @@ julia> subscriber.subscriptions
Set{String}()
```
"""
function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> log_error(err), client::Client=get_global_client())
function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> rethrow(err), client::Client=get_global_client())
if client.is_subscribed
throw(RedisError("SUBERROR", "Cannot open multiple subscriptions in the same Client instance"))
end
Expand Down Expand Up @@ -100,7 +100,7 @@ function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -
err_cb(err)
finally
if !isempty(client.subscriptions)
unsubscribe(client.subscriptions...; client=client)
isclosed(client) || unsubscribe(client.subscriptions...; client=client)
@lock client.lock client.subscriptions = Set{String}()
@lock client.lock flush!(client)
end
Expand All @@ -120,7 +120,7 @@ unsubscribe(channels...; client=get_global_client()) = execute_without_recv(["UN
pattern,
patterns...;
stop_fn::Function=(msg) -> false,
err_cb::Function=(err) -> log_error(err))
err_cb::Function=(err) -> rethrow(err))
Listen for messages published to the given channels matching ghe given patterns in a do block.
Optionally provide a stop function `stop_fn(msg)` which gets run as a callback everytime a
Expand Down Expand Up @@ -180,7 +180,7 @@ julia> subscriber.psubscriptions
Set{String}()
```
"""
function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> log_error(err), client::Client=get_global_client())
function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> rethrow(err), client::Client=get_global_client())
if client.is_subscribed
throw(RedisError("SUBERROR", "Cannot open multiple subscriptions in the same Client instance"))
end
Expand Down Expand Up @@ -213,7 +213,7 @@ function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg)
err_cb(err)
finally
if !isempty(client.psubscriptions)
punsubscribe(client.psubscriptions...; client=client)
isclosed(client) || punsubscribe(client.psubscriptions...; client=client)
@lock client.lock client.psubscriptions = Set{String}()
@lock client.lock flush!(client)
end
Expand Down
Loading

2 comments on commit 07cb0e0

@captchanjack
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/66766

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.3.0 -m "<description of version>" 07cb0e093e539b8cec696d56e9ff9e64d2ca5829
git push origin v0.3.0

Please sign in to comment.