Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve_ifm #23

Merged
merged 14 commits into from
Aug 21, 2024
1 change: 1 addition & 0 deletions src/abstract_types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ abstract type AbstractScenarioModellingMethod end
Interface:
TuLiPa.assemble!(m::AbstractInflowModel)
TuLiPa.getid(m::AbstractInflowModel) -> TuLiPa.Id
numstates(m::AbstractInflowModel) -> Int ( == length(u0))
estimate_u0(m::AbstractInflowModel, t::ProbTime) -> u0::Vector{Float64}
predict(m::AbstractInflowModel, u0::::Vector{Float64}, t::ProbTime) -> Q::Vector{Float64}
"""
Expand Down
67 changes: 65 additions & 2 deletions src/ifm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ end

TuLiPa.getid(m::TwoStateBucketIfm) = m.id
TuLiPa.assemble!(m::TwoStateBucketIfm) = true
numstates(::TwoStateBucketIfm) = 2
estimate_u0(m::TwoStateBucketIfm, t::TuLiPa.ProbTime) = estimate_u0(m.handler, t)
predict(m::TwoStateBucketIfm, u0::Vector{Float64}, t::TuLiPa.ProbTime) = predict(m.handler, u0, t)

Expand Down Expand Up @@ -297,7 +298,7 @@ end

TuLiPa.getid(m::TwoStateNeuralODEIfm) = m.id
TuLiPa.assemble!(m::TwoStateNeuralODEIfm) = true

numstates(::TwoStateNeuralODEIfm) = 2
estimate_u0(m::TwoStateNeuralODEIfm, t::TuLiPa.ProbTime) = estimate_u0(m.handler, t)
predict(m::TwoStateNeuralODEIfm, u0::Vector{Float64}, t::TuLiPa.ProbTime) = predict(m.handler, u0, t)

Expand Down Expand Up @@ -423,11 +424,20 @@ end

# --- Functions used in run_serial in connection with inflow models ---

const IFM_DB_STATE_KEY = "ifm_step_u0"
const IFM_DB_FLOW_KEY = "ifm_step_Q"

"""
Create inflow models and store some of them locally according to db.dist_ifm
"""
function create_ifm()
db = get_local_db()

@assert !haskey(db.div, IFM_DB_STATE_KEY)
db.div[IFM_DB_STATE_KEY] = Dict{String, Tuple{Int, Vector{Float64}}}()
@assert !haskey(db.div, IFM_DB_FLOW_KEY)
db.div[IFM_DB_FLOW_KEY] = Dict{String, Tuple{Int, Float64}}()

elements = get_ifm_elements(db)
t0 = time()
modelobjects = TuLiPa.getmodelobjects(elements)
Expand All @@ -442,12 +452,45 @@ function create_ifm()
end
end

function save_ifm_u0(db, inflow_name, stepnr, u0)
if !haskey(db.div[IFM_DB_STATE_KEY], inflow_name)
db.div[IFM_DB_STATE_KEY][inflow_name] = (stepnr, u0)
else
(stored_stepnr, __) = db.div[IFM_DB_STATE_KEY][inflow_name]
if stored_stepnr != stepnr
db.div[IFM_DB_STATE_KEY][inflow_name] = (stepnr, u0)
end
end
end

function save_ifm_Q(db, inflow_name, stepnr, Q)
if !haskey(db.div[IFM_DB_FLOW_KEY], inflow_name)
db.div[IFM_DB_FLOW_KEY][inflow_name] = (stepnr, Q)
else
(stored_stepnr, __) = db.div[IFM_DB_FLOW_KEY][inflow_name]
if stored_stepnr != stepnr
db.div[IFM_DB_FLOW_KEY][inflow_name] = (stepnr, Q)
end
end
end

"""
Sequentially solve inflow models stored locally.
Each inflow model is solved for each scenario.
"""
function solve_ifm(t)
function solve_ifm(t, stepnr)
db = get_local_db()

steplen_ms = Millisecond(get_steplength(db.input))
ifmstep_ms = TuLiPa.getduration(ONEDAY_MS_TIMEDELTA)
@assert steplen_ms >= ifmstep_ms
nifmsteps = div(steplen_ms.value, ifmstep_ms.value)
remainder_ms = steplen_ms - ifmstep_ms * nifmsteps
steplen_f = float(steplen_ms.value)
ifmstep_f = float(ifmstep_ms.value)
remainder_f = float(remainder_ms.value)


normfactors = get_ifm_normfactors(db)
scenarios = get_scenarios(db.scenmod_sim)
for (inflow_name, core) in db.dist_ifm
Expand All @@ -458,6 +501,26 @@ function solve_ifm(t)
inflow_model = db.ifm[inflow_name]
normalize_factor = normfactors[inflow_name]
u0 = estimate_u0(inflow_model, t)

# save in familiar unit
# TODO: extend interface with get_basin_area(::AbstractInflowModel) ?
u0_mm3 = u0 .* inflow_model.handler.basin_area ./ 1000.0
save_ifm_u0(db, inflow_name, stepnr, u0_mm3)

# predict mean Q for over clearing period and store result
# can be used to measure goodness of ifm model
Q = predict(inflow_model, u0, t)
@assert nifmsteps <= length(Q)
mean_Q = 0.0
for i in 1:nifmsteps
mean_Q += Q[i] * ifmstep_f
end
if nifmsteps > 0
mean_Q += Q[nifmsteps] * remainder_f
end
mean_Q /= steplen_f
save_ifm_Q(db, inflow_name, stepnr, mean_Q)

for (scenix, scen) in enumerate(scenarios)
scentime = get_scentphasein(t, scen, db.input)
Q = predict(inflow_model, u0, scentime)
Expand Down
106 changes: 103 additions & 3 deletions src/io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ function get_distribution_method_sp(input::DefaultJulESInput, default::String="w
end

get_iprogtype(input::DefaultJulESInput) = get(input.dataset, "iprogtype", "direct")
has_ifm_results(input::DefaultJulESInput) = get_iprogtype(input) != "direct"
get_ifm_normfactors(input::DefaultJulESInput) = get(input.dataset, "ifm_normfactors", Dict{String, Float64}())
get_ifm_elements(input::DefaultJulESInput) = get(input.dataset, "ifm_elements", JulES.TuLiPa.DataElement[])

Expand Down Expand Up @@ -613,13 +614,18 @@ mutable struct DefaultJulESOutput <: AbstractJulESOutput
statenames::Vector{String}
statematrix::Array{Float64} # end states after each step

ifm_stations::Vector{String}
ifm_u0::Vector{Matrix{Float64}}
ifm_Q::Matrix{Float64}

function DefaultJulESOutput(input)
return new(Dict(),Dict(),Dict(),Dict(),[],
Dict(),
[],[],[],[],[],[],[],
[],[],
[],[],[],[],[],[],
Dict(),[],[],[],[],[],Dict(),[],[],Dict(),[],[],[],[])
Dict(),[],[],[],[],[],Dict(),[],[],Dict(),[],[],[],[],
[], [], Matrix{Float64}(undef, (0,0)))
end
end

Expand Down Expand Up @@ -709,6 +715,22 @@ function init_local_output()
@spawnat core init_prices_ppp(scenix, num_balances, numperiods_long, numperiods_med, numperiods_short)
end
end

if has_ifm_results(db.input)
db.output.ifm_stations = collect(get_ifm_names(db.input))
num_states = get_ifm_numstates()
num_stations = length(db.output.ifm_stations)
@assert length(db.output.ifm_u0) == 0
for __ in 1:num_states
push!(db.output.ifm_u0, zeros(Float64, (num_stations, steps)))
end
db.output.ifm_Q = zeros(Float64, (num_stations, steps))
end
end

function get_ifm_numstates()
# TODO: find common numstates by calling numstates on each ifm and verify all ifm of same type
return 2
end

function init_prices_ppp(scenix, num_balances, numperiods_long, numperiods_med, numperiods_short)
Expand All @@ -729,11 +751,82 @@ function get_numstates(subix)
return length(db.mp[subix].states)
end

function collect_ifm_u0(stepnr)
db = get_local_db()
d = Dict{String, Vector{Float64}}()
for core in get_cores(db.input)
fetched = fetch(@spawnat core local_collect_ifm_u0(stepnr))
if fetched isa RemoteException
throw(fetched)
end
for (name, x) in fetched
@assert !haskey(d, name)
d[name] = x
end
end
return d
end

function local_collect_ifm_u0(stepnr)
db = get_local_db()
d = Dict{String, Vector{Float64}}()
for (name, core) in db.dist_ifm
if core == db.core
(stored_stepnr, u0) = db.div[IFM_DB_STATE_KEY][name]
@assert stored_stepnr == stepnr
d[name] = u0
end
end
return d
end

function collect_ifm_Q(stepnr)
db = get_local_db()
d = Dict{String, Float64}()
for core in get_cores(db.input)
fetched = fetch(@spawnat core local_collect_ifm_Q(stepnr))
if fetched isa RemoteException
throw(fetched)
end
for (name, x) in fetched
@assert !haskey(d, name)
d[name] = x
end
end
return d
end

function local_collect_ifm_Q(stepnr)
db = get_local_db()
d = Dict{String, Float64}()
for (name, core) in db.dist_ifm
if core == db.core
(stored_stepnr, Q) = db.div[IFM_DB_FLOW_KEY][name]
@assert stored_stepnr == stepnr
d[name] = Q
end
end
return d
end

function update_output(t::TuLiPa.ProbTime, stepnr::Int)
db = get_local_db()
settings = get_settings(db)
steps = get_steps(db)

if has_ifm_results(db.input)
u0 = collect_ifm_u0(stepnr)
for (i, station) in enumerate(db.output.ifm_stations)
for (j, v) in enumerate(u0[station])
db.output.ifm_u0[j][i, stepnr] = v
end
end
ifm_Q = collect_ifm_Q(stepnr)
for (i, station) in enumerate(db.output.ifm_stations)
db.output.ifm_Q[i, stepnr] = ifm_Q[station]
end
end

if has_result_times(settings)
for (scenix, core) in db.dist_ppp
f = @spawnat core get_maintiming_ppp(scenix)
Expand All @@ -758,7 +851,7 @@ function update_output(t::TuLiPa.ProbTime, stepnr::Int)
db.output.timing_sp[(scenix, subix)][stepnr, :] .= fetch(f)
@spawnat core reset_maintiming_sp(scenix, subix)
end
end
end

if has_result_scenarios(settings)
db.output.scenweights_sim[stepnr, :] .= [get_probability(scen) for scen in get_scenarios(db.scenmod_sim)]
Expand Down Expand Up @@ -1446,7 +1539,14 @@ function get_output_main_local()
data["demandnames"] = demandnames
data["demandbalancenames"] = demandbalancenames

#data[]
if has_ifm_results(db.input)
data["ifm_names"] = db.output.ifm_stations
data["ifm_index"] = x3
for stateix in eachindex(db.output.ifm_u0)
data["ifm_startstates_$(stateix)"] = db.output.ifm_u0[stateix]
end
data["ifm_steamflow"] = db.output.ifm_Q
end
end

return data
Expand Down
2 changes: 1 addition & 1 deletion src/run_serial.jl
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ function step_jules(t, steplength, stepnr, skipmed)
println("Solve inflow models")
@time begin
@sync for core in cores
@spawnat core solve_ifm(t)
@spawnat core solve_ifm(t, stepnr)
end

@sync for core in cores
Expand Down
Loading