Skip to content

Commit

Permalink
Merge pull request #43 from PhilipFackler/proc_payload_respec
Browse files Browse the repository at this point in the history
Proc payload respec
  • Loading branch information
williamfgc authored Aug 12, 2022
2 parents da64a52 + 0207d00 commit ff32b31
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 93 deletions.
29 changes: 16 additions & 13 deletions src/core/Config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,26 @@ function default_config()
:data_streams => [
D(
:name => "Level_0",
:initial_size_range => [3000, 3600],
:evolution => D(
:function => "GrowthFactor",
:params => [1.0718],
),
:nprocs_ratio => 0.5,
:proc_payloads => [
D(:size_range => [1000, 1200], :ratio => 0.5),
D(:size_range => [2000, 2400], :ratio => 0.5),
:proc_payload_groups => [
D(:size_ratio => "1/3", :proc_ratio => 0.5),
D(:size_ratio => "2/3", :proc_ratio => 0.5),
],
),
D(
:name => "Level_1",
:initial_size_range => [6000, 7200],
:evolution => D(
:function => "GrowthFactor",
:params => [1.0414],
),
:proc_payloads => [
D(:size_range => [2000, 2500], :ratio => "1/4"),
D(:size_range => [4000, 4800], :ratio => "3/4"),
:proc_payload_groups => [
D(:size_ratio => "1/3", :proc_ratio => "1/4"),
D(:size_ratio => "2/3", :proc_ratio => "3/4"),
],
),
],
Expand All @@ -61,16 +62,18 @@ function default_config()
:data_streams => [
D(
:name => "Level_0",
:proc_payloads => [
D(:size_range => [1000, 1200], :ratio => 0.1),
D(:size_range => [2000, 2400], :ratio => 0.9),
:initial_size_range => [3000, 3600],
:proc_payload_groups => [
D(:size_ratio => "1/3", :proc_ratio => 0.1),
D(:size_ratio => "2/3", :proc_ratio => 0.9),
],
),
D(
:name => "Level_1",
:proc_payloads => [
D(:size_range => [2000, 2500], :ratio => "1/8"),
D(:size_range => [4000, 4800], :ratio => "7/8"),
:initial_size_range => [6000, 7300],
:proc_payload_groups => [
D(:size_ratio => "1/3", :proc_ratio => "1/8"),
D(:size_ratio => "2/3", :proc_ratio => "7/8"),
],
),
],
Expand Down
11 changes: 8 additions & 3 deletions src/core/Ctrl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ import RIOPA: DataGen, IO
function configure_stream(stream_cfg::Config)
evolve_func = get_evolution_function(get(stream_cfg, :evolution, nothing))
groups = map(
grp_cfg -> PayloadGroup(grp_cfg[:size_range], grp_cfg[:ratio]),
stream_cfg[:proc_payloads],
grp_cfg -> PayloadGroup(grp_cfg[:size_ratio], grp_cfg[:proc_ratio]),
stream_cfg[:proc_payload_groups],
)
return DataStreamConfig(
stream_cfg[:name],
stream_cfg[:initial_size_range],
evolve_func,
groups,
)
return DataStreamConfig(stream_cfg[:name], evolve_func, groups)
end

function configure_dataset(sub_cfg::Config)
Expand Down
112 changes: 88 additions & 24 deletions src/core/DataStream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import MPI
mutable struct PayloadRange
a::Int32
b::Int32
PayloadRange(a::Real, b::Real) = new(floor(a), ceil(b))
end

# PayloadRange(r::NTuple{2,<:Integer}) = PayloadRange(r[1], r[2])

function Base.copy(r::PayloadRange)
return PayloadRange(r.a, r.b)
end
Expand All @@ -20,23 +19,75 @@ function Base.:(==)(r::PayloadRange, t::NTuple{2,<:Integer})
return r.a == t[1] && r.b == t[2]
end

struct PayloadGroup
range::PayloadRange
ratio::Float64
function Base.:(*)(r::PayloadRange, x::Real)
return PayloadRange(r.a * x, r.b * x)
end

PayloadGroup(range::Vector{<:Integer}, ratio::T) where {T} =
PayloadGroup(PayloadRange(range[1], range[2]), get_ratio(ratio))
struct PayloadGroup
size_ratio::Float64
proc_ratio::Float64
PayloadGroup(size_ratio, proc_ratio) =
new(get_ratio(size_ratio), get_ratio(proc_ratio))
end

abstract type EvolutionFunction end

struct DataStreamConfig
name::String
initial_size_range::PayloadRange
evolve::EvolutionFunction
payload_groups::Vector{PayloadGroup}
end

function get_payload_group_id(
DataStreamConfig(
name::String,
range::Vector{<:Integer},
evolve::EvolutionFunction,
groups::Vector{PayloadGroup},
) = DataStreamConfig(name, PayloadRange(range[1], range[2]), evolve, groups)

struct ProcessGroupRatioError <: Exception
msg::String
end

function check_payload_group_ratios(
stream_cfg::DataStreamConfig,
dsname::AbstractString,
get_ratio,
)
sum = 0.0
for grp in stream_cfg.payload_groups
sum += get_ratio(grp)
end
if !isapprox(sum, 1.0)
throw(
ProcessGroupRatioError(
"Sum of ratios ($sum) must equal 1; dataset: " *
dsname *
", stream: " *
stream_cfg.name,
),
)
end
end

function check_payload_group_ratios(
stream_cfg::DataStreamConfig,
dsname::AbstractString,
)
check_payload_group_ratios(
stream_cfg,
dsname,
grp::PayloadGroup -> grp.size_ratio,
)
check_payload_group_ratios(
stream_cfg,
dsname,
grp::PayloadGroup -> grp.proc_ratio,
)
end

function get_payload_group_id_and_size(
rank::Integer,
nranks::Integer,
cfg::DataStreamConfig,
Expand All @@ -45,39 +96,50 @@ function get_payload_group_id(
current = 0.0
for id = 1:length(cfg.payload_groups)
grp = cfg.payload_groups[id]
current += grp.ratio
prev = current
current += grp.proc_ratio
if percentile <= current
return id
size = floor(Int32, current * nranks - floor(prev * nranks))
return id, size
end
end
# TODO: throw an error here ?
end

function get_payload_group_id(cfg::DataStreamConfig)
function get_payload_group_id_and_size(cfg::DataStreamConfig)
comm = MPI.COMM_WORLD
return get_payload_group_id(MPI.Comm_rank(comm), MPI.Comm_size(comm), cfg)
return get_payload_group_id_and_size(
MPI.Comm_rank(comm),
MPI.Comm_size(comm),
cfg,
)
end

abstract type DataObject end

mutable struct DataVector <: DataObject
vec::Vector{Float64}
vec::Vector{Int8}
end

DataVector() = DataVector(Float64[])
DataVector() = DataVector(Int8[])

mutable struct DataStream
initial_range::PayloadRange
size_ratio::Float64
range::PayloadRange
evolve::EvolutionFunction
data::DataObject
end

DataStream(range::PayloadRange, evolve::EvolutionFunction) =
DataStream(range, copy(range), evolve, DataVector())
DataStream(initrange::PayloadRange, ratio::Float64, evolve::EvolutionFunction) =
DataStream(initrange, ratio, initrange * ratio, evolve, DataVector())

DataStream(cfg::DataStreamConfig) =
DataStream(cfg.payload_groups[get_payload_group_id(cfg)].range, cfg.evolve)
function DataStream(cfg::DataStreamConfig)
group_id, group_size = get_payload_group_id_and_size(cfg)
group = cfg.payload_groups[group_id]
size_ratio = group.size_ratio / group_size
return DataStream(cfg.initial_size_range, size_ratio, cfg.evolve)
end

struct GrowthFactorEvFn <: EvolutionFunction
factor::Float64
Expand All @@ -91,8 +153,8 @@ function evolve_payload_range!(
fn::GrowthFactorEvFn,
)
growth = fn.factor^step
stream.range.a = round(stream.initial_range.a * growth)
stream.range.b = round(stream.initial_range.b * growth)
stream.range.a = round(stream.initial_range.a * growth * stream.size_ratio)
stream.range.b = round(stream.initial_range.b * growth * stream.size_ratio)
end

struct PolynomialEvFn <: EvolutionFunction
Expand All @@ -108,8 +170,10 @@ function evolve_payload_range!(
fn::PolynomialEvFn,
)
growth = fn.poly(step)
stream.range.a = stream.initial_range.a + growth
stream.range.b = stream.initial_range.b + growth
stream.range.a =
round((stream.initial_range.a + growth) * stream.size_ratio)
stream.range.b =
round((stream.initial_range.b + growth) * stream.size_ratio)
end

function evolve_payload_range!(stream::DataStream, step::Integer)
Expand All @@ -118,7 +182,7 @@ end

function check_length(expected::Integer, params::Vector{<:Real})
if length(params) != expected
@error "Wrong number of parameters"
error("Wrong number of parameters")
end
return params
end
Expand All @@ -129,7 +193,7 @@ function get_evolution_function(evcfg::Config)
"GrowthFactor" => return GrowthFactorEvFn(check_length(1, params))
"Polynomial" => return PolynomialEvFn(params)
"Linear" => return PolynomialEvFn(check_length(1, params))
_ => @error "Unsupported stream size evolution function"
_ => error("Unsupported stream size evolution function")
end
end

Expand Down
22 changes: 2 additions & 20 deletions src/core/datagen/DataGen.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module DataGen
import RIOPA: TagBase, DataStream, PayloadRange, DataSet
import OrderedCollections: LittleDict
# Functions
import RIOPA: evolve_payload_range!
import RIOPA: check_payload_group_ratios, evolve_payload_range!
# Modules
import MPI, Random

Expand Down Expand Up @@ -32,27 +32,9 @@ struct DefaultDataGenTag <: DataGenTag end

get_tag(::Nothing) = DefaultDataGenTag()

struct ProcessPayloadRatioError <: Exception
msg::String
end

function check_payload_group_ratios(ds::DataSet)
for stream_cfg in ds.cfg.streams
sum = 0.0
for grp in stream_cfg.payload_groups
sum += grp.ratio
end
if !isapprox(sum, 1.0)
throw(
ProcessPayloadRatioError(
"Sum of payload group ratios ($sum) must equal 1" *
"; dataset: " *
ds.cfg.name *
", stream: " *
stream_cfg.name,
),
)
end
check_payload_group_ratios(stream_cfg, ds.cfg.name)
end
end

Expand Down
2 changes: 1 addition & 1 deletion src/hello/hello.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ function hello(config::Config)
"HDF5" => hello_hdf5(data)
"ADIOS2" => hello_adios2(data)
"Julia" => write(basename * "_$worldrank.dat", data)
_ => @error "Unsupported I/O strategy"
_ => error("Unsupported I/O strategy")
end

MPI.Barrier(MPI.COMM_WORLD)
Expand Down
7 changes: 4 additions & 3 deletions test/unit/core/test_Config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import Test: @testset, @test, @test_throws
config = RIOPA.default_config()
dscfg = config[:datasets][1]
@test dscfg[:io_backend] == "HDF5"
grpcfg = dscfg[:data_streams][1][:proc_payloads][2]
@test grpcfg[:size_range] == [2000, 2400]
@test grpcfg[:ratio] == 0.5
@test dscfg[:data_streams][1][:initial_size_range] == [3000, 3600]
grpcfg = dscfg[:data_streams][1][:proc_payload_groups][2]
@test RIOPA.get_ratio(grpcfg[:size_ratio]) == 2.0/3.0
@test grpcfg[:proc_ratio] == 0.5
filename = "testcase-temp-config.yaml"
RIOPA.generate_config(filename)
@test ispath(filename)
Expand Down
27 changes: 15 additions & 12 deletions test/unit/core/test_Ctrl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import Test: @testset, @test, @test_throws
streamcfg = RIOPA.Ctrl.configure_stream(dscfg_raw[:data_streams][1])

@test streamcfg.name == "Level_0"
@test streamcfg.payload_groups[1].range == (1000, 1200)
@test streamcfg.payload_groups[1].ratio == 0.5
@test streamcfg.payload_groups[2].range == (2000, 2400)
@test streamcfg.payload_groups[2].ratio == 0.5
@test streamcfg.initial_size_range == (3000, 3600)
@test streamcfg.payload_groups[1].size_ratio == 1.0/3.0
@test streamcfg.payload_groups[1].proc_ratio == 0.5
@test streamcfg.payload_groups[2].size_ratio == 2.0/3.0
@test streamcfg.payload_groups[2].proc_ratio == 0.5

ds = RIOPA.Ctrl.configure_dataset(dscfg_raw)
@test ds.cfg.name == "data 1"
Expand All @@ -22,16 +23,18 @@ import Test: @testset, @test, @test_throws
@test length(ds.cfg.streams) == 2
streamcfg = ds.cfg.streams[1]
@test streamcfg.name == "Level_0"
@test streamcfg.payload_groups[1].range == (1000, 1200)
@test streamcfg.payload_groups[1].ratio == 0.5
@test streamcfg.payload_groups[2].range == (2000, 2400)
@test streamcfg.payload_groups[2].ratio == 0.5
@test streamcfg.initial_size_range == (3000, 3600)
@test streamcfg.payload_groups[1].size_ratio == 1.0/3.0
@test streamcfg.payload_groups[1].proc_ratio == 0.5
@test streamcfg.payload_groups[2].size_ratio == 2.0/3.0
@test streamcfg.payload_groups[2].proc_ratio == 0.5
streamcfg = ds.cfg.streams[2]
@test streamcfg.name == "Level_1"
@test streamcfg.payload_groups[1].range == (2000, 2500)
@test streamcfg.payload_groups[1].ratio == 1 // 4
@test streamcfg.payload_groups[2].range == (4000, 4800)
@test streamcfg.payload_groups[2].ratio == 3 // 4
@test streamcfg.initial_size_range == (6000, 7200)
@test streamcfg.payload_groups[1].size_ratio == 1.0/3.0
@test streamcfg.payload_groups[1].proc_ratio == 1 // 4
@test streamcfg.payload_groups[2].size_ratio == 2.0/3.0
@test streamcfg.payload_groups[2].proc_ratio == 3 // 4
end

mutable struct TestTag <: RIOPA.TagBase
Expand Down
Loading

0 comments on commit ff32b31

Please sign in to comment.