diff --git a/.github/workflows/TagBot.yml b/.github/workflows/TagBot.yml new file mode 100644 index 0000000..4c3b99f --- /dev/null +++ b/.github/workflows/TagBot.yml @@ -0,0 +1,13 @@ +name: TagBot +on: + schedule: + - cron: 0 0 * * * + workflow_dispatch: +jobs: + TagBot: + runs-on: ubuntu-latest + steps: + - uses: JuliaRegistries/TagBot@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + ssh: ${{ secrets.DOCUMENTER_KEY }} \ No newline at end of file diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml new file mode 100644 index 0000000..717f01e --- /dev/null +++ b/.github/workflows/docs.yml @@ -0,0 +1,24 @@ +name: Documentation + +on: + push: + branches: + - master + tags: '*' + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: julia-actions/setup-julia@latest + with: + version: '1.5' + - name: Install dependencies + run: julia --project=docs/ -e 'using Pkg; Pkg.develop(PackageSpec(path=pwd())); Pkg.instantiate()' + - name: Build and deploy + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # For authentication with GitHub Actions token + DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }} # For authentication with SSH deploy key + run: julia --project=docs/ docs/make.jl \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..29126e4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Files generated by invoking Julia with --code-coverage +*.jl.cov +*.jl.*.cov + +# Files generated by invoking Julia with --track-allocation +*.jl.mem + +# System-specific files and directories generated by the BinaryProvider and BinDeps packages +# They contain absolute paths specific to the host computer, and so should not be committed +deps/deps.jl +deps/build.log +deps/downloads/ +deps/usr/ +deps/src/ + +# Build artifacts for creating documentation generated by the Documenter package +docs/build/ +docs/site/ + +# File generated by Pkg, the package manager, based on a corresponding Project.toml +# It records a fixed state of all packages used by the project. As such, it should not be +# committed for packages, but should be committed for applications that require a static +# environment. +Manifest.toml diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5f1af9e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 captchanjack + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..82b76c4 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +.PHONY: test +test: + @docker run --name redis -d -p 6379:6379 -e ALLOW_EMPTY_PASSWORD=yes bitnami/redis:6.2.3 + @julia -e "using Pkg; Pkg.test()" + @docker stop redis + @docker rm redis + @docker image rm bitnami/redis:6.2.3 \ No newline at end of file diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..e586a9f --- /dev/null +++ b/Project.toml @@ -0,0 +1,10 @@ +name = "Jedis" +uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f" +authors = ["Jack Chan "] +version = "0.1.0" + +[deps] +Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" + +[compat] +julia = "^1" diff --git a/README.md b/README.md new file mode 100644 index 0000000..35d5d9a --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +# Jedis.jl +A lightweight Redis client, implemented in Julia. + +## Key Features +This client supports: +- Basic **[command execution](https://captchanjack.github.io/Jedis.jl/commands/)** +- Executing commands with a **[global client](https://captchanjack.github.io/Jedis.jl/client/)** instance +- Executing commands atomically per client instance, with the help of socket locks +- **[Pipelining](https://captchanjack.github.io/Jedis.jl/pipeline/)** +- **[Transactions](https://captchanjack.github.io/Jedis.jl/commands/#Jedis.multi)** +- **[Pub/Sub](https://captchanjack.github.io/Jedis.jl/pubsub/)** \ No newline at end of file diff --git a/docs/.gitignore b/docs/.gitignore new file mode 100644 index 0000000..a303fff --- /dev/null +++ b/docs/.gitignore @@ -0,0 +1,2 @@ +build/ +site/ diff --git a/docs/Project.toml b/docs/Project.toml new file mode 100644 index 0000000..dfa65cd --- /dev/null +++ b/docs/Project.toml @@ -0,0 +1,2 @@ +[deps] +Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" diff --git a/docs/make.jl b/docs/make.jl new file mode 100644 index 0000000..680757d --- /dev/null +++ b/docs/make.jl @@ -0,0 +1,20 @@ +using Documenter +using Jedis + +makedocs( + sitename="Jedis.jl Documentation", + # format = Documenter.HTML(prettyurls = false), + pages=[ + "Home" => "index.md", + "Client" => "client.md", + "Commands" => "commands.md", + "Pipelining" => "pipeline.md", + "Pub/Sub" => "pubsub.md" + ], + modules=[Jedis] +) + +deploydocs( + repo="https://github.com/captchanjack/Jedis.jl.git", + devurl="docs" +) \ No newline at end of file diff --git a/docs/src/client.md b/docs/src/client.md new file mode 100644 index 0000000..8910585 --- /dev/null +++ b/docs/src/client.md @@ -0,0 +1,16 @@ +# Redis Client Connection + +```@docs +Client +execute(command::AbstractArray, client::Client) +Jedis.GLOBAL_CLIENT +set_global_client +get_global_client +Jedis.copy +disconnect! +reconnect! +wait_until_subscribed +wait_until_unsubscribed +wait_until_channel_unsubscribed +wait_until_pattern_unsubscribed +``` \ No newline at end of file diff --git a/docs/src/commands.md b/docs/src/commands.md new file mode 100644 index 0000000..3253764 --- /dev/null +++ b/docs/src/commands.md @@ -0,0 +1,40 @@ +# Redis Commands + +Jedis commands all share a common interface, if the `client` kwarg is not provided then the [`Jedis.GLOBAL_CLIENT`](@ref) instance will be used: + +```@example +command(args...; kwargs..., client=get_global_client()) +``` + +### Full list of Jedis commands: + +```@docs +auth +select +ping +flushdb +flushall +quit +set +Jedis.get +del +setex +expire +ttl +multi +exec +multi_exec +hset +hget +hgetall +hmget +hdel +lpush +rpush +lpop +rpop +blpop +brpop +llen +lrange +``` \ No newline at end of file diff --git a/docs/src/index.md b/docs/src/index.md new file mode 100644 index 0000000..35d5d9a --- /dev/null +++ b/docs/src/index.md @@ -0,0 +1,11 @@ +# Jedis.jl +A lightweight Redis client, implemented in Julia. + +## Key Features +This client supports: +- Basic **[command execution](https://captchanjack.github.io/Jedis.jl/commands/)** +- Executing commands with a **[global client](https://captchanjack.github.io/Jedis.jl/client/)** instance +- Executing commands atomically per client instance, with the help of socket locks +- **[Pipelining](https://captchanjack.github.io/Jedis.jl/pipeline/)** +- **[Transactions](https://captchanjack.github.io/Jedis.jl/commands/#Jedis.multi)** +- **[Pub/Sub](https://captchanjack.github.io/Jedis.jl/pubsub/)** \ No newline at end of file diff --git a/docs/src/pipeline.md b/docs/src/pipeline.md new file mode 100644 index 0000000..0c98863 --- /dev/null +++ b/docs/src/pipeline.md @@ -0,0 +1,9 @@ +# Pipelining + +```@docs +Pipeline +add! +execute(command::AbstractArray, pipe::Pipeline) +execute(pipe::Pipeline; filter_multi_exec=true) +pipeline +``` \ No newline at end of file diff --git a/docs/src/pubsub.md b/docs/src/pubsub.md new file mode 100644 index 0000000..0403aef --- /dev/null +++ b/docs/src/pubsub.md @@ -0,0 +1,9 @@ +# Pub/Sub + +```@docs +publish +subscribe +unsubscribe +psubscribe +punsubscribe +``` \ No newline at end of file diff --git a/src/Jedis.jl b/src/Jedis.jl new file mode 100644 index 0000000..9d134fe --- /dev/null +++ b/src/Jedis.jl @@ -0,0 +1,21 @@ +module Jedis + +export Client, Pipeline, RedisError, get_global_client, set_global_client, disconnect!, reconnect!, + add!, copy, wait_until_subscribed, wait_until_unsubscribed, wait_until_channel_unsubscribed, + wait_until_pattern_unsubscribed, execute, auth, select, ping, flushdb, flushall, quit, + set, get, del, setex, expire, ttl, multi, exec, multi_exec, pipeline, hset, hget, hgetall, + hmget, hdel, rpush, lpush, lpop, rpop, blpop, brpop, llen, lrange, publish, subscribe, + unsubscribe, psubscribe, punsubscribe + +using Sockets + +include("exceptions.jl") +include("utilities.jl") +include("client.jl") +include("pipeline.jl") +include("protocol.jl") +include("execute.jl") +include("commands.jl") +include("pubsub.jl") + +end # module \ No newline at end of file diff --git a/src/client.jl b/src/client.jl new file mode 100644 index 0000000..f0a7366 --- /dev/null +++ b/src/client.jl @@ -0,0 +1,208 @@ +""" + Client([; host="127.0.0.1", port=6379, database=0, password="", username=""]) -> Client + +Creates a Client instance connecting and authenticating with a Redis host. + +# Attributes +- `host::AbstractString`: Redis host. +- `port::Integer`: Redis port. +- `database::Integer`: Redis database index. +- `password::AbstractString`: Redis password if any. +- `username::AbstractString`: Redis username if any. +- `socket::TCPSocket`: Socket used for sending and reveiving from Redis host. +- `is_subscribed::Bool`: Whether this Client is actively subscribed to any channels or patterns. +- `subscriptions::AbstractSet{<:AbstractString}`: Set of channels currently subscribed on. +- `psubscriptions::AbstractSet{<:AbstractString}`: Set of patterns currently psubscribed on. + +# Note +- Connection parameters `host`, `port`, `database`, `password`, `username` will not change after +client istance is constructed, even with `SELECT` or `CONFIG SET` commands. + +# Examples +```julia-repl +julia> client = Client(); + +julia> set("key", "value"; client=client) +"OK" + +julia> get("key"; client=client) +"value" + +julia> execute(["DEL", "key"], client) +1 +``` +""" +mutable struct Client + host::AbstractString + port::Integer + database::Integer + password::AbstractString + username::AbstractString + socket::TCPSocket + is_subscribed::Bool + subscriptions::AbstractSet{<:AbstractString} + psubscriptions::AbstractSet{<:AbstractString} +end + +function Client(; host="127.0.0.1", port=6379, database=0, password="", username="") + client = Client(host, port, database, password, username, connect(host, port), false, Set{String}(), Set{String}()) + !isempty(password * username) && auth(password, username; client=client) + database != 0 && select(database; client=client) + return client +end + +""" + GLOBAL_CLIENT = Ref{Client}() + +Reference to a global Client object. +""" +const GLOBAL_CLIENT = Ref{Client}() + +""" + set_global_client(client::Client) + set_global_client([; host="127.0.0.1", port=6379, database=0, password="", username=""]) + +Sets a Client object as the `GLOBAL_CLIENT[]` instance. +""" +function set_global_client(client::Client) + GLOBAL_CLIENT[] = client +end + +function set_global_client(; host="127.0.0.1", port=6379, database=0, password="", username="") + client = Client(; host=host, port=port, database=database, password=password, username=username) + set_global_client(client) +end + +""" + get_global_client() -> Client + +Retrieves the `GLOBAL_CLIENT[]` instance, if unassigned then initialises it with default values +`host="127.0.0.1"`, `port=6379`, `database=0`, `password=""`, `username=""`. +""" +function get_global_client() + if isassigned(GLOBAL_CLIENT) + return GLOBAL_CLIENT[] + else + return set_global_client() + end +end + +""" + copy(client::Client) -> Client + +Creates a new Client instance, copying the connection parameters of the input. +""" +function copy(client::Client) + return Client(; host=client.host, port=client.port, database=client.database, password=client.password, username=client.username) +end + +""" + disconnect!(client::Client) + +Closes the client socket connection, it will be rendered unusable. +""" +function disconnect!(client::Client) + close(client.socket) +end + +""" + reconnect!(client::Client) -> Client + +Reconnects the input client socket connection. +""" +function reconnect!(client::Client) + disconnect!(client) + client.socket = connect(client.host, client.port) + !isempty(client.password * client.username) && auth(client.password, client.username; client=client) + client.database != 0 && select(client.database; client=client) + return client +end + +""" + flush!(client::Client) + +Reads and discards any bytes that remain unread in the client socket. +""" +function flush!(client::Client) + while bytesavailable(client.socket) > 0 + recv(client.socket) + end +end + +""" + set_subscribed!(client::Client) + +Marks the Client instance as subscribed, should not be used publicly. +""" +function set_subscribed!(client::Client) + client.is_subscribed = true +end + +""" + set_unsubscribed!(client::Client) + +Marks the Client instance as unsubscribed, should not be used publicly. +""" +function set_unsubscribed!(client::Client) + client.is_subscribed = false +end + +""" + wait_until_subscribed(client::Client) + +Blocks until client changes to a subscribed state. +""" +function wait_until_subscribed(client::Client) + if !client.is_subscribed + while !client.is_subscribed + sleep(0.001) + end + end +end + +""" + wait_until_unsubscribed(client::Client) + +Blocks until client changes to a unsubscribed state. +""" +function wait_until_unsubscribed(client::Client) + if client.is_subscribed + while client.is_subscribed + sleep(0.001) + end + end +end + +""" + wait_until_channel_unsubscribed(client::Client[, channels...]) + +Blocks until client is unsubscribed from channel(s), leave empty to wait until unsubscribed from all channels. +""" +function wait_until_channel_unsubscribed(client::Client, channels...) + if isempty(channels) + while !isempty(client.subscriptions) + sleep(0.001) + end + else + while !isempty(intersect(client.subscriptions, Set{String}(channels))) + sleep(0.001) + end + end +end + +""" + wait_until_pattern_unsubscribed(client::Client[, patterns...]) + +Blocks until client is unsubscribed from pattern(s), leave empty to wait until unsubscribed from all patterns. +""" +function wait_until_pattern_unsubscribed(client::Client, patterns...) + if isempty(patterns) + while !isempty(client.psubscriptions) + sleep(0.001) + end + else + while !isempty(intersect(client.psubscriptions, Set{String}(patterns))) + sleep(0.001) + end + end +end \ No newline at end of file diff --git a/src/commands.jl b/src/commands.jl new file mode 100644 index 0000000..337cef3 --- /dev/null +++ b/src/commands.jl @@ -0,0 +1,242 @@ +""" + auth(password[, username]) + +Authenticate to the server. +""" +auth(password, username=""; client=get_global_client()) = execute(["AUTH", username, password], client) + +""" + select(database) + +Change the selected database for the current connection. +""" +select(database; client=get_global_client()) = execute(["SELECT", database], client) + +""" + ping() + +Ping the server. +""" +ping(; client=get_global_client()) = execute(["PING"], client) + +""" + flushdb([; async=false]) + +Remove all keys from the current database. +""" +flushdb(; async=false, client=get_global_client()) = execute(["FLUSHDB", async ? "ASYNC" : ""], client) + +""" + flushall([; async=false]) + +Remove all keys from all databases. +""" +flushall(; async=false, client=get_global_client()) = execute(["FLUSHALL", async ? "ASYNC" : ""], client) + +""" + quit() + +Close the connection. +""" +quit(; client=get_global_client()) = execute(["QUIT"], client) + +""" + set(key, value) + +Set the string value of a key. +""" +set(key, value; client=get_global_client()) = execute(["SET", key, value], client) + +""" + get(key) + +Get the value of a key. +""" +Base.get(key; client=get_global_client()) = execute(["GET", key], client) + +""" + del(key[, keys...]) + +Delete a key. +""" +del(key, keys...; client=get_global_client()) = execute(["DEL", key, keys...], client) + +""" + setex(key, seconds, value) + +Set the value and expiration of a key. +""" +setex(key, seconds, value; client=get_global_client()) = execute(["SETEX", key, seconds, value], client) + +""" + expire(key, seconds) + +Set a key's tiem to live in seconds. +""" +expire(key, seconds; client=get_global_client()) = execute(["EXPIRE", key, seconds], client) + +""" + ttl(key) + +Get the time to live for a key. +""" +ttl(key; client=get_global_client()) = execute(["TTL", key], client) + +""" + multi() + +Mark the start of a transaction block. + +# Examples +```julia-repl +julia> multi() +"OK" + +julia> set("key", "value") +"QUEUED" + +julia> get("key") +"QUEUED" + +julia> exec() +2-element Array{String,1}: + "OK" + "value" +``` +""" +multi(; client=get_global_client()) = execute(["MULTI"], client) + +""" + exec() + +Execute all commands issued after MULTI. + +# Examples +```julia-repl +julia> multi() +"OK" + +julia> set("key", "value") +"QUEUED" + +julia> get("key") +"QUEUED" + +julia> exec() +2-element Array{String,1}: + "OK" + "value" +``` +""" +exec(; client=get_global_client()) = execute(["EXEC"], client) + +""" + multi_exec(fn::Function) + +Execute a MULTI/EXEC transction in a do block. + +# Examples +```julia-repl +julia> multi_exec() do + set("key", "value") + get("key") + get("key") + end +3-element Array{String,1}: + "OK" + "value" + "value" +``` +""" +multi_exec(fn::Function; client=get_global_client()) = (multi(; client=client); fn(); exec(; client=client)) + +""" + hset(key, field, value) + +Set the string value of a hash field. +""" +hset(key, field, value, fields_and_values...; client=get_global_client()) = execute(["HSET", key, field, value, fields_and_values...], client) + +""" + hget(key, field) + +Get the value of a hash field. +""" +hget(key, field; client=get_global_client()) = execute(["HGET", key, field], client) + +""" + hgetall(key) + +Get all the fields and values in a hash. +""" +hgetall(key; client=get_global_client()) = execute(["HGETALL", key], client) + +""" + hmget(key, field[, fields...]) + +Get the values of all the given hash fields. +""" +hmget(key, field, fields...; client=get_global_client()) = execute(["HMGET", key, field, fields...], client) + +""" + hdel(key, field[, fields...]) + +Delete one or more hash fields. +""" +hdel(key, field, fields...; client=get_global_client()) = execute(["HDEL", key, field, fields...], client) + +""" + lpush(key, element[, elements...]) + +Prepend one or multiple elements to a list. +""" +lpush(key, element, elements...; client=get_global_client()) = execute(["LPUSH", key, element, elements...], client) + +""" + rpush(key, element[, elements...]) + +Append one or multiple elements to a list. +""" +rpush(key, element, elements...; client=get_global_client()) = execute(["RPUSH", key, element, elements...], client) + +""" + lpop(key) + +Remove and get the first element in a list. +""" +lpop(key; client=get_global_client()) = execute(["LPOP", key], client) + +""" + rpop(key) + +Remove and get the last element in a list. +""" +rpop(key; client=get_global_client()) = execute(["RPOP", key], client) + +""" + blpop(key[, key...; timeout=0]) + +Remove and get the first element in a list, or block until one is available. +""" +blpop(key, keys...; timeout=0, client=get_global_client()) = execute(["BLPOP", key, keys..., timeout], client) + +""" + brpop(key[, key...; timeout=0]) + +Remove and get the last element in a list, or block until one is available. +""" +brpop(key, keys...; timeout=0, client=get_global_client()) = execute(["BRPOP", key, keys..., timeout], client) + +""" + llen(key) + +Get the length of a list. +""" +llen(key; client=get_global_client()) = execute(["LLEN", key], client) + +""" + lrange(key, start, stop) + +Get a range of elements from a list. +""" +lrange(key, start, stop; client=get_global_client()) = execute(["LRANGE", key, start, stop], client) \ No newline at end of file diff --git a/src/exceptions.jl b/src/exceptions.jl new file mode 100644 index 0000000..2ee3454 --- /dev/null +++ b/src/exceptions.jl @@ -0,0 +1,8 @@ +struct RedisError <: Exception + err_type::AbstractString + err_msg::AbstractString +end + +function Base.showerror(io::IO, ex::RedisError; backtrace=true) + printstyled(io, "$(typeof(ex)): $(ex.err_type)\n\n" * ex.err_msg * "\n", color=Base.error_color()) +end \ No newline at end of file diff --git a/src/execute.jl b/src/execute.jl new file mode 100644 index 0000000..23ccf87 --- /dev/null +++ b/src/execute.jl @@ -0,0 +1,128 @@ +""" + execute(command[; client::Client=get_global_client()]) + +Sends a RESP compliant command to the Redis host and returns the result. The command is either an +array of command keywords, or a single command string. Defaults to using the globally set Client. + +# Examples +```julia-repl +julia> execute(["SET", "key", "value"]) +"OK" +julia> execute("GET key") +"value" +``` +""" +function execute(command::AbstractArray, client::Client=get_global_client()) + if client.is_subscribed + throw(RedisError("SUBERROR", "Cannot execute commands while a subscription is open in the same Client instance")) + end + + @lock client.socket.lock begin + flush!(client) + write(client.socket, resp(command)) + msg = recv(client.socket) + + if msg isa Exception + throw(msg) + end + + return msg + end +end +function execute(command::AbstractString, client::Client=get_global_client()) + execute(split_on_whitespace(command), client) +end + +""" + execute_without_recv(command[; client::Client=get_global_client()]) + +Sends a RESP compliant command to the Redis host without reading the returned result. +""" +function execute_without_recv(command::AbstractArray, client::Client=get_global_client()) + @lock client.socket.lock begin + flush!(client) + write(client.socket, resp(command)) + return + end +end +function execute_without_recv(command::AbstractString, client::Client=get_global_client()) + execute_without_recv(split_on_whitespace(command), client) +end + +""" + execute(command, pipe::Pipeline) + +Add a RESP compliant command to a pipeline client. + +# Examples +```julia-repl +julia> pipe = Pipeline(); + +julia> execute(["SET", "key", "value"]; client=pipe); + +julia> execute(["GET", "key"]; client=pipe); + +julia> execute(pipe) +2-element Array{String,1}: + "OK" + "value" +``` +""" +function execute(command::AbstractArray, pipe::Pipeline) + add!(pipe, command) + return +end +function execute(command::AbstractString, pipe::Pipeline) + execute(split_on_whitespace(command), pipe) +end + +""" + execute(pipe::Pipeline[; filter_multi_exec=true]) + +Execute commands batched in a pipeline client, optionally filter out MULTI transaction responses +before the EXEC call, e.g. "QUEUED". + +# Examples +```julia-repl +julia> pipe = Pipeline(); + +julia> set("key", "value"; client=pipe); + +julia> get("key"; client=pipe); + +julia> multi(; client=pipe); + +julia> get("key"; client=pipe); + +julia> get("key"; client=pipe); + +julia> exec(; client=pipe); + +julia> execute(pipe) +2-element Array{String,1}: + "OK" + "value" + ["value", "value"] # Only the response from final exec() call is returned +``` +""" +function execute(pipe::Pipeline; filter_multi_exec=true) + if pipe.client.is_subscribed + throw(RedisError("SUBERROR", "Cannot execute Pipeline while a subscription is open in the same Client instance")) + end + + @lock pipe.client.socket.lock begin + try + flush!(pipe.client) + write(pipe.client.socket, pipe.resp) + messages = [recv(pipe.client.socket) for _ in 1:pipe.n_commands] + + if filter_multi_exec + return messages[pipe.multi_exec_bitmask] + end + + return messages + finally + flush!(pipe) + end + end +end \ No newline at end of file diff --git a/src/pipeline.jl b/src/pipeline.jl new file mode 100644 index 0000000..b4e1e38 --- /dev/null +++ b/src/pipeline.jl @@ -0,0 +1,102 @@ +""" +Pipeline([client::Client=get_global_client()]) -> Pipeline + +Creates a Pipeline client instance for executing commands in batch. + +# Attributes +- `client::Client`: Reference to the underlying Client connection. +- `resp::AbstractString`: Batched commands converted to RESP compliant string. +- `n_commands::Integer`: Number of commands currenrtly in the pipeline. +- `multi_exec::Bool`: Used to track and filter MULTI/EXEC transactions. +- `multi_exec_bitmask::AbstractArray{Bool}`: Used to track and filter MULTI/EXEC transactions. + +# Examples +```julia-repl +julia> pipe = Pipeline(); + +julia> set("key", "value"; client=pipe); + +julia> get("key"; client=pipe); + +julia> execute(pipe) +2-element Array{String,1}: + "OK" + "value" +``` +""" +mutable struct Pipeline + client::Client + resp::AbstractString + n_commands::Integer + multi_exec::Bool + multi_exec_bitmask::AbstractArray{Bool} + + Pipeline(client::Client=get_global_client()) = new(client, "", 0, true, []) +end + +""" + add!(pipe::Pipeline, command) + +Add a RESP compliant command to a pipeline client. +""" +function add!(pipe::Pipeline, command::AbstractArray) + pipe.resp *= resp(command) + pipe.n_commands += 1 + first = uppercase(command[1]) + + if first == "MULTI" + pipe.multi_exec = false + elseif first == "EXEC" + pipe.multi_exec = true + end + + push!(pipe.multi_exec_bitmask, pipe.multi_exec) +end +function add!(pipe::Pipeline, command::AbstractString) + add!(pipe, split_on_whitespace(command)) +end + +""" + add!(pipe::Pipeline, command) + +Flushes the underlying client socket and resets the pipeline in to a clean slate. +""" +function flush!(pipe::Pipeline) + flush!(pipe.client) + pipe.resp = "" + pipe.n_commands = 0 + pipe.multi_exec = false + pipe.multi_exec_bitmask = [] +end + +""" + pipeline(fn::Function[; clientt=get_global_client(), filter_multi_exec=true]) + +Execute commands batched in a pipeline client in a do block, optionally filter out MULTI transaction +responses before the EXEC call, e.g. "QUEUED". + +# Examples +```julia-repl +julia> pipeline() do pipe + lpush("example", 1, 2, 3, 4; client=pipe) + lpop("example"; client=pipe) + rpop("example"; client=pipe) + multi_exec(; client=pipe) do + lpop("example"; client=pipe) + rpop("example"; client=pipe) + end + lpop("example"; client=pipe) + end +5-element Array{Any,1}: + 4 # Integer response from lpush + "4" # String response from lpop + "1" # String response from rpop + ["3", "2"] # Array of String response from multi_exec do block, with responeses before the exec call filtered out + nothing # Nil response from final lpop +``` +""" +function Base.pipeline(fn::Function; client::Client=get_global_client(), filter_multi_exec=true) + pipe = Pipeline(client) + fn(pipe) + return execute(pipe; filter_multi_exec=filter_multi_exec) +end \ No newline at end of file diff --git a/src/protocol.jl b/src/protocol.jl new file mode 100644 index 0000000..4058e66 --- /dev/null +++ b/src/protocol.jl @@ -0,0 +1,86 @@ +const CRLF = "\r\n" + +const RedisType = ( + simple_string = '+', + error = '-', + integer = ':', + bulk_string = '$', + array = '*' +) + +""" + resp(command::AbstractArray) -> String + +Converts an array of redis command keywords to a RESP compliant String. +""" +function resp(command::AbstractArray) + r = "" + n = 0 + + for cmd in command + if isempty(cmd) + continue + end + + if cmd isa AbstractString + cmd = strip(cmd) + else + cmd = string(cmd) + end + + r *= "$(RedisType.bulk_string)$(length(cmd))$(CRLF)$(cmd)$(CRLF)" + n += 1 + end + + if isempty(r) + throw(RedisError("ERR", "Non-compliant command $command")) + end + + return "$(RedisType.array)$(n)$(CRLF)" * r +end + +function handle_simple_string(_, x) + return x +end + +function handle_integer(_, x) + return parse(Int64, x) +end + +function handle_error(_, x) + err_type, err_msg = split(x, ' '; limit=2) + return RedisError(err_type, err_msg) +end + +function handle_bulk_string(s, x) + if x == "-1" + return nothing + end + return readline(s) +end + +function handle_array(s, x) + if x == "0" + return [] + end + return [recv(s) for _ in 1:parse(Int64, x)] +end + +const RESPHandler = Dict{Char,Function}( + '+' => handle_simple_string, + '-' => handle_error, + ':' => handle_integer, + '$' => handle_bulk_string, + '*' => handle_array +) + +""" + recv(s::TCPSocket) + +Reads any bytes before the next CRLF (\r\n) in a TCPScoket, blocks if no bytes available. +""" +function recv(s::TCPSocket) + line = readline(s) + handler = RESPHandler[line[1]] + return handler(s, line[2:end]) +end \ No newline at end of file diff --git a/src/pubsub.jl b/src/pubsub.jl new file mode 100644 index 0000000..85f188c --- /dev/null +++ b/src/pubsub.jl @@ -0,0 +1,227 @@ +""" + publish(channel, message) + +Post a message to a channel. +""" +publish(channel, message; client=get_global_client()) = execute(["PUBLISH", channel, message], client) + +""" + subscribe(fn::Function, + channel, + channels...; + stop_fn::Function=(msg) -> false, + err_cb::Function=(err) -> log_error(err)) + +Listen for messages published to the given channels in a do block. Optionally provide a stop +function `stop_fn(msg)` which gets run as a callback everytime a subscription message is received, +the subscription loop breaks if the `stop_fn` returns `true`. Optionally provide `err_cb(err)` +function which gets run on encountering an exception in the main subscription loop. + +# Examples +```julia-repl +julia> channels = ["first", "second"]; + +julia> publisher = Client(); + +julia> subscriber = Client(); + +julia> stop_fn(msg) = msg[end] == "close subscription"; # stop the subscription loop if the message matches + +julia> messages = []; + +julia> @async subscribe(channels...; stop_fn=stop_fn, client=subscriber) do msg + push!(messages, msg) + end; # Without @async this function will block, alternatively use Thread.@spawn + +julia> wait_until_subscribed(subscriber); + +julia> subscriber.is_subscribed +true + +julia> subscriber.subscriptions +Set{String} with 2 elements: + "second" + "first" + +julia> publish("first", "hello"; client=publisher); + +julia> publish("second", "world"; client=publisher); + +julia> println(messages) +Any[["message", "first", "hello"], ["message", "second", "world"]] # message has the format [, , ] + +julia> unsubscribe("first"; client=subscriber); + +julia> wait_until_channel_unsubscribed(subscriber, "first"); + +julia> subscriber.subscriptions +Set{String} with 1 element: + "second" + +julia> unsubscribe(; client=subscriber); # unsubscribe from all channels + +julia> wait_until_unsubscribed(subscriber); + +julia> subscriber.is_subscribed +false + +julia> subscriber.subscriptions +Set{String}() +``` +""" +function subscribe(fn::Function, channel, channels...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> log_error(err), client::Client=get_global_client()) + if client.is_subscribed + throw(RedisError("SUBERROR", "Cannot open multiple subscriptions in the same Client instance")) + end + + @lock client.socket.lock client.subscriptions = Set([channel, channels...]) + execute(["SUBSCRIBE", client.subscriptions...], client) + @lock client.socket.lock set_subscribed!(client) + yield() + try + while true + type, chnl = msg = recv(client.socket) + + if type == "message" && chnl in client.subscriptions + fn(msg) + stop_fn(msg) && break + + elseif type == "unsubscribe" + if isnothing(chnl) + @lock client.socket.lock client.subscriptions = Set{String}() + elseif chnl in client.subscriptions + @lock client.socket.lock delete!(client.subscriptions, chnl) + end + + isempty(client.subscriptions) && break + end + end + catch err + err_cb(err) + finally + if !isempty(client.subscriptions) + unsubscribe(client.subscriptions...; client=client) + @lock client.socket.lock client.subscriptions = Set{String}() + @lock client.socket.lock flush!(client) + end + @lock client.socket.lock set_unsubscribed!(client) + end +end + +""" + unsubscribe([channels...]) -> nothing + +Unsubscribes the client from the given channels, or from all of them if none is given. +""" +unsubscribe(channels...; client=get_global_client()) = execute_without_recv(["UNSUBSCRIBE", channels...], client) + +""" + psubscribe(fn::Function, + pattern, + patterns...; + stop_fn::Function=(msg) -> false, + err_cb::Function=(err) -> log_error(err)) + +Listen for messages published to the given channels matching ghe given patterns in a do block. +Optionally provide a stop function `stop_fn(msg)` which gets run as a callback everytime a +subscription message is received, the subscription loop breaks if the `stop_fn` returns `true`. +Optionally provide `err_cb(err)` function which gets run on encountering an exception in the main +subscription loop. + +# Examples +```julia-repl +julia> patterns = ["first*", "second*"]; + +julia> publisher = Client(); + +julia> subscriber = Client(); + +julia> stop_fn(msg) = msg[end] == "close subscription"; # stop the subscription loop if the message matches + +julia> messages = []; + +julia> @async psubscribe(patterns...; stop_fn=stop_fn, client=subscriber) do msg + push!(messages, msg) + end; # Without @async this function will block, alternatively use Thread.@spawn + +julia> wait_until_subscribed(subscriber); + +julia> subscriber.is_subscribed +true + +julia> subscriber.psubscriptions +Set{String} with 2 elements: + "first*" + "second*" + +julia> publish("first_pattern", "hello"; client=publisher); + +julia> publish("second_pattern", "world"; client=publisher); + +julia> println(messages) +Any[["pmessage", "first*", "first_pattern", "hello"], ["pmessage", "second*", "second_pattern", "world"]] # message has the format [, , , ] + +julia> punsubscribe("first*"; client=subscriber); + +julia> wait_until_pattern_unsubscribed(subscriber, "first*"); + +julia> subscriber.psubscriptions +Set{String} with 1 element: + "second*" + +julia> punsubscribe(; client=subscriber); # unsubscribe from all patterns + +julia> wait_until_unsubscribed(subscriber); + +julia> subscriber.is_subscribed +false + +julia> subscriber.psubscriptions +Set{String}() +``` +""" +function psubscribe(fn::Function, pattern, patterns...; stop_fn::Function=(msg) -> false, err_cb::Function=(err) -> log_error(err), client::Client=get_global_client()) + if client.is_subscribed + throw(RedisError("SUBERROR", "Cannot open multiple subscriptions in the same Client instance")) + end + + @lock client.socket.lock client.psubscriptions = Set([pattern, patterns...]) + execute(["PSUBSCRIBE", client.psubscriptions...], client) + @lock client.socket.lock set_subscribed!(client) + + try + while true + type, pttrn = msg = recv(client.socket) + + if type == "pmessage" && pttrn in client.psubscriptions + fn(msg) + stop_fn(msg) && break + + elseif type == "punsubscribe" + if isnothing(pttrn) + @lock client.socket.lock client.psubscriptions = Set{String}() + elseif pttrn in client.psubscriptions + @lock client.socket.lock delete!(client.psubscriptions, pttrn) + end + + isempty(client.psubscriptions) && break + end + end + catch err + err_cb(err) + finally + if !isempty(client.psubscriptions) + punsubscribe(client.psubscriptions...; client=client) + @lock client.socket.lock client.psubscriptions = Set{String}() + @lock client.socket.lock flush!(client) + end + @lock client.socket.lock set_unsubscribed!(client) + end +end + +""" + unsubscribe([channels...]) -> nothing + +Unsubscribes the client from the given patterns, or from all of them if none is given. +""" +punsubscribe(patterns...; client=get_global_client()) = execute_without_recv(["PUNSUBSCRIBE", patterns...], client) \ No newline at end of file diff --git a/src/utilities.jl b/src/utilities.jl new file mode 100644 index 0000000..1878c1b --- /dev/null +++ b/src/utilities.jl @@ -0,0 +1,27 @@ +""" + split_on_whitespace(x) + +Split a string on any amount of whitespaces. +""" +split_on_whitespace(x) = split(strip(x), r"(\s+(?=\S))") + +""" + log_error(err::Exception) + +Log an exception and its backtrace to stderr. +""" +log_error(err::Exception) = @error "Function Failed: $(typeof(err))" exception=(err, catch_backtrace()) + +""" + @lock expr + +Locks a function. +""" +macro lock(lock, expr) + lk = esc(lock) + quote + lock($lk) do + $(esc(expr)) + end + end +end \ No newline at end of file diff --git a/test/Project.toml b/test/Project.toml new file mode 100644 index 0000000..0c36332 --- /dev/null +++ b/test/Project.toml @@ -0,0 +1,2 @@ +[deps] +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/test/runtests.jl b/test/runtests.jl new file mode 100644 index 0000000..5d77667 --- /dev/null +++ b/test/runtests.jl @@ -0,0 +1,6 @@ +using Jedis +using Test + +@testset "Commands" begin include("test_commands.jl") end +@testset "Pub/Sub" begin include("test_pubsub.jl") end +@testset "Pipeline" begin include("test_pipeline.jl") end \ No newline at end of file diff --git a/test/test_commands.jl b/test/test_commands.jl new file mode 100644 index 0000000..54f360f --- /dev/null +++ b/test/test_commands.jl @@ -0,0 +1,79 @@ +set_global_client() + +@testset "AUTH SELECT PING" begin + @test_throws RedisError auth("") + @test select(0) == "OK" + @test ping() == "PONG" +end + +@testset "GET SET" begin + @test set("key", "value") == "OK" + @test get("key") == "value" + @test flushdb() == "OK" && isnothing(get("key")) +end + +@testset "EXPIRE" begin + @test setex("key", 10, "value") == "OK" + @test ttl("key") > 0 + @test del("key") == 1 && isnothing(get("key")) + @test set("key", "value") == "OK" + @test ttl("key") == -1 + @test expire("key", 10) == 1 + @test ttl("key") > 0 + @test flushdb() == "OK" && isnothing(get("key")) +end + +@testset "MULTI EXEC" begin + @test multi() == "OK" + @test set("key", "value") == "QUEUED" + @test get("key") == "QUEUED" + @test get("key") == "QUEUED" + @test exec() == ["OK", "value", "value"] + @test ["OK", "value", "value"] == multi_exec() do + set("key", "value") + get("key") + get("key") + end + @test flushall() == "OK" && isnothing(get("key")) +end + +@testset "HASH" begin + @test hset("key", "f1", 1, "f2", 2, "f3", 3) == 3 + @test hget("key", "f2") == "2" + @test hgetall("key") == ["f1", "1", "f2", "2", "f3", "3"] + @test hmget("key", "f1", "f2", "doesnotexist") == ["1", "2", nothing] + @test hdel("key", "f1") == 1 + @test isnothing(hget("key", "f1")) + @test flushall() == "OK" && isnothing(get("key")) +end + +@testset "LIST" begin + @test lpush("mylist", 3, 2, 1) == 3 + @test rpush("mylist", 4, 5, 6) == 6 + @test llen("mylist") == 6 + @test lrange("mylist", 0, -1) == ["1", "2", "3", "4", "5", "6"] + @test lpop("mylist") == "1" + @test rpop("mylist") == "6" + @test lrange("mylist", 0, -1) == ["2", "3", "4", "5"] + flushall() + @test begin + task = @async blpop("mylist", "otherlist") == ["otherlist", "1"] + other = Client() + lpush("otherlist", 1; client=other) + disconnect!(other) + fetch(task) + end + @test begin + task = @async brpop("mylist", "otherlist") == ["mylist", "6"] + other = Client() + lpush("mylist", 6; client=other) + disconnect!(other) + fetch(task) + end + flushall() +end + +@testset "QUIT" begin + @test quit() == "OK" + @test_throws Base.IOError ping() +end \ No newline at end of file diff --git a/test/test_pipeline.jl b/test/test_pipeline.jl new file mode 100644 index 0000000..ca5f29d --- /dev/null +++ b/test/test_pipeline.jl @@ -0,0 +1,48 @@ +set_global_client() + +@testset "Pipeline - Basic" begin + pipe = Pipeline() + @test pipe.n_commands == 0 + for _ in 1:1000 + lrange("nothing", 0, -1; client=pipe) + end + @test pipe.n_commands == 1000 + result = execute(pipe) + @test result == fill([], 1000) + @test pipe.n_commands == 0 +end + +@testset "Pipeline - Do Block" begin + result = pipeline() do pipe + for _ in 1:1000 + lrange("nothing", 0, -1; client=pipe) + end + end + @test result == fill([], 1000) +end + +@testset "Pipeline - MULTI/EXEC" begin + no_filter_result = pipeline(; filter_multi_exec=false) do pipe + multi(; client=pipe) + for _ in 1:1000 + lrange("nothing", 0, -1; client=pipe) + end + exec(; client=pipe) + end + @test length(no_filter_result) == 1002 + @test no_filter_result[1] == "OK" + @test no_filter_result[2:length(no_filter_result)-1] == fill("QUEUED", 1000) + @test no_filter_result[end] == fill([], 1000) + + filter_result = pipeline() do pipe + multi_exec(; client=pipe) do + for _ in 1:1000 + lrange("nothing", 0, -1; client=pipe) + end + end + end + @test length(filter_result) == 1 + @test filter_result[1] == fill([], 1000) +end + +flushall() \ No newline at end of file diff --git a/test/test_pubsub.jl b/test/test_pubsub.jl new file mode 100644 index 0000000..6cacf95 --- /dev/null +++ b/test/test_pubsub.jl @@ -0,0 +1,117 @@ +set_global_client() + +@testset "SUBSCRIBE" begin + channels = ["first", "second", "third"] + publisher = Client() + subscriber = Client() + messages = [] + @test subscriber.is_subscribed == false + + @async subscribe(channels...; client=subscriber) do msg + push!(messages, msg) + end + + wait_until_subscribed(subscriber) + @test subscriber.is_subscribed == true + @test subscriber.subscriptions == Set{String}(channels) + + @test publish("first", "hello"; client=publisher) == 1 + @test publish("second", "world"; client=publisher) == 1 + @test publish("something", "else"; client=publisher) == 0 + + @test length(messages) == 2 + @test messages[1] == ["message", "first", "hello"] + @test messages[2] == ["message", "second", "world"] + + @test_throws RedisError set("already", "subscribed"; client=subscriber) + @test_throws RedisError subscribe("alreadsubscribed"; client=subscriber) do msg end + + unsubscribe("first"; client=subscriber) + + wait_until_channel_unsubscribed(subscriber, "first") + @test subscriber.is_subscribed == true + @test subscriber.subscriptions == Set{String}(["second", "third"]) + + @test publish("first", "not subscribed anymore"; client=publisher) == 0 + + @test length(messages) == 2 + + unsubscribe(; client=subscriber) # unsubscribe from everything + + wait_until_unsubscribed(subscriber) + @test subscriber.is_subscribed == false + @test isempty(subscriber.subscriptions) + + stop_fn(msg) = msg[end] == "close subscription" + + @async subscribe(channels...; stop_fn=stop_fn, client=subscriber) do msg end + + wait_until_subscribed(subscriber) + @test subscriber.is_subscribed == true + @test subscriber.subscriptions == Set{String}(channels) + + @test publish("first", "close subscription"; client=publisher) == 1 + + wait_until_unsubscribed(subscriber) + @test subscriber.is_subscribed == false + @test isempty(subscriber.subscriptions) +end + +@testset "PSUBSCRIBE" begin + patterns = ["first*", "second*", "third*"] + publisher = Client() + subscriber = Client() + messages = [] + @test subscriber.is_subscribed == false + + @async psubscribe(patterns...; client=subscriber) do msg + push!(messages, msg) + end + + wait_until_subscribed(subscriber) + @test subscriber.is_subscribed == true + @test subscriber.psubscriptions == Set{String}(patterns) + + @test publish("first_pattern", "hello"; client=publisher) == 1 + @test publish("second_pattern", "world"; client=publisher) == 1 + @test publish("something", "else"; client=publisher) == 0 + + @test length(messages) == 2 + @test messages[1] == ["pmessage", "first*", "first_pattern", "hello"] + @test messages[2] == ["pmessage", "second*", "second_pattern", "world"] + + @test_throws RedisError set("already", "subscribed"; client=subscriber) + @test_throws RedisError psubscribe("alreadsubscribed"; client=subscriber) do msg end + + punsubscribe("first*"; client=subscriber) + + wait_until_pattern_unsubscribed(subscriber, "first*") + @test subscriber.is_subscribed == true + @test subscriber.psubscriptions == Set{String}(["second*", "third*"]) + + @test publish("first_pattern", "not subscribed anymore"; client=publisher) == 0 + + @test length(messages) == 2 + + punsubscribe(; client=subscriber) # unsubscribe from everything + + wait_until_unsubscribed(subscriber) + @test subscriber.is_subscribed == false + @test isempty(subscriber.psubscriptions) + + stop_fn(msg) = msg[end] == "close subscription" + + @async psubscribe(patterns...; stop_fn=stop_fn, client=subscriber) do msg end + + wait_until_subscribed(subscriber) + @test subscriber.is_subscribed == true + @test subscriber.psubscriptions == Set{String}(patterns) + + @test publish("first_pattern", "close subscription"; client=publisher) == 1 + + wait_until_unsubscribed(subscriber) + @test subscriber.is_subscribed == false + @test isempty(subscriber.subscriptions) +end + +flushall() \ No newline at end of file