Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
evetion committed Oct 16, 2024
1 parent 025c445 commit e6b0227
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 96 deletions.
169 changes: 99 additions & 70 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,16 @@ function create_callbacks(
push!(callbacks, basin_cb)

# Update boundary concentrations
tstops = get_tstops(basin.concentration_time.time, starttime)
conc_cb =
PresetTimeCallback(tstops, update_basin_conc!; save_positions = (false, false))
push!(callbacks, conc_cb)

tstops = get_tstops(flow_boundary.concentration_time.time, starttime)
conc_cb =
PresetTimeCallback(tstops, update_flowb_conc!; save_positions = (false, false))
push!(callbacks, conc_cb)

tstops = get_tstops(level_boundary.concentration_time.time, starttime)
conc_cb =
PresetTimeCallback(tstops, update_levelb_conc!; save_positions = (false, false))
push!(callbacks, conc_cb)

tstops = get_tstops(user_demand.concentration_time.time, starttime)
conc_cb =
PresetTimeCallback(tstops, update_userd_conc!; save_positions = (false, false))
push!(callbacks, conc_cb)
for (boundary, func) in (
(basin, update_basin_conc!),
(flow_boundary, update_flowb_conc!),
(level_boundary, update_levelb_conc!),
(user_demand, update_userd_conc!),
)
tstops = get_tstops(boundary.concentration_time.time, starttime)
conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false))
push!(callbacks, conc_cb)
end

# Update TabulatedRatingCurve Q(h) relationships
tstops = get_tstops(tabulated_rating_curve.time.time, starttime)
Expand Down Expand Up @@ -116,6 +107,10 @@ Update with the latest timestep:
- Cumulative flows/forcings which are input for the allocation algorithm
- Cumulative flows/forcings which are realized demands in the allocation context
During these cumulative flow updates, we can also update the mass balance of the system,
as each flow carries mass, based on the concentrations of the flow source.
Specifically, we first use all the inflows to update the mass of the basins, recalculate
the basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, uprev, tprev, dt) = integrator
Expand All @@ -133,7 +128,8 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
# Update tprev
p.tprev[] = t

# Reset cumulative flows
# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
fill!(basin.cumulative_in, 0.0)

# Update cumulative forcings which are integrated exactly
Expand Down Expand Up @@ -194,95 +190,128 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
end
end

# Placeholder concentration for Terminals and the like
# TODO Move to Terminal(?!)
placeholder = zeros(length(basin.substances))
placeholder[1] = 1.0 # Continuity
# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_flow = inflow_edge.edge[1]
to_flow = outflow_edge.edge[2]
if from_flow.type == NodeType.Basin
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_flow.idx] -= flow
if to_flow.type == NodeType.Basin
basin.mass[from_flow.idx, :] .-=
basin.concentration_state[to_flow.idx, :] .* flow
elseif to_flow.type == NodeType.LevelBoundary
basin.mass[from_flow.idx, :] .-=
level_boundary.concentration[to_flow.idx, :] .* flow
elseif to_flow.type == NodeType.UserDemand
basin.mass[from_flow.idx, :] .-=
user_demand.concentration[to_flow.idx, :] .* flow
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=
user_demand.concentration[to_node.idx, :] .* flow
else
# Fix mass balance, even though Terminals should not leak
basin.mass[from_flow.idx, :] .-= placeholder .* flow
@warn "Unsupported outflow type $(to_flow.type) #$(to_flow.value) with flow $flow"
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end

if to_flow.type == NodeType.Basin
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_flow.idx] += flow
if from_flow.type == NodeType.Basin
basin.mass[to_flow.idx, :] .+=
basin.concentration_state[from_flow.idx, :] .* flow
elseif from_flow.type == NodeType.LevelBoundary
basin.mass[to_flow.idx, :] .+=
level_boundary.concentration[from_flow.idx, :] .* flow
elseif from_flow.type == NodeType.UserDemand
basin.mass[to_flow.idx, :] .+=
user_demand.concentration[from_flow.idx, :] .* flow
elseif from_flow.type == NodeType.Terminal
# TODO Apparently UserDemand outflow is discoupled from it's inflow
# and its inflow_edge is Terminal #0 twice
basin.mass[to_flow.idx, :] .+=
user_demand.concentration[outflow_edge.edge[1].idx, :] .* flow
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported inflow type $(from_flow.type)"
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"
end
end
end
end

# Update the basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_flow = inflow_edge.edge[1]
to_flow = outflow_edge.edge[2]
if from_flow.type == NodeType.Basin
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_flow.idx, :] .-=
basin.concentration_state[from_flow.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end

if to_flow.type == NodeType.Basin
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_flow.idx, :] .+=
basin.concentration_state[to_flow.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
end
end

# We might want to evaporate mass to keep the mass balance.
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of masses getting ever smaller, possibly becoming negative
# TODO Should this be bounded by the solver tolerances?
# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (0 - eps(Float64)) < basin.mass[I] < (0 + eps(Float64))
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
end
end
any(<(0), basin.mass) && error("Negative mass detected: $(basin.mass)")

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
end
error("Negative mass(es) detected")
end

# Update the basin concentrations again based on the removed mass
basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)]
basin.storage_prev .= basin.current_storage[parent(u)]

Expand Down
28 changes: 24 additions & 4 deletions core/src/parameter.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const SolverStats = @NamedTuple{
@enumx EdgeType flow control none
@eval @enumx NodeType $(config.nodetypes...)
@enumx ContinuousControlType None Continuous PID
@enumx Substance Continuity = 1 Initial = 2 LevelBoundary = 3 FlowBoundary = 4 UserDemand =
5 Drainage = 6 Precipitation = 7
Base.to_index(id::Substance.T) = Int(id) # used to index into concentration matrices

# Support creating a NodeType enum instance from a symbol or string
function NodeType.T(s::Symbol)::NodeType.T
Expand Down Expand Up @@ -268,6 +271,7 @@ In-memory storage of saved mean flows for writing to results.
- `flow_boundary`: The exact integrated mean flows of flow boundaries
- `precipitation`: The exact integrated mean precipitation
- `drainage`: The exact integrated mean drainage
- `concentration`: Concentrations for each basin and substance
- `balance_error`: The (absolute) water balance error
- `relative_error`: The relative water balance error
- `t`: Endtime of the interval over which is averaged
Expand Down Expand Up @@ -346,17 +350,27 @@ end
demand::Vector{Float64}
# Data source for parameter updates
time::StructVector{BasinTimeV1, C, Int}
# Data source for concentration updates
concentration_time::StructVector{BasinConcentrationV1, D, Int}

# Concentrations
# Config setting to enable/disable evaporation of mass
evaporate_mass::Bool = true
# Cumulative inflow for each basin at a given time
cumulative_in::Vector{Float64} = zeros(length(node_id))
# Storage for each basin at the previous time step
storage_prev::Vector{Float64} = zeros(length(node_id))
# matrix with concentrations for each basin and substance
concentration_state::Matrix{Float64} # basin, substance
concentration::Array{Float64, 3} # boundary, basin, substance
mass::Matrix{Float64} # basin, substance
# matrix with boundary concentrations for each boundary, basin and substance
concentration::Array{Float64, 3}
# matrix with mass for each basin and substance
mass::Matrix{Float64}
# substances in use by the model (ordered like their axis in the concentration matrices)
substances::OrderedSet{Symbol}
# Data source for external concentrations (used in control)
concentration_external::Vector{Dict{String, ScalarInterpolation}} =
Dict{String, ScalarInterpolation}[]
substances::OrderedSet{Symbol}
end

"""
Expand Down Expand Up @@ -469,12 +483,14 @@ end
node_id: node ID of the LevelBoundary node
active: whether this node is active
level: the fixed level of this 'infinitely big basin'
concentration: matrix with boundary concentrations for each basin and substance
concentration_time: Data source for concentration updates
"""
@kwdef struct LevelBoundary{C} <: AbstractParameterNode
node_id::Vector{NodeID}
active::Vector{Bool}
level::Vector{ScalarInterpolation}
concentration::Matrix{Float64} = Matrix{Float64}()
concentration::Matrix{Float64}
concentration_time::StructVector{LevelBoundaryConcentrationV1, C, Int}
end

Expand All @@ -485,6 +501,8 @@ active: whether this node is active and thus contributes flow
cumulative_flow: The exactly integrated cumulative boundary flow since the start of the simulation
cumulative_flow_saveat: The exactly integrated cumulative boundary flow since the last saveat
flow_rate: flow rate (exact)
concentration: matrix with boundary concentrations for each basin and substance
concentration_time: Data source for concentration updates
"""
@kwdef struct FlowBoundary{C} <: AbstractParameterNode
node_id::Vector{NodeID}
Expand Down Expand Up @@ -759,6 +777,8 @@ demand_from_timeseries: If false the demand comes from the BMI or is fixed
allocated: water flux currently allocated to UserDemand per priority (node_idx, priority_idx)
return_factor: the factor in [0,1] of how much of the abstracted water is given back to the system
min_level: The level of the source basin below which the UserDemand does not abstract
concentration: matrix with boundary concentrations for each basin and substance
concentration_time: Data source for concentration updates
"""
@kwdef struct UserDemand{C} <: AbstractDemandNode
node_id::Vector{NodeID}
Expand Down
Loading

0 comments on commit e6b0227

Please sign in to comment.