Skip to content

Commit

Permalink
[FEAT] Add retry logic for closed client socket status
Browse files Browse the repository at this point in the history
  • Loading branch information
captchanjack committed Jan 29, 2022
1 parent d528cef commit da584bd
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jedis"
uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f"
authors = ["Jack Chan <[email protected]>"]
version = "0.2.6"
version = "0.2.7"

[deps]
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Expand Down
107 changes: 100 additions & 7 deletions src/client.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""
Client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing]) -> Client
Client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x]) -> Client
Creates a Client instance connecting and authenticating to a Redis host, provide an `MbedTLS.SSLConfig`
(see `get_ssl_config`) for a secured Redis connection (SSL/TLS).
# Attributes
# Fields
- `host::AbstractString`: Redis host.
- `port::Integer`: Redis port.
- `database::Integer`: Redis database index.
Expand All @@ -16,6 +16,9 @@ Creates a Client instance connecting and authenticating to a Redis host, provide
- `is_subscribed::Bool`: Whether this Client is actively subscribed to any channels or patterns.
- `subscriptions::AbstractSet{<:AbstractString}`: Set of channels currently subscribed on.
- `psubscriptions::AbstractSet{<:AbstractString}`: Set of patterns currently psubscribed on.
- `retry_when_closed::Bool`: Set `true` to try and reconnect when client socket status is closed, defaults to `true`.
- `retry_max_attempts`::Int`: Maximum number of retries for reconnection, defaults to `1`.
- `retry_backoff::Function`: Retry backoff function, called after each retry and must return a single number, where that number is the sleep time (in seconds) until the next retry, accepts a single argument, the number of retries attempted.
# Note
- Connection parameters `host`, `port`, `database`, `password`, `username` will not change after
Expand Down Expand Up @@ -55,9 +58,12 @@ mutable struct Client
is_subscribed::Bool
subscriptions::AbstractSet{<:AbstractString}
psubscriptions::AbstractSet{<:AbstractString}
retry_when_closed::Bool
retry_max_attempts::Int
retry_backoff::Function
end

function Client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing)
function Client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x)
if isnothing(ssl_config)
socket = connect(host, port)
else
Expand All @@ -75,9 +81,17 @@ function Client(; host="127.0.0.1", port=6379, database=0, password="", username
ssl_config,
false,
Set{String}(),
Set{String}()
Set{String}(),
retry_when_closed,
retry_max_attemps,
retry_backoff
)

# Ping server to test connection and set socket status to Base.StatusPaused (i.e. a ready state)
# Raw execution is used to bypass locks and retries
write(client.socket, resp(["PING"]))
recv(client.socket)

!isempty(password * username) && auth(password, username; client=client)
database != 0 && select(database; client=client)

Expand Down Expand Up @@ -135,16 +149,16 @@ const GLOBAL_CLIENT = Ref{Client}()

"""
set_global_client(client::Client)
set_global_client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing])
set_global_client([; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x])
Sets a Client object as the `GLOBAL_CLIENT[]` instance.
"""
function set_global_client(client::Client)
GLOBAL_CLIENT[] = client
end

function set_global_client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing)
client = Client(; host=host, port=port, database=database, password=password, username=username, ssl_config=ssl_config)
function set_global_client(; host="127.0.0.1", port=6379, database=0, password="", username="", ssl_config=nothing, retry_when_closed=true, retry_max_attemps=1, retry_backoff=(x) -> 2^x)
client = Client(; host=host, port=port, database=database, password=password, username=username, ssl_config=ssl_config, retry_when_closed=retry_when_closed, retry_max_attemps=retry_max_attemps, retry_backoff=retry_backoff)
set_global_client(client)
end

Expand All @@ -162,6 +176,15 @@ function get_global_client()
end
end

"""
endpoint(client::Client)
Retrieves the endpoint (host:port) of the client.
"""
function endpoint(client::Client)
return "$(client.host):$(client.port)"
end

"""
copy(client::Client) -> Client
Expand Down Expand Up @@ -219,6 +242,76 @@ function flush!(client::Client)
end
end

"""
status(client::Client)
Returns the status of the client socket.
"""
function status(client::Client)
if client.socket isa TCPSocket
return client.socket.status
elseif client.socket isa MbedTLS.SSLContext
return client.socket.bio.status
else
throw(RedisError("INVALIDSOCKET", "Invalid socket type: $(typeof(client.socket))"))
end
end

"""
isclosed(client::Client)
Returns `true` if client socket status is `Base.StatusClosing`, `Base.StatusClosed` or
`Base.StatusOpen`, `false` otherwise. It turns out when status is `Base.StatusOpen` the socket
is already unusable. `Base.StatusPaused` is the true ready state.
"""
function isclosed(client::Client)
return status(client) == Base.StatusClosing || status(client) == Base.StatusClosed || status(client) == Base.StatusOpen
end

"""
retry!(client::Client)
Attempts to re-estiablish client socket connection, behaviour is determined by the retry parameters;
`retry_when_closed`, `retry_max_attempts`, `retry_backoff`.
"""
function retry!(client::Client)
if !isclosed(client)
return
end

if !client.retry_when_closed
throw(Base.IOError("Client connection to $(endpoint(client)) is closed or unusable, try establishing a new connection, or set `retry_when_closed` field to `true`", Base.StatusUninit))
end

@warn "Client socket is closed or unusable, retrying connection to $(endpoint(client))"
attempts = 0

while attempts < client.retry_max_attempts
attempts += 1
@info "Reconnection attempt #$attempts to $(endpoint(client))"

try
reconnect!(client)
@info "Reconnection attempt #$attempts to $(endpoint(client)) was successful"
return
catch err
if !(err isa Base.IOError)
rethrow()
end

@warn "Reconnection attempt #$attempts to $(endpoint(client)) was unsuccessful"
end

if attempts < client.retry_max_attempts
backoff = client.retry_backoff(attempts)
@info "Sleeping $(backoff)s until next reconnection attempt to $(endpoint(client))"
sleep(backoff)
end
end

throw(Base.IOError("Client connection to $(endpoint(client)) is closed or unusable, try establishing a new connection, or set `retry_when_closed` field to `true`", Base.StatusUninit))
end

"""
set_subscribed!(client::Client)
Expand Down
2 changes: 2 additions & 0 deletions src/execute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ function execute(command::AbstractArray, client::Client=get_global_client())

@lock client.lock begin
flush!(client)
retry!(client)
write(client.socket, resp(command))
msg = recv(client.socket)

Expand All @@ -41,6 +42,7 @@ Sends a RESP compliant command to the Redis host without reading the returned re
function execute_without_recv(command::AbstractArray, client::Client=get_global_client())
@lock client.lock begin
flush!(client)
retry!(client)
write(client.socket, resp(command))
return
end
Expand Down
2 changes: 1 addition & 1 deletion src/pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Creates a Pipeline client instance for executing commands in batch.
# Attributes
# Fields
- `client::Client`: Reference to the underlying Client connection.
- `resp::Vector{String}`: Batched commands converted to RESP compliant string.
- `filter_multi_exec::Bool`: Set `true` to filter out QUEUED responses in a MULTI/EXEC transaction.
Expand Down

2 comments on commit da584bd

@captchanjack
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/53424

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.2.7 -m "<description of version>" da584bd87f5f03e0f00f599894c85bb0e00e29ed
git push origin v0.2.7

Please sign in to comment.