diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 258c1eb..e5e6772 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -18,7 +18,7 @@ jobs: fail-fast: false matrix: version: - - '1.5' # Replace this with the minimum Julia version that your package supports. E.g. if your package requires Julia 1.5 or higher, change this to '1.5'. + - '1.6' # Replace this with the minimum Julia version that your package supports. E.g. if your package requires Julia 1.6 or higher, change this to '1.6'. # - '1' # Leave this line unchanged. '1' will automatically expand to the latest stable 1.x release of Julia. - 'nightly' os: diff --git a/Project.toml b/Project.toml index 4aae328..f5cef71 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "Jedis" uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f" authors = ["Jack Chan "] -version = "0.2.4" +version = "0.2.5" [deps] MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d" diff --git a/src/execute.jl b/src/execute.jl index 4a5e257..ebe4087 100644 --- a/src/execute.jl +++ b/src/execute.jl @@ -52,7 +52,8 @@ end """ execute(command, pipe::Pipeline) -Add a RESP compliant command to a pipeline client. +Add a RESP compliant command to a pipeline client, optionally filter out MULTI transaction responses +before the EXEC call, e.g. "QUEUED". # Examples ```julia-repl @@ -105,7 +106,7 @@ julia> execute(pipe) ["value", "value"] # Only the response from final exec() call is returned ``` """ -function execute(pipe::Pipeline; filter_multi_exec=true) +function execute(pipe::Pipeline) if pipe.client.is_subscribed throw(RedisError("SUBERROR", "Cannot execute Pipeline while a subscription is open in the same Client instance")) end @@ -113,10 +114,10 @@ function execute(pipe::Pipeline; filter_multi_exec=true) @lock pipe.client.lock begin try flush!(pipe.client) - write(pipe.client.socket, pipe.resp) - messages = [recv(pipe.client.socket) for _ in 1:pipe.n_commands] + write(pipe.client.socket, join(pipe.resp)) + messages = [recv(pipe.client.socket) for _ in 1:length(pipe.resp)] - if filter_multi_exec + if pipe.filter_multi_exec return messages[pipe.multi_exec_bitmask] end diff --git a/src/pipeline.jl b/src/pipeline.jl index b4e1e38..ccc9423 100644 --- a/src/pipeline.jl +++ b/src/pipeline.jl @@ -1,14 +1,14 @@ """ -Pipeline([client::Client=get_global_client()]) -> Pipeline + Pipeline([client::Client=get_global_client(); filter_multi_exec::Bool=false]) -> Pipeline Creates a Pipeline client instance for executing commands in batch. # Attributes - `client::Client`: Reference to the underlying Client connection. -- `resp::AbstractString`: Batched commands converted to RESP compliant string. -- `n_commands::Integer`: Number of commands currenrtly in the pipeline. +- `resp::Vector{String}`: Batched commands converted to RESP compliant string. +- `filter_multi_exec::Bool`: Set `true` to filter out QUEUED responses in a MULTI/EXEC transaction. - `multi_exec::Bool`: Used to track and filter MULTI/EXEC transactions. -- `multi_exec_bitmask::AbstractArray{Bool}`: Used to track and filter MULTI/EXEC transactions. +- `multi_exec_bitmask::Vector{Bool}`: Used to track and filter MULTI/EXEC transactions. # Examples ```julia-repl @@ -26,13 +26,12 @@ julia> execute(pipe) """ mutable struct Pipeline client::Client - resp::AbstractString - n_commands::Integer + resp::Vector{String} + filter_multi_exec::Bool multi_exec::Bool - multi_exec_bitmask::AbstractArray{Bool} - - Pipeline(client::Client=get_global_client()) = new(client, "", 0, true, []) + multi_exec_bitmask::Vector{Bool} end +Pipeline(client::Client=get_global_client(); filter_multi_exec::Bool=false) = Pipeline(client, [], filter_multi_exec, true, []) """ add!(pipe::Pipeline, command) @@ -40,17 +39,19 @@ end Add a RESP compliant command to a pipeline client. """ function add!(pipe::Pipeline, command::AbstractArray) - pipe.resp *= resp(command) - pipe.n_commands += 1 - first = uppercase(command[1]) - - if first == "MULTI" - pipe.multi_exec = false - elseif first == "EXEC" - pipe.multi_exec = true - end + push!(pipe.resp, resp(command)) + + if pipe.filter_multi_exec + first = uppercase(command[1]) - push!(pipe.multi_exec_bitmask, pipe.multi_exec) + if first == "MULTI" + pipe.multi_exec = false + elseif first == "EXEC" + pipe.multi_exec = true + end + + push!(pipe.multi_exec_bitmask, pipe.multi_exec) + end end function add!(pipe::Pipeline, command::AbstractString) add!(pipe, split_on_whitespace(command)) @@ -63,14 +64,13 @@ Flushes the underlying client socket and resets the pipeline in to a clean slate """ function flush!(pipe::Pipeline) flush!(pipe.client) - pipe.resp = "" - pipe.n_commands = 0 + pipe.resp = [] pipe.multi_exec = false pipe.multi_exec_bitmask = [] end """ - pipeline(fn::Function[; clientt=get_global_client(), filter_multi_exec=true]) + pipeline(fn::Function[; clientt=get_global_client(), filter_multi_exec=false]) Execute commands batched in a pipeline client in a do block, optionally filter out MULTI transaction responses before the EXEC call, e.g. "QUEUED". @@ -95,8 +95,8 @@ julia> pipeline() do pipe nothing # Nil response from final lpop ``` """ -function Base.pipeline(fn::Function; client::Client=get_global_client(), filter_multi_exec=true) - pipe = Pipeline(client) +function Base.pipeline(fn::Function; client::Client=get_global_client(), filter_multi_exec=false) + pipe = Pipeline(client; filter_multi_exec=filter_multi_exec) fn(pipe) - return execute(pipe; filter_multi_exec=filter_multi_exec) + return execute(pipe) end \ No newline at end of file diff --git a/test/test_commands.jl b/test/test_commands.jl index 9c89458..d5c5920 100644 --- a/test/test_commands.jl +++ b/test/test_commands.jl @@ -77,7 +77,7 @@ end flushall() end -@testset "QUIT" begin - @test quit() == "OK" - @test_throws Base.IOError ping() -end \ No newline at end of file +# @testset "QUIT" begin +# @test quit() == "OK" +# @test_throws Base.IOError ping() +# end \ No newline at end of file diff --git a/test/test_pipeline.jl b/test/test_pipeline.jl index ca5f29d..c513559 100644 --- a/test/test_pipeline.jl +++ b/test/test_pipeline.jl @@ -2,14 +2,14 @@ set_global_client() @testset "Pipeline - Basic" begin pipe = Pipeline() - @test pipe.n_commands == 0 + @test length(pipe.resp) == 0 for _ in 1:1000 lrange("nothing", 0, -1; client=pipe) end - @test pipe.n_commands == 1000 + @test length(pipe.resp) == 1000 result = execute(pipe) @test result == fill([], 1000) - @test pipe.n_commands == 0 + @test length(pipe.resp) == 0 end @testset "Pipeline - Do Block" begin @@ -34,7 +34,7 @@ end @test no_filter_result[2:length(no_filter_result)-1] == fill("QUEUED", 1000) @test no_filter_result[end] == fill([], 1000) - filter_result = pipeline() do pipe + filter_result = pipeline(; filter_multi_exec=true) do pipe multi_exec(; client=pipe) do for _ in 1:1000 lrange("nothing", 0, -1; client=pipe)