diff --git a/core/src/callback.jl b/core/src/callback.jl index 5996e4c02..999675262 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -40,25 +40,16 @@ function create_callbacks( push!(callbacks, basin_cb) # Update boundary concentrations - tstops = get_tstops(basin.concentration_time.time, starttime) - conc_cb = - PresetTimeCallback(tstops, update_basin_conc!; save_positions = (false, false)) - push!(callbacks, conc_cb) - - tstops = get_tstops(flow_boundary.concentration_time.time, starttime) - conc_cb = - PresetTimeCallback(tstops, update_flowb_conc!; save_positions = (false, false)) - push!(callbacks, conc_cb) - - tstops = get_tstops(level_boundary.concentration_time.time, starttime) - conc_cb = - PresetTimeCallback(tstops, update_levelb_conc!; save_positions = (false, false)) - push!(callbacks, conc_cb) - - tstops = get_tstops(user_demand.concentration_time.time, starttime) - conc_cb = - PresetTimeCallback(tstops, update_userd_conc!; save_positions = (false, false)) - push!(callbacks, conc_cb) + for (boundary, func) in ( + (basin, update_basin_conc!), + (flow_boundary, update_flowb_conc!), + (level_boundary, update_levelb_conc!), + (user_demand, update_userd_conc!), + ) + tstops = get_tstops(boundary.concentration_time.time, starttime) + conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false)) + push!(callbacks, conc_cb) + end # Update TabulatedRatingCurve Q(h) relationships tstops = get_tstops(tabulated_rating_curve.time.time, starttime) @@ -116,6 +107,10 @@ Update with the latest timestep: - Cumulative flows/forcings which are input for the allocation algorithm - Cumulative flows/forcings which are realized demands in the allocation context +During these cumulative flow updates, we can also update the mass balance of the system, +as each flow carries mass, based on the concentrations of the flow source. +Specifically, we first use all the inflows to update the mass of the basins, recalculate +the basin concentration(s) and then remove the mass that is being lost to the outflows. """ function update_cumulative_flows!(u, t, integrator)::Nothing (; p, uprev, tprev, dt) = integrator @@ -133,7 +128,8 @@ function update_cumulative_flows!(u, t, integrator)::Nothing # Update tprev p.tprev[] = t - # Reset cumulative flows + # Reset cumulative flows, used to calculate the concentration + # of the basins after processing inflows only fill!(basin.cumulative_in, 0.0) # Update cumulative forcings which are integrated exactly @@ -194,95 +190,128 @@ function update_cumulative_flows!(u, t, integrator)::Nothing end end - # Placeholder concentration for Terminals and the like - # TODO Move to Terminal(?!) - placeholder = zeros(length(basin.substances)) - placeholder[1] = 1.0 # Continuity + # Process mass updates for UserDemand separately + # as the inflow and outflow are decoupled in the states + for (inflow_edge, outflow_edge) in + zip(user_demand.inflow_edge, user_demand.outflow_edge) + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + userdemand_idx = outflow_edge.edge[1].idx + if from_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, inflow_edge.edge) + if flow < 0 + basin.mass[from_node.idx, :] .-= + basin.concentration_state[to_node.idx, :] .* flow + basin.mass[from_node.idx, :] .-= + user_demand.concentration[userdemand_idx, :] .* flow + end + end + if to_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, outflow_edge.edge) + if flow > 0 + basin.mass[to_node.idx, :] .+= + basin.concentration_state[from_node.idx, :] .* flow + basin.mass[to_node.idx, :] .+= + user_demand.concentration[userdemand_idx, :] .* flow + end + end + end + # Process all mass inflows to basins for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge) - from_flow = inflow_edge.edge[1] - to_flow = outflow_edge.edge[2] - if from_flow.type == NodeType.Basin + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + if from_node.type == NodeType.Basin flow = flow_update_on_edge(integrator, inflow_edge.edge) if flow < 0 - basin.cumulative_in[from_flow.idx] -= flow - if to_flow.type == NodeType.Basin - basin.mass[from_flow.idx, :] .-= - basin.concentration_state[to_flow.idx, :] .* flow - elseif to_flow.type == NodeType.LevelBoundary - basin.mass[from_flow.idx, :] .-= - level_boundary.concentration[to_flow.idx, :] .* flow - elseif to_flow.type == NodeType.UserDemand - basin.mass[from_flow.idx, :] .-= - user_demand.concentration[to_flow.idx, :] .* flow + basin.cumulative_in[from_node.idx] -= flow + if to_node.type == NodeType.Basin + basin.mass[from_node.idx, :] .-= + basin.concentration_state[to_node.idx, :] .* flow + elseif to_node.type == NodeType.LevelBoundary + basin.mass[from_node.idx, :] .-= + level_boundary.concentration[to_node.idx, :] .* flow + elseif to_node.type == NodeType.UserDemand + basin.mass[from_node.idx, :] .-= + user_demand.concentration[to_node.idx, :] .* flow else - # Fix mass balance, even though Terminals should not leak - basin.mass[from_flow.idx, :] .-= placeholder .* flow - @warn "Unsupported outflow type $(to_flow.type) #$(to_flow.value) with flow $flow" + @warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow" end end end - if to_flow.type == NodeType.Basin + if to_node.type == NodeType.Basin flow = flow_update_on_edge(integrator, outflow_edge.edge) if flow > 0 - basin.cumulative_in[to_flow.idx] += flow - if from_flow.type == NodeType.Basin - basin.mass[to_flow.idx, :] .+= - basin.concentration_state[from_flow.idx, :] .* flow - elseif from_flow.type == NodeType.LevelBoundary - basin.mass[to_flow.idx, :] .+= - level_boundary.concentration[from_flow.idx, :] .* flow - elseif from_flow.type == NodeType.UserDemand - basin.mass[to_flow.idx, :] .+= - user_demand.concentration[from_flow.idx, :] .* flow - elseif from_flow.type == NodeType.Terminal - # TODO Apparently UserDemand outflow is discoupled from it's inflow - # and its inflow_edge is Terminal #0 twice - basin.mass[to_flow.idx, :] .+= - user_demand.concentration[outflow_edge.edge[1].idx, :] .* flow + basin.cumulative_in[to_node.idx] += flow + if from_node.type == NodeType.Basin + basin.mass[to_node.idx, :] .+= + basin.concentration_state[from_node.idx, :] .* flow + elseif from_node.type == NodeType.LevelBoundary + basin.mass[to_node.idx, :] .+= + level_boundary.concentration[from_node.idx, :] .* flow + elseif from_node.type == NodeType.UserDemand + basin.mass[to_node.idx, :] .+= + user_demand.concentration[from_node.idx, :] .* flow + elseif from_node.type == NodeType.Terminal && from_node.value == 0 + # UserDemand outflow is discoupled from its inflow, + # and the unset flow edge defaults to Terminal #0 + nothing else - @warn "Unsupported inflow type $(from_flow.type)" + @warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow" end end end end + # Update the basin concentrations based on the added mass and flows basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in) + # Process all mass outflows from basins for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge) - from_flow = inflow_edge.edge[1] - to_flow = outflow_edge.edge[2] - if from_flow.type == NodeType.Basin + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + if from_node.type == NodeType.Basin flow = flow_update_on_edge(integrator, inflow_edge.edge) if flow > 0 - basin.mass[from_flow.idx, :] .-= - basin.concentration_state[from_flow.idx, :] .* flow + basin.mass[from_node.idx, :] .-= + basin.concentration_state[from_node.idx, :] .* flow end end - - if to_flow.type == NodeType.Basin + if to_node.type == NodeType.Basin flow = flow_update_on_edge(integrator, outflow_edge.edge) if flow < 0 - basin.mass[to_flow.idx, :] .+= - basin.concentration_state[to_flow.idx, :] .* flow + basin.mass[to_node.idx, :] .+= + basin.concentration_state[to_node.idx, :] .* flow end end end + # We might want to evaporate mass to keep the mass balance. if basin.evaporate_mass basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation) end basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration) - # Take care of masses getting ever smaller, possibly becoming negative - # TODO Should this be bounded by the solver tolerances? + # Take care of infinitely small masses, possibly becoming negative due to truncation. for I in eachindex(basin.mass) - if (0 - eps(Float64)) < basin.mass[I] < (0 + eps(Float64)) + if (-eps(Float64)) < basin.mass[I] < (eps(Float64)) basin.mass[I] = 0.0 end end - any(<(0), basin.mass) && error("Negative mass detected: $(basin.mass)") + + # Check for negative masses + if any(<(0), basin.mass) + R = CartesianIndices(basin.mass) + locations = findall(<(0), basin.mass) + for I in locations + basin_idx, substance_idx = Tuple(R[I]) + @error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])" + end + error("Negative mass(es) detected") + end + + # Update the basin concentrations again based on the removed mass basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)] basin.storage_prev .= basin.current_storage[parent(u)] diff --git a/core/src/parameter.jl b/core/src/parameter.jl index 2319c3e76..075764e6d 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -10,6 +10,9 @@ const SolverStats = @NamedTuple{ @enumx EdgeType flow control none @eval @enumx NodeType $(config.nodetypes...) @enumx ContinuousControlType None Continuous PID +@enumx Substance Continuity = 1 Initial = 2 LevelBoundary = 3 FlowBoundary = 4 UserDemand = + 5 Drainage = 6 Precipitation = 7 +Base.to_index(id::Substance.T) = Int(id) # used to index into concentration matrices # Support creating a NodeType enum instance from a symbol or string function NodeType.T(s::Symbol)::NodeType.T @@ -268,6 +271,7 @@ In-memory storage of saved mean flows for writing to results. - `flow_boundary`: The exact integrated mean flows of flow boundaries - `precipitation`: The exact integrated mean precipitation - `drainage`: The exact integrated mean drainage +- `concentration`: Concentrations for each basin and substance - `balance_error`: The (absolute) water balance error - `relative_error`: The relative water balance error - `t`: Endtime of the interval over which is averaged @@ -346,17 +350,27 @@ end demand::Vector{Float64} # Data source for parameter updates time::StructVector{BasinTimeV1, C, Int} + # Data source for concentration updates concentration_time::StructVector{BasinConcentrationV1, D, Int} + # Concentrations + # Config setting to enable/disable evaporation of mass evaporate_mass::Bool = true + # Cumulative inflow for each basin at a given time cumulative_in::Vector{Float64} = zeros(length(node_id)) + # Storage for each basin at the previous time step storage_prev::Vector{Float64} = zeros(length(node_id)) + # matrix with concentrations for each basin and substance concentration_state::Matrix{Float64} # basin, substance - concentration::Array{Float64, 3} # boundary, basin, substance - mass::Matrix{Float64} # basin, substance + # matrix with boundary concentrations for each boundary, basin and substance + concentration::Array{Float64, 3} + # matrix with mass for each basin and substance + mass::Matrix{Float64} + # substances in use by the model (ordered like their axis in the concentration matrices) + substances::OrderedSet{Symbol} + # Data source for external concentrations (used in control) concentration_external::Vector{Dict{String, ScalarInterpolation}} = Dict{String, ScalarInterpolation}[] - substances::OrderedSet{Symbol} end """ @@ -469,12 +483,14 @@ end node_id: node ID of the LevelBoundary node active: whether this node is active level: the fixed level of this 'infinitely big basin' +concentration: matrix with boundary concentrations for each basin and substance +concentration_time: Data source for concentration updates """ @kwdef struct LevelBoundary{C} <: AbstractParameterNode node_id::Vector{NodeID} active::Vector{Bool} level::Vector{ScalarInterpolation} - concentration::Matrix{Float64} = Matrix{Float64}() + concentration::Matrix{Float64} concentration_time::StructVector{LevelBoundaryConcentrationV1, C, Int} end @@ -485,6 +501,8 @@ active: whether this node is active and thus contributes flow cumulative_flow: The exactly integrated cumulative boundary flow since the start of the simulation cumulative_flow_saveat: The exactly integrated cumulative boundary flow since the last saveat flow_rate: flow rate (exact) +concentration: matrix with boundary concentrations for each basin and substance +concentration_time: Data source for concentration updates """ @kwdef struct FlowBoundary{C} <: AbstractParameterNode node_id::Vector{NodeID} @@ -759,6 +777,8 @@ demand_from_timeseries: If false the demand comes from the BMI or is fixed allocated: water flux currently allocated to UserDemand per priority (node_idx, priority_idx) return_factor: the factor in [0,1] of how much of the abstracted water is given back to the system min_level: The level of the source basin below which the UserDemand does not abstract +concentration: matrix with boundary concentrations for each basin and substance +concentration_time: Data source for concentration updates """ @kwdef struct UserDemand{C} <: AbstractDemandNode node_id::Vector{NodeID} diff --git a/core/src/read.jl b/core/src/read.jl index d16778ac8..225c02182 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -430,8 +430,8 @@ function LevelBoundary(db::DB, config::Config)::LevelBoundary substances = get_substances(db, config) concentration = zeros(length(node_ids), length(substances)) - concentration[:, 1] .= 1.0 # Continuity - concentration[:, 3] .= 1.0 # UserDemand + concentration[:, Substance.Continuity] .= 1.0 + concentration[:, Substance.UserDemand] .= 1.0 set_concentrations!(concentration, concentration_time, substances, Int32.(node_ids)) if !valid @@ -473,8 +473,8 @@ function FlowBoundary(db::DB, config::Config, graph::MetaGraph)::FlowBoundary substances = get_substances(db, config) concentration = zeros(length(node_ids), length(substances)) - concentration[:, 1] .= 1.0 # Continuity - concentration[:, 4] .= 1.0 # UserDemand + concentration[:, Substance.Continuity] .= 1.0 + concentration[:, Substance.UserDemand] .= 1.0 set_concentrations!(concentration, concentration_time, substances, Int32.(node_ids)) if !valid @@ -598,16 +598,16 @@ function Basin(db::DB, config::Config, graph::MetaGraph)::Basin # TODO Move into a function substances = get_substances(db, config) concentration_state = zeros(n, length(substances)) - concentration_state[:, 1] .= 1.0 # Continuity - concentration_state[:, 2] .= 1.0 # Initial + concentration_state[:, Substance.Continuity] .= 1.0 + concentration_state[:, Substance.Initial] .= 1.0 set_concentrations!(concentration_state, concentration_state_data, substances, node_id) mass = copy(concentration_state) concentration = zeros(2, n, length(substances)) - concentration[1, :, 1] .= 1.0 # Drainage / Continuity - concentration[1, :, 6] .= 1.0 # Drainage / Drainage - concentration[2, :, 1] .= 1.0 # Precipitation / Continuity - concentration[2, :, 7] .= 1.0 # Precipitation / Precipitation + concentration[1, :, Substance.Continuity] .= 1.0 + concentration[1, :, Substance.Drainage] .= 1.0 + concentration[2, :, Substance.Continuity] .= 1.0 + concentration[2, :, Substance.Precipitation] .= 1.0 set_concentrations!( view(concentration, 1, :, :), concentration_time, @@ -706,7 +706,7 @@ function Basin(db::DB, config::Config, graph::MetaGraph)::Basin @assert length(storage0) == n "Basin / state length differs from number of Basins" basin.storage0 .= storage0 basin.storage_prev .= storage0 - basin.mass .*= storage0 # total mass + basin.mass .*= storage0 # was initialized by concentration_state, resulting in mass return basin end @@ -1132,8 +1132,8 @@ function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand substances = get_substances(db, config) concentration = zeros(length(node_ids), length(substances)) - concentration[:, 1] .= 1.0 # Continuity - concentration[:, 5] .= 1.0 # UserDemand + # Continuity concentration is zero, as the return flow (from a Basin) already includes it + concentration[:, Substance.UserDemand] .= 1.0 set_concentrations!(concentration, concentration_time, substances, ids) if errors || !valid_demand(node_ids, demand_itp, priorities) @@ -1506,15 +1506,7 @@ end "Determine all substances present in the input over multiple tables" function get_substances(db::DB, config::Config)::OrderedSet{Symbol} # Hardcoded tracers - substances = OrderedSet{Symbol}([ - :Continuity, - :Initial, - :LevelBoundary, - :FlowBoundary, - :UserDemand, - :Drainage, - :Precipitation, - ]) + substances = OrderedSet{Symbol}(Symbol.(instances(Substance.T))) for table in [ BasinConcentrationStateV1, BasinConcentrationV1, diff --git a/docs/concept/core.qmd b/docs/concept/core.qmd index 3103d5944..59970797d 100644 --- a/docs/concept/core.qmd +++ b/docs/concept/core.qmd @@ -119,3 +119,22 @@ end allocation_subNetwork->>user_demand: allocated user_demand->>basin: abstracted ``` + +# Substance (tracer) concentration calculations + +Ribasim can calculate concentrations of conservative tracers (i.e. substances that are non-reactive). +It does so by calculating the mass transports by flows for each timestep, in the `update_cumulative_flows!` callback. +Specifically, for each basin at each timestep we calculate: + +- all mass inflows (flow * source_concentration) given the edge inflows +- update the concentrations in the basin based on the added storage (previous storage + inflows) +- all mass outflows (flow * basin_concentration_state) give the edge outflows +- update the concentrations in the basin based on the current storage + +We thus keep track of both mass and concentration of substances for each basin. +Note that we have not added the substance mass to the states, and we assume that concentrations of flows are piecewise constant over a timestep. + +By default we calculate concentrations for the following source tracers. +- Continuity (mass balance, fraction of all water sources, sum of all other source tracers) +- Initial (fraction of initial storages) +- LevelBoundary, FlowBoundary, UserDemand, Drainage, Precipitation (fraction of different boundaries)