diff --git a/CHANGELOG.md b/CHANGELOG.md index 09480f72..fd98910a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## v0.5.1-dev - Add `classical_delay` and `quantum_delay` as keyword arguments to the `RegisterNet` constructor to set a default global network edge latency. +- Implement `AsymmetricSemaphore`, a resource object that allows multiple processes to wait for an update. ## v0.5.0 - 2024-10-16 diff --git a/src/ProtocolZoo/ProtocolZoo.jl b/src/ProtocolZoo/ProtocolZoo.jl index dee968ac..fbd8fb10 100644 --- a/src/ProtocolZoo/ProtocolZoo.jl +++ b/src/ProtocolZoo/ProtocolZoo.jl @@ -1,7 +1,7 @@ module ProtocolZoo using QuantumSavory -import QuantumSavory: get_time_tracker, Tag, isolderthan +import QuantumSavory: get_time_tracker, Tag, isolderthan, onchange_tag using QuantumSavory: Wildcard using QuantumSavory.CircuitZoo: EntanglementSwap, LocalEntanglementSwap @@ -208,9 +208,13 @@ end b_ = findfreeslot(prot.net[prot.nodeB]; randomize=prot.randomize, margin=margin) if isnothing(a_) || isnothing(b_) - isnothing(prot.retry_lock_time) && error("We do not yet support waiting on register to make qubits available") # TODO - @debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n retrying..." - @yield timeout(prot.sim, prot.retry_lock_time) + if isnothing(prot.retry_lock_time) + @debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n waiting..." + @yield onchange_tag(prot.net[prot.nodeA]) | onchange_tag(prot.net[prot.nodeB]) + else + @debug "EntanglerProt between $(prot.nodeA) and $(prot.nodeB)|round $(round): Failed to find free slots. \nGot:\n1. \t $a_ \n2.\t $b_ \n retrying..." + @yield timeout(prot.sim, prot.retry_lock_time) + end continue end # we are now certain that a_ and b_ are not nothing. The compiler is not smart enough to figure this out @@ -397,20 +401,27 @@ function EntanglementConsumer(net::RegisterNet, nodeA::Int, nodeB::Int; kwargs.. end @resumable function (prot::EntanglementConsumer)() - if isnothing(prot.period) - error("In `EntanglementConsumer` we do not yet support waiting on register to make qubits available") # TODO - end while true query1 = query(prot.net[prot.nodeA], EntanglementCounterpart, prot.nodeB, ❓; locked=false, assigned=true) # TODO Need a `querydelete!` dispatch on `Register` rather than using `query` here followed by `untag!` below if isnothing(query1) @debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on first node found no entanglement" - @yield timeout(prot.sim, prot.period) + if isnothing(prot.period) + @debug "Waiting on changes in $(prot.nodeA)" + @yield onchange_tag(prot.net[prot.nodeA]) + else + @yield timeout(prot.sim, prot.period) + end continue else query2 = query(prot.net[prot.nodeB], EntanglementCounterpart, prot.nodeA, query1.slot.idx; locked=false, assigned=true) if isnothing(query2) # in case EntanglementUpdate hasn't reached the second node yet, but the first node has the EntanglementCounterpart @debug "EntanglementConsumer between $(prot.nodeA) and $(prot.nodeB): query on second node found no entanglement (yet...)" - @yield timeout(prot.sim, prot.period) + if isnothing(prot.period) + @debug "Waiting on changes in $(prot.nodeB)" + @yield onchange_tag(prot.net[prot.nodeB]) + else + @yield timeout(prot.sim, prot.period) + end continue end end @@ -431,7 +442,9 @@ end push!(prot.log, (now(prot.sim), ob1, ob2)) unlock(q1) unlock(q2) - @yield timeout(prot.sim, prot.period) + if !isnothing(prot.period) + @yield timeout(prot.sim, prot.period) + end end end diff --git a/src/ProtocolZoo/cutoff.jl b/src/ProtocolZoo/cutoff.jl index da17356c..4d399475 100644 --- a/src/ProtocolZoo/cutoff.jl +++ b/src/ProtocolZoo/cutoff.jl @@ -33,9 +33,6 @@ function CutoffProt(sim::Simulation, net::RegisterNet, node::Int; kwargs...) end @resumable function (prot::CutoffProt)() - if isnothing(prot.period) - error("In `CutoffProt` we do not yet support quing up and waiting on register") # TODO - end reg = prot.net[prot.node] while true for slot in reg # TODO these should be done in parallel, otherwise we will be waiting on each slot, greatly slowing down the cutoffs @@ -72,6 +69,8 @@ end unlock(slot) end - @yield timeout(prot.sim, prot.period) + if !isnothing(prot.period) + @yield timeout(prot.sim, prot.period) + end end end diff --git a/src/ProtocolZoo/swapping.jl b/src/ProtocolZoo/swapping.jl index 948556b8..bc79ae35 100644 --- a/src/ProtocolZoo/swapping.jl +++ b/src/ProtocolZoo/swapping.jl @@ -68,8 +68,12 @@ end while rounds != 0 qubit_pair_ = findswapablequbits(prot.net, prot.node, prot.nodeL, prot.nodeH, prot.chooseL, prot.chooseH; agelimit=prot.agelimit) if isnothing(qubit_pair_) - isnothing(prot.retry_lock_time) && error("We do not yet support waiting on register to make qubits available") # TODO - @yield timeout(prot.sim, prot.retry_lock_time) + if isnothing(prot.retry_lock_time) + @debug "SwapperProt: no swappable qubits found. waiting..." + @yield onchange_tag(prot.net[prot.node]) + else + @yield timeout(prot.sim, prot.retry_lock_time) + end continue end # The compiler is not smart enough to figure out that qubit_pair_ is not nothing, so we need to tell it explicitly. A new variable name is needed due to @resumable. diff --git a/src/QuantumSavory.jl b/src/QuantumSavory.jl index aea3770c..5579ca6b 100644 --- a/src/QuantumSavory.jl +++ b/src/QuantumSavory.jl @@ -48,6 +48,7 @@ export CliffordRepr, QuantumOpticsRepr, QuantumMCRepr, UseAsState, UseAsObservable, UseAsOperation, AbstractBackground, + onchange_tag, # networks.jl RegisterNet, channel, qchannel, messagebuffer, # initialize.jl @@ -71,7 +72,6 @@ export # plots.jl registernetplot, registernetplot!, registernetplot_axis, resourceplot_axis - #TODO you can not assume you can always in-place modify a state. Have all these functions work on stateref, not stateref[] # basically all ::QuantumOptics... should be turned into ::Ref{...}... but an abstract ref @@ -79,6 +79,8 @@ include("traits_and_defaults.jl") include("tags.jl") +include("semaphore.jl") + include("states_registers.jl") include("quantumchannel.jl") include("messagebuffer.jl") @@ -116,4 +118,5 @@ include("ProtocolZoo/ProtocolZoo.jl") include("precompile.jl") + end # module diff --git a/src/concurrentsim.jl b/src/concurrentsim.jl index ed96b13f..a7bb375e 100644 --- a/src/concurrentsim.jl +++ b/src/concurrentsim.jl @@ -39,9 +39,11 @@ end ## function get_time_tracker(rn::RegisterNet) + # TODO assert they are all the same return get_time_tracker(rn.registers[1]) end function get_time_tracker(r::Register) + # TODO assert all locks and tag_waiters and similar have the same env r.locks[1].env::Simulation end function get_time_tracker(r::RegRef) diff --git a/src/networks.jl b/src/networks.jl index 620030b8..8b9ff73d 100644 --- a/src/networks.jl +++ b/src/networks.jl @@ -21,6 +21,7 @@ function RegisterNet(graph::SimpleGraph, registers, vertex_metadata, edge_metada if !all_are_same if all_are_at_zero for r in registers + r.tag_waiter[] = AsymmetricSemaphore(env) for i in eachindex(r.locks) r.locks[i] = ConcurrentSim.Resource(env,1) end diff --git a/src/queries.jl b/src/queries.jl index 56aee963..362fd5b1 100644 --- a/src/queries.jl +++ b/src/queries.jl @@ -20,6 +20,7 @@ function tag!(ref::RegRef, tag) id = guid() push!(ref.reg.guids, id) ref.reg.tag_info[id] = (;tag, slot=ref.idx, time=now(get_time_tracker(ref))) + unlock(ref.reg.tag_waiter[]) return id end @@ -41,6 +42,7 @@ function untag!(ref::RegOrRegRef, id::Integer) isnothing(i) ? throw(QueryError("Attempted to delete a nonexistent tag id", untag!, id)) : deleteat!(reg.guids, i) # TODO make sure there is a clear error message to_be_deleted = reg.tag_info[id] delete!(reg.tag_info, id) + unlock(reg.tag_waiter[]) return to_be_deleted end diff --git a/src/semaphore.jl b/src/semaphore.jl new file mode 100644 index 00000000..31effde8 --- /dev/null +++ b/src/semaphore.jl @@ -0,0 +1,29 @@ +using ConcurrentSim +using ResumableFunctions +import Base: unlock, lock + +"""Multiple processes can wait on this semaphore for a permission to run given by another process""" +struct AsymmetricSemaphore + nbwaiters::Ref{Int} + lock::Resource +end +AsymmetricSemaphore(sim) = AsymmetricSemaphore(Ref(0), Resource(sim,1,level=1)) # start locked + +function Base.lock(s::AsymmetricSemaphore) + return @process _lock(s.lock.env, s) +end + +@resumable function _lock(sim, s::AsymmetricSemaphore) + s.nbwaiters[] += 1 + @yield lock(s.lock) + s.nbwaiters[] -= 1 + if s.nbwaiters[] > 0 + unlock(s.lock) + end +end + +function unlock(s::AsymmetricSemaphore) + if s.nbwaiters[] > 0 + unlock(s.lock) + end +end diff --git a/src/states_registers.jl b/src/states_registers.jl index ea61fae5..d11252c1 100644 --- a/src/states_registers.jl +++ b/src/states_registers.jl @@ -1,5 +1,7 @@ +#using QuantumSavory: AsymmetricSemaphore # TODO better constructors # TODO am I overusing Ref + struct StateRef state::Base.RefValue{Any} # TODO it would be nice if this was not abstract but `uptotime!` converts between types... maybe make StateRef{T} state::RefValue{T} and a new function that swaps away the backpointers in the appropriate registers registers::Vector{Any} # TODO Should be Vector{Register}, but right now we occasionally set it to nothing to deal with padded storage @@ -23,11 +25,12 @@ struct Register # TODO better type description tag_info::Dict{Int128, @NamedTuple{tag::Tag, slot::Int, time::Float64}} guids::Vector{Int128} netparent::Ref{Any} + tag_waiter::Ref{AsymmetricSemaphore} # TODO This being a ref is a bit of code smell... but we also want to be able to have registers that are not linked to a net so we need to be able to have this field un-initialized end function Register(traits, reprs, bg, sr, si, at) env = ConcurrentSim.Simulation() - Register(traits, reprs, bg, sr, si, at, [ConcurrentSim.Resource(env) for _ in traits], Dict{Int128, Tuple{Tag, Int64, Float64}}(), [], nothing) + Register(traits, reprs, bg, sr, si, at, [ConcurrentSim.Resource(env) for _ in traits], Dict{Int128, Tuple{Tag, Int64, Float64}}(), [], nothing, Ref(AsymmetricSemaphore(env))) end Register(traits,reprs,bg,sr,si) = Register(traits,reprs,bg,sr,si,zeros(length(traits))) @@ -60,3 +63,8 @@ get_register(r::RegRef) = r.reg get_register(r::Register) = r #Base.:(==)(r1::Register, r2::Register) = + +function onchange_tag(r::RegOrRegRef) + register = get_register(r) + return lock(register.tag_waiter[]) +end \ No newline at end of file diff --git a/test/test_protocolzoo_entanglement_consumer.jl b/test/test_protocolzoo_entanglement_consumer.jl index 673928b9..ac5cd369 100644 --- a/test/test_protocolzoo_entanglement_consumer.jl +++ b/test/test_protocolzoo_entanglement_consumer.jl @@ -2,6 +2,7 @@ using QuantumSavory.ProtocolZoo: EntanglerProt, SwapperProt, EntanglementTracker, EntanglementConsumer using Graphs using ConcurrentSim +using ResumableFunctions if isinteractive() using Logging @@ -41,6 +42,41 @@ for n in 3:30 @test econ.log[i][2] ≈ 1.0 @test econ.log[i][3] ≈ 1.0 end - end + +# test for period=nothing +for n in 3:30 + regsize = 10 + net = RegisterNet([Register(regsize) for j in 1:n]) + sim = get_time_tracker(net) + + @resumable function delayedProts(sim) + @yield timeout(sim, 5) + for e in edges(net) + eprot = EntanglerProt(sim, net, e.src, e.dst; rounds=-1, randomize=true, margin=5, hardmargin=3) + @process eprot() + end + + for v in 2:n-1 + sprot = SwapperProt(sim, net, v; nodeL = <(v), nodeH = >(v), chooseL = argmin, chooseH = argmax, rounds = -1) + @process sprot() + end + + for v in vertices(net) + etracker = EntanglementTracker(sim, net, v) + @process etracker() + end + end + econ = EntanglementConsumer(sim, net, 1, n; period=nothing) + @process econ() + @process delayedProts(sim) + + run(sim, 100) + + @test econ.log[1][1] > 5 # the process should start after 5 + for i in 1:length(econ.log) + @test econ.log[i][2] ≈ 1.0 + @test econ.log[i][3] ≈ 1.0 + end end +end \ No newline at end of file diff --git a/test/test_semaphore.jl b/test/test_semaphore.jl new file mode 100644 index 00000000..658aefb8 --- /dev/null +++ b/test/test_semaphore.jl @@ -0,0 +1,148 @@ +@testitem "AsymmetricSemaphore functionality" tags=[:AsymmetricSemaphore] begin +using QuantumSavory: AsymmetricSemaphore +using ConcurrentSim +using ResumableFunctions +import Base: unlock, lock + +sim = Simulation() +waiter = AsymmetricSemaphore(sim) +log = [] + +@resumable function trigger(sim) + push!(log, (now(sim), "t: start")) + @yield timeout(sim, 10) + push!(log, (now(sim), "t: waited 10")) + unlock(waiter) + push!(log, (now(sim), "t: first trigger")) + @yield timeout(sim, 20) + push!(log, (now(sim), "t: waited 20")) + unlock(waiter) + push!(log, (now(sim), "t: second trigger")) + @yield timeout(sim, 15) + push!(log, (now(sim), "t: waited 15")) + unlock(waiter) + push!(log, (now(sim), "t: third trigger")) + push!(log, (now(sim), "t: end")) +end + +@resumable function proc1(sim) + push!(log, (now(sim), "proc1: start")) + @yield lock(waiter) + push!(log, (now(sim), "proc1: first lock")) + @yield lock(waiter) + push!(log, (now(sim), "proc1: second lock")) + @yield timeout(sim, 10) + push!(log, (now(sim), "proc1: waited 10")) + @yield lock(waiter) + push!(log, (now(sim), "proc1: third lock")) + push!(log, (now(sim), "proc1: end")) +end + +@resumable function proc2(sim) + push!(log, (now(sim), "proc2: start")) + @yield lock(waiter) + push!(log, (now(sim), "proc2: first lock")) + @yield timeout(sim, 25) + push!(log, (now(sim), "proc2: waited 25")) + @yield lock(waiter) + push!(log, (now(sim), "proc2: second lock")) # should be after third trigger + push!(log, (now(sim), "proc2: end")) +end + +@process trigger(sim) +@process proc1(sim) +@process proc2(sim) + +run(sim) + + +expected_log = [ + (0.0, "t: start"), + (0.0, "proc1: start"), + (0.0, "proc2: start"), + (10.0, "t: waited 10"), + (10.0, "t: first trigger"), + (10.0, "proc1: first lock"), + (10.0, "proc2: first lock"), + (30.0, "t: waited 20"), + (30.0, "t: second trigger"), + (30.0, "proc1: second lock"), + (35.0, "proc2: waited 25"), + (40.0, "proc1: waited 10"), + (45.0, "t: waited 15"), + (45.0, "t: third trigger"), + (45.0, "t: end"), + (45.0, "proc2: second lock"), + (45.0, "proc2: end"), + (45.0, "proc1: third lock"), + (45.0, "proc1: end") +] + + +@test length(log) == length(expected_log) + +for i in 1:length(log) + @test log[i] == expected_log[i] +end + +end + +@testset "Multiple AsymmetricSemaphores" begin +sim = Simulation() +waiter1 = AsymmetricSemaphore(sim) +waiter2 = AsymmetricSemaphore(sim) +log = [] + +@resumable function trigger(sim) + push!(log, (now(sim), "t: start")) + @yield timeout(sim, 10) + push!(log, (now(sim), "t: waited 10")) + unlock(waiter1) + push!(log, (now(sim), "t: unlock first waiter")) + @yield timeout(sim, 5) + push!(log, (now(sim), "t: waited 5")) + unlock(waiter2) + push!(log, (now(sim), "t: unlock second waiter")) + push!(log, (now(sim), "t: end")) +end + +@resumable function proc1(sim) + push!(log, (now(sim), "proc1: start")) + @yield lock(waiter1) | lock(waiter2) + push!(log, (now(sim), "proc1: got a lock")) + push!(log, (now(sim), "proc1: end")) +end + +@resumable function proc2(sim) + push!(log, (now(sim), "proc2: start")) + @yield lock(waiter1) & lock(waiter2) + push!(log, (now(sim), "proc2: got both locks")) + push!(log, (now(sim), "proc2: end")) +end +@process trigger(sim) +@process proc1(sim) +@process proc2(sim) + +run(sim) + +expected_log = [ +(0.0, "t: start"), +(0.0, "proc1: start"), +(0.0, "proc2: start"), +(10.0, "t: waited 10"), +(10.0, "t: unlock first waiter"), +(10.0, "proc1: got a lock"), +(10.0, "proc1: end"), +(15.0, "t: waited 5"), +(15.0, "t: unlock second waiter"), +(15.0, "t: end"), +(15.0, "proc2: got both locks"), +(15.0, "proc2: end") +] + +@test length(log) == length(expected_log) + +for i in 1:length(log) + @test log[i] == expected_log[i] +end +end \ No newline at end of file