diff --git a/src/core/Config.jl b/src/core/Config.jl index 529779e..0a2fe4d 100644 --- a/src/core/Config.jl +++ b/src/core/Config.jl @@ -27,25 +27,26 @@ function default_config() :data_streams => [ D( :name => "Level_0", + :initial_size_range => [3000, 3600], :evolution => D( :function => "GrowthFactor", :params => [1.0718], ), - :nprocs_ratio => 0.5, - :proc_payloads => [ - D(:size_range => [1000, 1200], :ratio => 0.5), - D(:size_range => [2000, 2400], :ratio => 0.5), + :proc_payload_groups => [ + D(:size_ratio => "1/3", :proc_ratio => 0.5), + D(:size_ratio => "2/3", :proc_ratio => 0.5), ], ), D( :name => "Level_1", + :initial_size_range => [6000, 7200], :evolution => D( :function => "GrowthFactor", :params => [1.0414], ), - :proc_payloads => [ - D(:size_range => [2000, 2500], :ratio => "1/4"), - D(:size_range => [4000, 4800], :ratio => "3/4"), + :proc_payload_groups => [ + D(:size_ratio => "1/3", :proc_ratio => "1/4"), + D(:size_ratio => "2/3", :proc_ratio => "3/4"), ], ), ], @@ -61,16 +62,18 @@ function default_config() :data_streams => [ D( :name => "Level_0", - :proc_payloads => [ - D(:size_range => [1000, 1200], :ratio => 0.1), - D(:size_range => [2000, 2400], :ratio => 0.9), + :initial_size_range => [3000, 3600], + :proc_payload_groups => [ + D(:size_ratio => "1/3", :proc_ratio => 0.1), + D(:size_ratio => "2/3", :proc_ratio => 0.9), ], ), D( :name => "Level_1", - :proc_payloads => [ - D(:size_range => [2000, 2500], :ratio => "1/8"), - D(:size_range => [4000, 4800], :ratio => "7/8"), + :initial_size_range => [6000, 7300], + :proc_payload_groups => [ + D(:size_ratio => "1/3", :proc_ratio => "1/8"), + D(:size_ratio => "2/3", :proc_ratio => "7/8"), ], ), ], diff --git a/src/core/Ctrl.jl b/src/core/Ctrl.jl index a7df1ef..6f4efd7 100644 --- a/src/core/Ctrl.jl +++ b/src/core/Ctrl.jl @@ -10,10 +10,15 @@ import RIOPA: DataGen, IO function configure_stream(stream_cfg::Config) evolve_func = get_evolution_function(get(stream_cfg, :evolution, nothing)) groups = map( - grp_cfg -> PayloadGroup(grp_cfg[:size_range], grp_cfg[:ratio]), - stream_cfg[:proc_payloads], + grp_cfg -> PayloadGroup(grp_cfg[:size_ratio], grp_cfg[:proc_ratio]), + stream_cfg[:proc_payload_groups], + ) + return DataStreamConfig( + stream_cfg[:name], + stream_cfg[:initial_size_range], + evolve_func, + groups, ) - return DataStreamConfig(stream_cfg[:name], evolve_func, groups) end function configure_dataset(sub_cfg::Config) diff --git a/src/core/DataStream.jl b/src/core/DataStream.jl index 64ec850..8413d34 100644 --- a/src/core/DataStream.jl +++ b/src/core/DataStream.jl @@ -8,10 +8,9 @@ import MPI mutable struct PayloadRange a::Int32 b::Int32 + PayloadRange(a::Real, b::Real) = new(floor(a), ceil(b)) end -# PayloadRange(r::NTuple{2,<:Integer}) = PayloadRange(r[1], r[2]) - function Base.copy(r::PayloadRange) return PayloadRange(r.a, r.b) end @@ -20,23 +19,75 @@ function Base.:(==)(r::PayloadRange, t::NTuple{2,<:Integer}) return r.a == t[1] && r.b == t[2] end -struct PayloadGroup - range::PayloadRange - ratio::Float64 +function Base.:(*)(r::PayloadRange, x::Real) + return PayloadRange(r.a * x, r.b * x) end -PayloadGroup(range::Vector{<:Integer}, ratio::T) where {T} = - PayloadGroup(PayloadRange(range[1], range[2]), get_ratio(ratio)) +struct PayloadGroup + size_ratio::Float64 + proc_ratio::Float64 + PayloadGroup(size_ratio, proc_ratio) = + new(get_ratio(size_ratio), get_ratio(proc_ratio)) +end abstract type EvolutionFunction end struct DataStreamConfig name::String + initial_size_range::PayloadRange evolve::EvolutionFunction payload_groups::Vector{PayloadGroup} end -function get_payload_group_id( +DataStreamConfig( + name::String, + range::Vector{<:Integer}, + evolve::EvolutionFunction, + groups::Vector{PayloadGroup}, +) = DataStreamConfig(name, PayloadRange(range[1], range[2]), evolve, groups) + +struct ProcessGroupRatioError <: Exception + msg::String +end + +function check_payload_group_ratios( + stream_cfg::DataStreamConfig, + dsname::AbstractString, + get_ratio, +) + sum = 0.0 + for grp in stream_cfg.payload_groups + sum += get_ratio(grp) + end + if !isapprox(sum, 1.0) + throw( + ProcessGroupRatioError( + "Sum of ratios ($sum) must equal 1; dataset: " * + dsname * + ", stream: " * + stream_cfg.name, + ), + ) + end +end + +function check_payload_group_ratios( + stream_cfg::DataStreamConfig, + dsname::AbstractString, +) + check_payload_group_ratios( + stream_cfg, + dsname, + grp::PayloadGroup -> grp.size_ratio, + ) + check_payload_group_ratios( + stream_cfg, + dsname, + grp::PayloadGroup -> grp.proc_ratio, + ) +end + +function get_payload_group_id_and_size( rank::Integer, nranks::Integer, cfg::DataStreamConfig, @@ -45,39 +96,50 @@ function get_payload_group_id( current = 0.0 for id = 1:length(cfg.payload_groups) grp = cfg.payload_groups[id] - current += grp.ratio + prev = current + current += grp.proc_ratio if percentile <= current - return id + size = floor(Int32, current * nranks - floor(prev * nranks)) + return id, size end end # TODO: throw an error here ? end -function get_payload_group_id(cfg::DataStreamConfig) +function get_payload_group_id_and_size(cfg::DataStreamConfig) comm = MPI.COMM_WORLD - return get_payload_group_id(MPI.Comm_rank(comm), MPI.Comm_size(comm), cfg) + return get_payload_group_id_and_size( + MPI.Comm_rank(comm), + MPI.Comm_size(comm), + cfg, + ) end abstract type DataObject end mutable struct DataVector <: DataObject - vec::Vector{Float64} + vec::Vector{Int8} end -DataVector() = DataVector(Float64[]) +DataVector() = DataVector(Int8[]) mutable struct DataStream initial_range::PayloadRange + size_ratio::Float64 range::PayloadRange evolve::EvolutionFunction data::DataObject end -DataStream(range::PayloadRange, evolve::EvolutionFunction) = - DataStream(range, copy(range), evolve, DataVector()) +DataStream(initrange::PayloadRange, ratio::Float64, evolve::EvolutionFunction) = + DataStream(initrange, ratio, initrange * ratio, evolve, DataVector()) -DataStream(cfg::DataStreamConfig) = - DataStream(cfg.payload_groups[get_payload_group_id(cfg)].range, cfg.evolve) +function DataStream(cfg::DataStreamConfig) + group_id, group_size = get_payload_group_id_and_size(cfg) + group = cfg.payload_groups[group_id] + size_ratio = group.size_ratio / group_size + return DataStream(cfg.initial_size_range, size_ratio, cfg.evolve) +end struct GrowthFactorEvFn <: EvolutionFunction factor::Float64 @@ -91,8 +153,8 @@ function evolve_payload_range!( fn::GrowthFactorEvFn, ) growth = fn.factor^step - stream.range.a = round(stream.initial_range.a * growth) - stream.range.b = round(stream.initial_range.b * growth) + stream.range.a = round(stream.initial_range.a * growth * stream.size_ratio) + stream.range.b = round(stream.initial_range.b * growth * stream.size_ratio) end struct PolynomialEvFn <: EvolutionFunction @@ -108,8 +170,10 @@ function evolve_payload_range!( fn::PolynomialEvFn, ) growth = fn.poly(step) - stream.range.a = stream.initial_range.a + growth - stream.range.b = stream.initial_range.b + growth + stream.range.a = + round((stream.initial_range.a + growth) * stream.size_ratio) + stream.range.b = + round((stream.initial_range.b + growth) * stream.size_ratio) end function evolve_payload_range!(stream::DataStream, step::Integer) @@ -118,7 +182,7 @@ end function check_length(expected::Integer, params::Vector{<:Real}) if length(params) != expected - @error "Wrong number of parameters" + error("Wrong number of parameters") end return params end @@ -129,7 +193,7 @@ function get_evolution_function(evcfg::Config) "GrowthFactor" => return GrowthFactorEvFn(check_length(1, params)) "Polynomial" => return PolynomialEvFn(params) "Linear" => return PolynomialEvFn(check_length(1, params)) - _ => @error "Unsupported stream size evolution function" + _ => error("Unsupported stream size evolution function") end end diff --git a/src/core/datagen/DataGen.jl b/src/core/datagen/DataGen.jl index 6a1377a..60f91b8 100644 --- a/src/core/datagen/DataGen.jl +++ b/src/core/datagen/DataGen.jl @@ -4,7 +4,7 @@ module DataGen import RIOPA: TagBase, DataStream, PayloadRange, DataSet import OrderedCollections: LittleDict # Functions -import RIOPA: evolve_payload_range! +import RIOPA: check_payload_group_ratios, evolve_payload_range! # Modules import MPI, Random @@ -32,27 +32,9 @@ struct DefaultDataGenTag <: DataGenTag end get_tag(::Nothing) = DefaultDataGenTag() -struct ProcessPayloadRatioError <: Exception - msg::String -end - function check_payload_group_ratios(ds::DataSet) for stream_cfg in ds.cfg.streams - sum = 0.0 - for grp in stream_cfg.payload_groups - sum += grp.ratio - end - if !isapprox(sum, 1.0) - throw( - ProcessPayloadRatioError( - "Sum of payload group ratios ($sum) must equal 1" * - "; dataset: " * - ds.cfg.name * - ", stream: " * - stream_cfg.name, - ), - ) - end + check_payload_group_ratios(stream_cfg, ds.cfg.name) end end diff --git a/src/hello/hello.jl b/src/hello/hello.jl index 4cfddeb..a1eac3d 100644 --- a/src/hello/hello.jl +++ b/src/hello/hello.jl @@ -13,7 +13,7 @@ function hello(config::Config) "HDF5" => hello_hdf5(data) "ADIOS2" => hello_adios2(data) "Julia" => write(basename * "_$worldrank.dat", data) - _ => @error "Unsupported I/O strategy" + _ => error("Unsupported I/O strategy") end MPI.Barrier(MPI.COMM_WORLD) diff --git a/test/unit/core/test_Config.jl b/test/unit/core/test_Config.jl index 6d45207..210d626 100644 --- a/test/unit/core/test_Config.jl +++ b/test/unit/core/test_Config.jl @@ -5,9 +5,10 @@ import Test: @testset, @test, @test_throws config = RIOPA.default_config() dscfg = config[:datasets][1] @test dscfg[:io_backend] == "HDF5" - grpcfg = dscfg[:data_streams][1][:proc_payloads][2] - @test grpcfg[:size_range] == [2000, 2400] - @test grpcfg[:ratio] == 0.5 + @test dscfg[:data_streams][1][:initial_size_range] == [3000, 3600] + grpcfg = dscfg[:data_streams][1][:proc_payload_groups][2] + @test RIOPA.get_ratio(grpcfg[:size_ratio]) == 2.0/3.0 + @test grpcfg[:proc_ratio] == 0.5 filename = "testcase-temp-config.yaml" RIOPA.generate_config(filename) @test ispath(filename) diff --git a/test/unit/core/test_Ctrl.jl b/test/unit/core/test_Ctrl.jl index e0ecad4..e613e73 100644 --- a/test/unit/core/test_Ctrl.jl +++ b/test/unit/core/test_Ctrl.jl @@ -7,10 +7,11 @@ import Test: @testset, @test, @test_throws streamcfg = RIOPA.Ctrl.configure_stream(dscfg_raw[:data_streams][1]) @test streamcfg.name == "Level_0" - @test streamcfg.payload_groups[1].range == (1000, 1200) - @test streamcfg.payload_groups[1].ratio == 0.5 - @test streamcfg.payload_groups[2].range == (2000, 2400) - @test streamcfg.payload_groups[2].ratio == 0.5 + @test streamcfg.initial_size_range == (3000, 3600) + @test streamcfg.payload_groups[1].size_ratio == 1.0/3.0 + @test streamcfg.payload_groups[1].proc_ratio == 0.5 + @test streamcfg.payload_groups[2].size_ratio == 2.0/3.0 + @test streamcfg.payload_groups[2].proc_ratio == 0.5 ds = RIOPA.Ctrl.configure_dataset(dscfg_raw) @test ds.cfg.name == "data 1" @@ -22,16 +23,18 @@ import Test: @testset, @test, @test_throws @test length(ds.cfg.streams) == 2 streamcfg = ds.cfg.streams[1] @test streamcfg.name == "Level_0" - @test streamcfg.payload_groups[1].range == (1000, 1200) - @test streamcfg.payload_groups[1].ratio == 0.5 - @test streamcfg.payload_groups[2].range == (2000, 2400) - @test streamcfg.payload_groups[2].ratio == 0.5 + @test streamcfg.initial_size_range == (3000, 3600) + @test streamcfg.payload_groups[1].size_ratio == 1.0/3.0 + @test streamcfg.payload_groups[1].proc_ratio == 0.5 + @test streamcfg.payload_groups[2].size_ratio == 2.0/3.0 + @test streamcfg.payload_groups[2].proc_ratio == 0.5 streamcfg = ds.cfg.streams[2] @test streamcfg.name == "Level_1" - @test streamcfg.payload_groups[1].range == (2000, 2500) - @test streamcfg.payload_groups[1].ratio == 1 // 4 - @test streamcfg.payload_groups[2].range == (4000, 4800) - @test streamcfg.payload_groups[2].ratio == 3 // 4 + @test streamcfg.initial_size_range == (6000, 7200) + @test streamcfg.payload_groups[1].size_ratio == 1.0/3.0 + @test streamcfg.payload_groups[1].proc_ratio == 1 // 4 + @test streamcfg.payload_groups[2].size_ratio == 2.0/3.0 + @test streamcfg.payload_groups[2].proc_ratio == 3 // 4 end mutable struct TestTag <: RIOPA.TagBase diff --git a/test/unit/core/test_DataGen.jl b/test/unit/core/test_DataGen.jl index 68635e5..e1ceee9 100644 --- a/test/unit/core/test_DataGen.jl +++ b/test/unit/core/test_DataGen.jl @@ -13,23 +13,23 @@ struct TestDataGenTag <: RIOPA.DataGen.DataGenTag end config = RIOPA.default_config() ds = Ctrl.configure_dataset(config[:datasets][1]) str_cfg = ds.cfg.streams[1] - @test RIOPA.get_payload_group_id(0, 4, str_cfg) == 1 - @test RIOPA.get_payload_group_id(1, 4, str_cfg) == 1 - @test RIOPA.get_payload_group_id(2, 4, str_cfg) == 2 - @test RIOPA.get_payload_group_id(3, 4, str_cfg) == 2 + @test RIOPA.get_payload_group_id_and_size(0, 4, str_cfg) == (1, 2) + @test RIOPA.get_payload_group_id_and_size(1, 4, str_cfg) == (1, 2) + @test RIOPA.get_payload_group_id_and_size(2, 4, str_cfg) == (2, 2) + @test RIOPA.get_payload_group_id_and_size(3, 4, str_cfg) == (2, 2) - @test RIOPA.get_payload_group_id(0, 4, ds.cfg.streams[2]) == 1 - @test RIOPA.get_payload_group_id(1, 4, ds.cfg.streams[2]) == 2 - @test RIOPA.get_payload_group_id(2, 4, ds.cfg.streams[2]) == 2 - @test RIOPA.get_payload_group_id(3, 4, ds.cfg.streams[2]) == 2 + @test RIOPA.get_payload_group_id_and_size(0, 4, ds.cfg.streams[2]) == (1, 1) + @test RIOPA.get_payload_group_id_and_size(1, 4, ds.cfg.streams[2]) == (2, 3) + @test RIOPA.get_payload_group_id_and_size(2, 4, ds.cfg.streams[2]) == (2, 3) + @test RIOPA.get_payload_group_id_and_size(3, 4, ds.cfg.streams[2]) == (2, 3) - @test RIOPA.get_payload_group_id(0, 5, ds.cfg.streams[1]) == 1 - @test RIOPA.get_payload_group_id(1, 5, ds.cfg.streams[1]) == 1 - @test RIOPA.get_payload_group_id(2, 5, ds.cfg.streams[1]) == 2 - @test RIOPA.get_payload_group_id(3, 5, ds.cfg.streams[1]) == 2 - @test RIOPA.get_payload_group_id(4, 5, ds.cfg.streams[1]) == 2 + @test RIOPA.get_payload_group_id_and_size(0, 5, ds.cfg.streams[1]) == (1, 2) + @test RIOPA.get_payload_group_id_and_size(1, 5, ds.cfg.streams[1]) == (1, 2) + @test RIOPA.get_payload_group_id_and_size(2, 5, ds.cfg.streams[1]) == (2, 3) + @test RIOPA.get_payload_group_id_and_size(3, 5, ds.cfg.streams[1]) == (2, 3) + @test RIOPA.get_payload_group_id_and_size(4, 5, ds.cfg.streams[1]) == (2, 3) - @test RIOPA.get_payload_group_id(0, 1, str_cfg) == 2 + @test RIOPA.get_payload_group_id_and_size(0, 1, str_cfg) == (2, 1) DataGen.generate!(DataGen.DefaultDataGenTag(), ds) @test 2000 <= length(ds.streams[1].data.vec) <= 2400 @@ -39,8 +39,9 @@ struct TestDataGenTag <: RIOPA.DataGen.DataGenTag end @test 4000 <= length(ds.streams[1].data.vec) <= 4800 @test 6000 <= length(ds.streams[2].data.vec) <= 7200 - config[:datasets][1][:data_streams][1][:proc_payloads][1][:ratio] = 1.0 - @test_throws DataGen.ProcessPayloadRatioError Ctrl.configure_dataset( + config[:datasets][1][:data_streams][1][:proc_payload_groups][1][:proc_ratio] = + 1.0 + @test_throws RIOPA.ProcessGroupRatioError Ctrl.configure_dataset( config[:datasets][1], ) end diff --git a/test/unit/core/test_DataStream.jl b/test/unit/core/test_DataStream.jl index cb79164..db113f7 100644 --- a/test/unit/core/test_DataStream.jl +++ b/test/unit/core/test_DataStream.jl @@ -4,6 +4,7 @@ import Test: @testset, @test, @test_throws @testset "evolution" begin stream = RIOPA.DataStream( RIOPA.PayloadRange(10, 20), + 1.0, RIOPA.GrowthFactorEvFn(1.5), ) RIOPA.evolve_payload_range!(stream, 1) @@ -18,6 +19,7 @@ import Test: @testset, @test, @test_throws stream = RIOPA.DataStream( RIOPA.PayloadRange(10, 20), + 1.0, RIOPA.PolynomialEvFn([0, 1]), ) RIOPA.evolve_payload_range!(stream, 1) diff --git a/test/unit/core/test_IO.jl b/test/unit/core/test_IO.jl index 305d06b..aa8a809 100644 --- a/test/unit/core/test_IO.jl +++ b/test/unit/core/test_IO.jl @@ -11,7 +11,7 @@ struct TestIOTag <: RIOPA.IO.IOTag end end @testset "IO Backends" begin - data = RIOPA.DataVector(rand(Float64, 10)) + data = RIOPA.DataVector(rand(Int8, 80)) @test RIOPA.IO.get_tag("HDF5") == RIOPA.HDF5IOTag() @test RIOPA.IO.get_tag("hdf5") == RIOPA.HDF5IOTag() RIOPA.IO.write_data_object(RIOPA.HDF5IOTag(), pwd(), data)