Skip to content

Commit

Permalink
Merge branch 'main' into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
SouthEndMusic committed Oct 21, 2024
2 parents d6ca23b + 82a4a74 commit 4b653a2
Show file tree
Hide file tree
Showing 25 changed files with 731 additions and 65 deletions.
3 changes: 3 additions & 0 deletions core/src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ using Tables: Tables, AbstractRow, columntable
# Wrapper around a vector of structs to easily retrieve the same field from all elements.
using StructArrays: StructVector

# OrderedSet is used to store the order of the substances in the network.
using DataStructures: OrderedSet

export libribasim

include("schema.jl")
Expand Down
241 changes: 233 additions & 8 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ function create_callbacks(
u0::ComponentVector,
saveat,
)::Tuple{CallbackSet, SavedResults}
(; starttime, basin, tabulated_rating_curve) = parameters
(;
starttime,
basin,
flow_boundary,
level_boundary,
user_demand,
tabulated_rating_curve,
) = parameters
callbacks = SciMLBase.DECallback[]

# Check for negative storage
Expand All @@ -32,6 +39,18 @@ function create_callbacks(
basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false))
push!(callbacks, basin_cb)

# Update boundary concentrations
for (boundary, func) in (
(basin, update_basin_conc!),
(flow_boundary, update_flowb_conc!),
(level_boundary, update_levelb_conc!),
(user_demand, update_userd_conc!),
)
tstops = get_tstops(boundary.concentration_time.time, starttime)
conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false))
push!(callbacks, conc_cb)
end

# Update TabulatedRatingCurve Q(h) relationships
tstops = get_tstops(tabulated_rating_curve.time.time, starttime)
tabulated_rating_curve_cb = PresetTimeCallback(
Expand Down Expand Up @@ -88,18 +107,36 @@ Update with the latest timestep:
- Cumulative flows/forcings which are input for the allocation algorithm
- Cumulative flows/forcings which are realized demands in the allocation context
During these cumulative flow updates, we can also update the mass balance of the system,
as each flow carries mass, based on the concentrations of the flow source.
Specifically, we first use all the inflows to update the mass of the Basins, recalculate
the Basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, tprev, dt) = integrator
(; basin, flow_boundary, allocation) = p
(; p, uprev, tprev, dt) = integrator
(;
basin,
state_inflow_edge,
state_outflow_edge,
user_demand,
level_boundary,
flow_boundary,
allocation,
) = p
(; vertical_flux) = basin

# Update tprev
p.tprev[] = t

# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
basin.cumulative_in .= 0.0

# Update cumulative forcings which are integrated exactly
@. basin.cumulative_drainage += vertical_flux.drainage * dt
@. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt
basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt
basin.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
Expand All @@ -108,15 +145,26 @@ function update_cumulative_flows!(u, t, integrator)::Nothing

basin.cumulative_precipitation[node_id.idx] += added_precipitation
basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation
basin.mass[node_id.idx, :] .+=
basin.concentration[2, node_id.idx, :] .* added_precipitation
basin.cumulative_in[node_id.idx] += added_precipitation
end

# Exact boundary flow over time step
for (id, flow_rate, active) in
zip(flow_boundary.node_id, flow_boundary.flow_rate, flow_boundary.active)
for (id, flow_rate, active, edge) in zip(
flow_boundary.node_id,
flow_boundary.flow_rate,
flow_boundary.active,
flow_boundary.outflow_edges,
)
if active
outflow_id = edge[1].edge[2]
volume = integral(flow_rate, tprev, t)
flow_boundary.cumulative_flow[id.idx] += volume
flow_boundary.cumulative_flow_saveat[id.idx] += volume
basin.mass[outflow_id.idx, :] .+=
flow_boundary.concentration[id.idx, :] .* volume
basin.cumulative_in[outflow_id.idx] += volume
end
end

Expand All @@ -141,13 +189,139 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
end
end
end

# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=
user_demand.concentration[to_node.idx, :] .* flow
else
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end

if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"
end
end
end
end

# Update the Basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from Basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
end
end

# Evaporate mass to keep the mass balance, if enabled in model config
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
end
end

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
end
error("Negative mass(es) detected")
end

# Update the Basin concentrations again based on the removed mass
basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)]
basin.storage_prev .= basin.current_storage[parent(u)]

return nothing
end

"""
Given an edge (from_id, to_id), compute the cumulative flow over that
edge over the latest timestep. If from_id and to_id are both the same basin,
the function returns the sum of the basin forcings.
edge over the latest timestep. If from_id and to_id are both the same Basin,
the function returns the sum of the Basin forcings.
"""
function flow_update_on_edge(
integrator::DEIntegrator,
Expand Down Expand Up @@ -192,7 +366,7 @@ end
"""
Save all cumulative forcings and flows over edges over the latest timestep,
Both computed by the solver and integrated exactly. Also computes the total horizontal
inflow and outflow per basin.
inflow and outflow per Basin.
"""
function save_flow(u, t, integrator)
(; p) = integrator
Expand Down Expand Up @@ -247,13 +421,15 @@ function save_flow(u, t, integrator)
@. basin.cumulative_precipitation_saveat = 0.0
@. basin.cumulative_drainage_saveat = 0.0

concentration = copy(basin.concentration_state)
saved_flow = SavedFlow(;
flow = flow_mean,
inflow = inflow_mean,
outflow = outflow_mean,
flow_boundary = flow_boundary_mean,
precipitation,
drainage,
concentration,
t,
)
check_water_balance_error!(saved_flow, integrator, Δt)
Expand Down Expand Up @@ -500,6 +676,10 @@ function get_value(subvariable::NamedTuple, p::Parameters, du::AbstractVector, t

elseif startswith(variable, "concentration_external.")
value = basin.concentration_external[listen_node_id.idx][variable](t)
elseif startswith(variable, "concentration.")
substance = Symbol(last(split(variable, ".")))
var_idx = findfirst(==(substance), basin.substances)
value = basin.concentration_state[listen_node_id.idx, var_idx]
else
error("Unsupported condition variable $variable.")
end
Expand Down Expand Up @@ -590,6 +770,51 @@ function update_basin!(integrator)::Nothing
return nothing
end

"Load updates from 'Basin / concentration' into the parameters"
function update_basin_conc!(integrator)::Nothing
(; p) = integrator
(; basin) = p
(; node_id, concentration, concentration_time, substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(NodeType.Basin, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.drainage) || (concentration[1, i, j] = row.drainage)
ismissing(row.precipitation) || (concentration[2, i, j] = row.precipitation)
end
return nothing
end

"Load updates from 'concentration' tables into the parameters"
function update_conc!(integrator, parameter, nodetype)::Nothing
(; p) = integrator
node = getproperty(p, parameter)
(; basin) = p
(; node_id, concentration, concentration_time) = node
(; substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(nodetype, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.concentration) || (concentration[i, j] = row.concentration)
end
return nothing
end
update_flowb_conc!(integrator)::Nothing =
update_conc!(integrator, :flow_boundary, NodeType.FlowBoundary)
update_levelb_conc!(integrator)::Nothing =
update_conc!(integrator, :level_boundary, NodeType.LevelBoundary)
update_userd_conc!(integrator)::Nothing =
update_conc!(integrator, :user_demand, NodeType.UserDemand)

"Solve the allocation problem for all demands and assign allocated abstractions."
function update_allocation!(integrator)::Nothing
(; p, t, u) = integrator
Expand Down
1 change: 1 addition & 0 deletions core/src/config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const nodetypes = collect(keys(nodekinds))
maxiters::Int = 1e9
sparse::Bool = true
autodiff::Bool = false
evaporate_mass::Bool = true
end

# Separate struct, as basin clashes with nodetype
Expand Down
2 changes: 1 addition & 1 deletion core/src/main.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function main(toml_path::AbstractString)::Cint
if config.ribasim_version != cli.ribasim_version
@warn "The Ribasim version in the TOML config file does not match the used Ribasim CLI version." config.ribasim_version cli.ribasim_version
end
@info "Starting a Ribasim simulation." cli.ribasim_version starttime endtime
@info "Starting a Ribasim simulation." toml_path cli.ribasim_version starttime endtime

try
model = run(config)
Expand Down
Loading

0 comments on commit 4b653a2

Please sign in to comment.