Skip to content

Commit

Permalink
[FEAT] Add pipeline batching
Browse files Browse the repository at this point in the history
  • Loading branch information
captchanjack committed Aug 19, 2022
1 parent 38438f2 commit 71e558c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "Jedis"
uuid = "b89ccfe0-2c5f-46f6-b89b-da3e1c2e286f"
authors = ["Jack Chan <[email protected]>"]
version = "0.2.13"
version = "0.2.14"

[deps]
MbedTLS = "739be429-bea8-5141-9913-cc70e7f3736d"
Expand Down
42 changes: 39 additions & 3 deletions src/execute.jl
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ function execute(command::AbstractString, pipe::Pipeline)
end

"""
execute(pipe::Pipeline[; filter_multi_exec=true])
execute(pipe::Pipeline[, batch_size::Int; filter_multi_exec=true])
Execute commands batched in a pipeline client, optionally filter out MULTI transaction responses
before the EXEC call, e.g. "QUEUED".
before the EXEC call, e.g. "QUEUED". Set `batch_size` to batch commands with max commands
per pipeline, defaults to use a single pipeline for all commands.
# Examples
```julia-repl
Expand Down Expand Up @@ -129,4 +130,39 @@ function execute(pipe::Pipeline)
flush!(pipe)
end
end
end
end
function execute(pipe::Pipeline, batch_size::Int)
if pipe.client.is_subscribed
throw(RedisError("SUBERROR", "Cannot execute Pipeline while a subscription is open in the same Client instance"))
end

@lock pipe.client.lock begin
try
flush!(pipe.client)
retry!(pipe.client)

n_cmd = length(pipe.resp)
messages = Vector{Any}(undef, n_cmd)
l, r = 1, min(batch_size, n_cmd)

while l <= n_cmd
write(pipe.client.socket, join(pipe.resp[l:r]))

for i in l:r
messages[i] = recv(pipe.client.socket)
end

l += batch_size
r += batch_size
end

if pipe.filter_multi_exec
return messages[pipe.multi_exec_bitmask]
end

return messages
finally
flush!(pipe)
end
end
end
10 changes: 8 additions & 2 deletions src/pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ function flush!(pipe::Pipeline)
end

"""
pipeline(fn::Function[; clientt=get_global_client(), filter_multi_exec=false])
pipeline(fn::Function[, batch_size::Int; clientt=get_global_client(), filter_multi_exec=false])
Execute commands batched in a pipeline client in a do block, optionally filter out MULTI transaction
responses before the EXEC call, e.g. "QUEUED".
responses before the EXEC call, e.g. "QUEUED". Set `batch_size` to batch commands with max commands
per pipeline, defaults to use a single pipeline for all commands.
# Examples
```julia-repl
Expand All @@ -99,4 +100,9 @@ function Base.pipeline(fn::Function; client::Client=get_global_client(), filter_
pipe = Pipeline(client; filter_multi_exec=filter_multi_exec)
fn(pipe)
return execute(pipe)
end
function Base.pipeline(fn::Function, batch_size::Int; client::Client=get_global_client(), filter_multi_exec=false)
pipe = Pipeline(client; filter_multi_exec=filter_multi_exec)
fn(pipe)
return execute(pipe, batch_size)
end
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ using Dates
@testset "Commands" begin include("test_commands.jl") end
@testset "Pub/Sub" begin include("test_pubsub.jl") end
@testset "Pipeline" begin include("test_pipeline.jl") end
@testset "SSL/TLS" begin include("test_ssl.jl") end
# @testset "SSL/TLS" begin include("test_ssl.jl") end
@testset "Redis Locks" begin include("test_lock.jl") end
10 changes: 10 additions & 0 deletions test/test_pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ end
@test filter_result[1] == fill([], 1000)
end

@testset "Pipeline - Batching" begin
batch_size = 100
result = pipeline(100) do pipe
for _ in 1:1000
lrange("nothing", 0, -1; client=pipe)
end
end
@test result == fill([], 1000)
end

flushall()

2 comments on commit 71e558c

@captchanjack
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/66527

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.2.14 -m "<description of version>" 71e558c9fb90f32f2958b3e1956a3cb5bd9c9767
git push origin v0.2.14

Please sign in to comment.