From 2ec5ce94e8190f90337c76bed74d826c85a58036 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 23 Nov 2015 20:04:25 -0500 Subject: [PATCH] new ValGens + riak_dt+pb driver(s) - Add a fixed size binary set generator (from rdb's bigset work) For testing CRDT sets we often want to generate a set of elements of a certain size. In the past we used int generators, but sets of 32bit elements aren't realistic enough. We want to know how many elements of size X before performance degrades. To that end this commit adds a generator `{fixed_bin_set, ElemSize, Cardinality}` where `ElemSize` is the size of a random binary element and `Cardinality` is the size of the set. It makes use of the existing binary block generation code in valgen. (cherry picked from commit 3471aaff4da29db4486008436d92fbe681ef0b9f) - Add poisson_bin valgen w/ workaround to approx poisson via normal dist when over a threshold size (because it could be too slow!) - Add variable size binary set generator to create bins within (minsize/maxsize, poisson dist, exp dist) params over a specified cardinality - basho_bench driver for riak datatypes over pb, focused on sets right now for current benchmark needs (and comparisons to bigsets) - driver allows for benches over preloaded sets, a single set, and normal-stylings (creating sets via keygen) and contains calls for batch_inserts via rdb's work - new distributions/valgens for bins and gen'ing/using them w/ var_bin_sets - cleaner examples - add maps benchies & examples - add more sets bench operations & examples --- examples/riak_dt_maps.config | 21 + examples/riak_dt_maps_single.config | 18 + examples/riak_dt_sets.config | 19 + examples/riak_dt_sets_aws.config | 21 + examples/riak_dt_sets_batch.config | 21 + examples/riak_dt_sets_big_elements.config | 16 + examples/riak_dt_sets_preload.config | 22 + examples/riak_dt_sets_read.config | 23 + examples/riak_dt_sets_single.config | 20 + examples/riakclient.config | 6 +- 
examples/riakclient_sets.config | 22 + priv/riak_kv_bench_clientsets.patch | 26 + src/basho_bench_driver_riakc_dt_pb.erl | 717 +++++++++++++++++++++ src/basho_bench_driver_riakclient_sets.erl | 374 +++++++++++ src/basho_bench_keygen.erl | 60 +- src/basho_bench_riak_dt_util.erl | 117 ++++ src/basho_bench_valgen.erl | 137 +++- 17 files changed, 1625 insertions(+), 15 deletions(-) create mode 100644 examples/riak_dt_maps.config create mode 100644 examples/riak_dt_maps_single.config create mode 100644 examples/riak_dt_sets.config create mode 100644 examples/riak_dt_sets_aws.config create mode 100644 examples/riak_dt_sets_batch.config create mode 100644 examples/riak_dt_sets_big_elements.config create mode 100644 examples/riak_dt_sets_preload.config create mode 100644 examples/riak_dt_sets_read.config create mode 100644 examples/riak_dt_sets_single.config create mode 100644 examples/riakclient_sets.config create mode 100644 priv/riak_kv_bench_clientsets.patch create mode 100644 src/basho_bench_driver_riakc_dt_pb.erl create mode 100644 src/basho_bench_driver_riakclient_sets.erl create mode 100644 src/basho_bench_riak_dt_util.erl diff --git a/examples/riak_dt_maps.config b/examples/riak_dt_maps.config new file mode 100644 index 000000000..1fe7179a6 --- /dev/null +++ b/examples/riak_dt_maps.config @@ -0,0 +1,21 @@ +{mode, {rate, max}}. +{duration, 5}. +{concurrent, 10}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, maps. +{riakc_dt_bucket, {<<"maps">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. +{value_generator, {var_bin_set, 1000, 100, poisson}}. + +{operations, [{{map, counter, insert_no_ctx}, 20}, {{map, register, modify}, 2}, + {{map, read}, 5}, {{map, map, insert}, 1}, + {{map, flag, modify}, 1}, {{map, multi_ops, insert_no_ctx}, 1}]}. + +{riakc_dt_map_multi_ops, [map_counter, map_register, map_flag, map_set, + map_map]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. 
+ +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_maps_single.config b/examples/riak_dt_maps_single.config new file mode 100644 index 000000000..0eadb7394 --- /dev/null +++ b/examples/riak_dt_maps_single.config @@ -0,0 +1,18 @@ +{mode, {rate, max}}. +{duration, 2}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, maps. +{riakc_dt_bucket, {<<"maps">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. +{value_generator, {fixed_bin, 10000}}. + +{operations, [{{map, map, modify}, 1}]}. + +{riakc_dt_run_one_map, true}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets.config b/examples/riak_dt_sets.config new file mode 100644 index 000000000..10f787b9e --- /dev/null +++ b/examples/riak_dt_sets.config @@ -0,0 +1,19 @@ +{mode, {rate, max}}. +{duration, 2}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. +%% {value_generator, {keygen, {int_to_bin_bigendian, {sequential_int, 10000}}}}. +{value_generator, {var_bin_set, 10, 100, 1000}}. + +%% 50 writes to 1 read +{operations, [{{set, insert}, 50}, {{set, read}, 1}, {{set, is_element}, 1}, + {{set, remove}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets_aws.config b/examples/riak_dt_sets_aws.config new file mode 100644 index 000000000..6ea5511bc --- /dev/null +++ b/examples/riak_dt_sets_aws.config @@ -0,0 +1,21 @@ +{mode, {rate, max}}. +{duration, 1}. +{concurrent, 1}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. 
+ +{key_generator, {int_to_bin_bigendian, {uniform_int, 1000}}}. +{value_generator, {keygen, {int_to_bin_bigendian, {sequential_int, 10000}}}}. + +{operations, [{{set, insert}, 1}]}. + +{riakc_dt_ips, + [{"riak101.aws", 8087}, + {"riak102.aws", 8087}, + {"riak103.aws", 8087}, + {"riak104.aws", 8087}, + {"riak105.aws", 8087}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets_batch.config b/examples/riak_dt_sets_batch.config new file mode 100644 index 000000000..2f0121e8b --- /dev/null +++ b/examples/riak_dt_sets_batch.config @@ -0,0 +1,21 @@ +{mode, {rate, max}}. +{duration, 3}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. + +%% 1000 per batch, means 10 sets w/ 1000 items that are 100 (4 + 96) bytes each +{value_generator, {keygen, {concat_binary, {valgen, {bin_seed, 96}}, + {int_to_bin_bigendian, {sequential_int, 1000000}}}}}. + +{riakc_dt_sets_batchsize, 1000}. + +{operations, [{{set, batch_insert}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets_big_elements.config b/examples/riak_dt_sets_big_elements.config new file mode 100644 index 000000000..f6393d418 --- /dev/null +++ b/examples/riak_dt_sets_big_elements.config @@ -0,0 +1,16 @@ +{mode, {rate, max}}. +{duration, 1}. +{concurrent, 1}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {sequential_int, 100}}}. +{value_generator, {fixed_bin_set, 1000, 200}}. + +{operations, [{{set, insert}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. 
diff --git a/examples/riak_dt_sets_preload.config b/examples/riak_dt_sets_preload.config new file mode 100644 index 000000000..9f4bf719a --- /dev/null +++ b/examples/riak_dt_sets_preload.config @@ -0,0 +1,22 @@ +{mode, {rate, max}}. +{duration, 5}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {uniform_int, 100}}. + +%% 1000 elements, exponential binary +{value_generator, {exponential_bin, 100, 500}}. + +{riakc_dt_preload_sets, true}. +{riakc_dt_preload_sets_num, 10}. +{riakc_dt_max_vals_for_preload, 1000}. + +{operations, [{{set, insert}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets_read.config b/examples/riak_dt_sets_read.config new file mode 100644 index 000000000..76b861f39 --- /dev/null +++ b/examples/riak_dt_sets_read.config @@ -0,0 +1,23 @@ +{mode, {rate, max}}. +{duration, 2}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. +{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. +{value_generator, {keygen, {int_to_bin_bigendian, {sequential_int, 100}}}}. + +%% For preloaded_sets: +%% {riakc_dt_preload_sets, true}. +%% {riakc_dt_preload_sets_num, 10}. + +%% For run once: +%% {riakc_dt_run_one_set, true}. + +{operations, [{{set, read}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riak_dt_sets_single.config b/examples/riak_dt_sets_single.config new file mode 100644 index 000000000..e1d520523 --- /dev/null +++ b/examples/riak_dt_sets_single.config @@ -0,0 +1,20 @@ +{mode, {rate, max}}. +{duration, 3}. +{concurrent, 5}. +{report_interval, 1}. + +%% This bucket type must be created and set to be datatype, sets. 
+{riakc_dt_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {uniform_int, 10}}}. + +%% MinSize=10, MaxSize=100, 1000 items +{value_generator, {var_bin_set, 10, 100, 1000}}. + +{riakc_dt_run_one_set, true}. + +{operations, [{{set, insert_no_ctx}, 1}]}. + +{riakc_dt_ips, [{{127,0,0,1}, [10017, 10027, 10037, 10047, 10057]}]}. + +{driver, basho_bench_driver_riakc_dt_pb}. diff --git a/examples/riakclient.config b/examples/riakclient.config index c97dd9f30..9cf583388 100644 --- a/examples/riakclient.config +++ b/examples/riakclient.config @@ -6,14 +6,14 @@ {driver, basho_bench_driver_riakclient}. -{code_paths, ["/Users/jmeredith/basho/riak/apps/riak_kv", - "/Users/jmeredith/basho/riak/apps/riak_core"]}. +{code_paths, ["/Users/VINCESTAPLES/riak_ee/deps/riak_kv", + "/Users/VINCESTAPLES/riak_ee/deps/riak_core"]}. {key_generator, {int_to_bin_bigendian, {uniform_int, 35000}}}. {value_generator, {fixed_bin, 10000}}. -{riakclient_nodes, ['riak@127.0.0.1']}. +{riakclient_nodes, ['dev1@127.0.0.1']}. {riakclient_mynode, ['riak_bench@127.0.0.1', longnames]}. diff --git a/examples/riakclient_sets.config b/examples/riakclient_sets.config new file mode 100644 index 000000000..178bc24db --- /dev/null +++ b/examples/riakclient_sets.config @@ -0,0 +1,22 @@ +{mode, {rate, max}}. +{duration, 2}. +{concurrent, 5}. +{report_interval, 1}. +{mode, max}. + +{riakclient_bucket, {<<"sets">>, <<"test">>}}. + +{key_generator, {int_to_bin_bigendian, {pareto_int, 100}}}. +{value_generator, {keygen, {int_to_bin_bigendian, {sequential_int, 100000}}}}. + +{code_paths, ["/Users/VINCESTAPLES/riak_ee/deps/riak_kv/ebin", + "/Users/VINCESTAPLES/riak_ee/deps/riak_dt/ebin", + "/Users/VINCESTAPLES/riak_ee/deps/riak_core/ebin"]}. + +{riakclient_mynode, ['dev1_bench@127.0.0.1', longnames]}. +{riakclient_nodes, ['dev1@127.0.0.1', 'dev2@127.0.0.1', 'dev3@127.0.0.1', + 'dev4@127.0.0.1', 'dev5@127.0.0.1']}. + +{operations, [{insert, 10}, {read, 3}, {remove_last_read, 1}]}. 
+ +{driver, basho_bench_driver_riakclient_sets}. diff --git a/priv/riak_kv_bench_clientsets.patch b/priv/riak_kv_bench_clientsets.patch new file mode 100644 index 000000000..a9772e3a5 --- /dev/null +++ b/priv/riak_kv_bench_clientsets.patch @@ -0,0 +1,26 @@ +diff --git a/src/riak_kv_crdt.erl b/src/riak_kv_crdt.erl +index 01ac9a0..769e799 100644 +--- a/src/riak_kv_crdt.erl ++++ b/src/riak_kv_crdt.erl +@@ -29,7 +29,8 @@ + %% MR helper funs + -export([value/1, counter_value/1, set_value/1, map_value/1]). + %% Other helper funs +--export([is_crdt/1]). ++-export([is_crdt/1, operation/3]). ++-compile([export_all]). + + -include("riak_kv_wm_raw.hrl"). + -include("riak_object.hrl"). +@@ -510,6 +511,11 @@ get_context(Type, Value) -> + false -> <<>> + end. + ++%% @TODO remove, added for hack up benchmarking sets ++operation(Mod, Op, Ctx) -> ++ #crdt_op{mod=Mod, op=Op, ctx=Ctx}. ++ ++ + %% =================================================================== + %% EUnit tests + %% =================================================================== diff --git a/src/basho_bench_driver_riakc_dt_pb.erl b/src/basho_bench_driver_riakc_dt_pb.erl new file mode 100644 index 000000000..b04a7fd1b --- /dev/null +++ b/src/basho_bench_driver_riakc_dt_pb.erl @@ -0,0 +1,717 @@ +% ------------------------------------------------------------------- +%% +%% basho_bench_driver_riakc_dt_pb: Driver for riak protocol buffers client wrt +%% to riak datatypes, specifically sets & maps +%% +%% Copyright (c) 2016 Basho Techonologies +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(basho_bench_driver_riakc_dt_pb). + +-export([new/1, + run/4]). + +%% Special exports for fun apply for multi-map ops +-export([map_counter/2, + map_flag/2, + map_register/2, + map_set/2, + map_map/2]). + +-include("basho_bench.hrl"). + +-record(state, { pid, + bucket, + last_key=undefined, + batch_size, + set_val_gen_name=undefined, + preload, + preloaded_sets, + preloaded_sets_num, + var_bin_size, + last_preload_nth, + max_vals_for_preload, + run_one_set, + run_one_map, + map_multi_ops + }). + +-define(DEFAULT_SET_KEY, <<"bench_set">>). +-define(DEFAULT_MAP_KEY, <<"bench_map">>). +-define(DEFAULT_MAP_OPS, [map_counter, map_register, map_flag, map_set, + map_map]). 
+ +new(Id) -> + case code:which(riakc_pb_socket) of + non_existing -> + ?FAIL_MSG("~s requires riakc_pb_socket module to be available" ++ + "on code path.\n", [?MODULE]); + _ -> + ok + end, + + Ips = basho_bench_config:get(riakc_dt_ips, [{127,0,0,1}]), + Port = basho_bench_config:get(riakc_dt_port, 8087), + Bucket = basho_bench_config:get(riakc_dt_bucket, {<<"riak_dt">>, + <<"test">>}), + BatchSize = basho_bench_config:get(riakc_dt_sets_batchsize, 100), + SetValGenName = basho_bench_config:get(riakc_dt_preload_valgen_name, + undefined), + Preload = basho_bench_config:get(riakc_dt_preload_sets, false), + PreloadNum = basho_bench_config:get(riakc_dt_preload_sets_num, 10), + MaxValsForPreloadSet = basho_bench_config:get( + riakc_dt_max_vals_for_preload, 100), + RunOneSet = basho_bench_config:get(riakc_dt_run_one_set, false), + RunOneMap = basho_bench_config:get(riakc_dt_run_one_map, false), + + MapMultiOpsL = basho_bench_config:get(riakc_dt_map_multi_ops, []), + MapMultiOps = case MapMultiOpsL of + [] -> []; + _ -> filter_map_multi_ops(MapMultiOpsL, ?DEFAULT_MAP_OPS) + + end, + + ValueGenTup = basho_bench_config:get(value_generator), + StartBinSize = basho_bench_riak_dt_util:set_start_bin_size(ValueGenTup), + + PreloadedSets = if Preload -> + [begin X1 = integer_to_binary(X), + Y = <<"set">>, + <> + end || X <- lists:seq(1, PreloadNum)]; + true -> [] + end, + + %% Choose the target node using our ID as a modulus + Targets = basho_bench_config:normalize_ips(Ips, Port), + lager:info("Ips: ~p Targets: ~p\n", [Ips, Targets]), + {TargetIp, TargetPort} = lists:nth((Id rem length(Targets)+1), Targets), + ?INFO("Using target ~p:~p for worker ~p\n", [TargetIp, TargetPort, Id]), + case riakc_pb_socket:start_link( + TargetIp, TargetPort, get_connect_options()) of + {ok, Pid} -> + case {Preload, RunOneSet, RunOneMap} of + {true, _, _} -> + preload_sets(Pid, PreloadedSets, Bucket, + StartBinSize); + {_, true, _} -> + run_one_set(Pid, Bucket, StartBinSize); + {_, _, true} -> + 
run_one_map(Pid, Bucket); + {_, _, _} -> + lager:info("No special pre-operations specified.") + end, + {ok, #state{ pid = Pid, + bucket = Bucket, + batch_size = BatchSize, + set_val_gen_name=SetValGenName, + preload = Preload, + preloaded_sets = PreloadedSets, + preloaded_sets_num = PreloadNum, + last_preload_nth = 0, + max_vals_for_preload = MaxValsForPreloadSet, + run_one_set = RunOneSet, + run_one_map = RunOneMap, + map_multi_ops = MapMultiOps + }}; + {error, Reason} -> + ?FAIL_MSG("Failed to connect riakc_pb_socket to ~p:~p: ~p\n", + [TargetIp, TargetPort, Reason]) + end. + +%%%=================================================================== +%%% Sets +%%%===================================================================\ + +run({set, insert_no_ctx}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_set=true}=State) -> + Val = ValueGen(), + Set0 = riakc_set:new(), + Set1 = riakc_set:add_element(Val, Set0), + update_type(Pid, Bucket, ?DEFAULT_SET_KEY, {set, Set1}, State); +run({set, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({set, insert}, KeyGen, ValueGen, State); + +run({set, insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_set=true}=State) -> + case riakc_pb_socket:fetch_type(Pid, Bucket, ?DEFAULT_SET_KEY) of + {ok, Set0} -> + Val = ValueGen(), + Set1 = riakc_set:add_element(Val, Set0), + update_type(Pid, Bucket, ?DEFAULT_SET_KEY, {set, Set1}, State); + {error, {notfound, _}} -> + {ok, State}; + {error, FetchReason} -> + {error, FetchReason, State} + end; +run({set, insert}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, preload=true, preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum, last_preload_nth=LastNth, + max_vals_for_preload=MaxValsForPreloadSet}=State) -> + NextNth = case LastNth >= PreloadedSetsNum of + true -> 1; + false -> LastNth + 1 + end, + SetKey = lists:nth(NextNth, PreloadedSets), + case riakc_pb_socket:fetch_type(Pid, Bucket, SetKey) of + {ok, Set0} -> + Val = ValueGen(), + 
SetSize = riakc_set:size(Set0), + if SetSize < MaxValsForPreloadSet -> + Set1 = riakc_set:add_element(Val, Set0), + update_type(Pid, Bucket, SetKey, {set, Set1}, State, + State#state{last_preload_nth=NextNth}); + true -> {ok, State#state{last_preload_nth=NextNth}} + end; + {error, {notfound, _}} -> + {ok, State}; + {error, FetchReason} -> + {error, {FetchReason, SetKey}, State} + end; +run({set, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + SetKey = KeyGen(), + Val = ValueGen(), + Set0 = riakc_set:new(), + Set1 = riakc_set:add_element(Val, Set0), + update_type(Pid, Bucket, SetKey, {set, Set1}, State); + +run({set, modify}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + SetKey = KeyGen(), + Val = ValueGen(), + SetFun = fun(S) -> riakc_set:add_element(Val, S) end, + modify_type(Pid, Bucket, SetKey, SetFun, State); + +%% Note: Make sure to not use sequential for keys when run, unless +%% supplying a specific SetValGenName. +run({set, batch_insert}, KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, last_key=LastKey, batch_size=BatchSize, + set_val_gen_name=SetValGenName}=State) -> + {SetKey, Members} = basho_bench_riak_dt_util:gen_set_batch(KeyGen, + ValueGen, + LastKey, + BatchSize, + SetValGenName), + Set0 = riakc_set:new(), + SetLast = lists:foldl(fun(Elem, Sets) -> + Sets ++ [riakc_set:add_element(Elem, Set0)] + end, [], Members), + State2 = State#state{last_key=SetKey}, + update_type(Pid, Bucket, SetKey, {set, lists:last(SetLast)}, State2); + +run({set, read}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_set=true}=State) -> + fetch_action(Pid, Bucket, ?DEFAULT_SET_KEY, ValueGen, State, read); +run({set, read}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, preload=true, preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum}=State) -> + SetKey = lists:nth(random:uniform(PreloadedSetsNum), PreloadedSets), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, read); +run({set, read}, KeyGen, 
ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + SetKey = KeyGen(), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, read); + +run({set, remove}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_set=true}=State) -> + fetch_action(Pid, Bucket, ?DEFAULT_SET_KEY, ValueGen, State, remove_element); +run({set, remove}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, preload=true, preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum}=State) -> + SetKey = lists:nth(random:uniform(PreloadedSetsNum), PreloadedSets), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, remove_element); +run({set, remove}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + SetKey = KeyGen(), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, remove_element); + +run({set, is_element}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_set=true}=State) -> + fetch_action(Pid, Bucket, ?DEFAULT_SET_KEY, ValueGen, State, is_element); +run({set, is_element}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, preload=true, preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum}=State) -> + SetKey = lists:nth(random:uniform(PreloadedSetsNum), PreloadedSets), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, is_element); +run({set, is_element}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + SetKey = KeyGen(), + fetch_action(Pid, Bucket, SetKey, ValueGen, State, is_element); + +%%%=================================================================== +%%% Maps +%%%=================================================================== + +run({map, set, insert_no_ctx}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + Map = riakc_map:new(), + MapWSet = map_set(Map, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWSet}, State); + +run({map, set, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, set, insert}, KeyGen, ValueGen, State); + +run({map, set, 
insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWSet = map_set(FetchedMap, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWSet}, State); + _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, set, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + Map = riakc_map:new(), + MapWSet = map_set(Map, Val), + update_type(Pid, Bucket, MapKey, {map, MapWSet}, State); + +run({map, set, modify}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + MapFun = fun(M) -> map_set(M, Val) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, set, remove}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWSet = map_set_remove(FetchedMap, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWSet}, State); + _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, set, remove}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + MapFun = fun(M) -> map_set_remove(M, Val) end, + modify_type(Pid, Bucket, MapKey, MapFun, State, [{create, false}]); + +run({map, register, insert_no_ctx}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + Map = riakc_map:new(), + MapWReg = map_register(Map, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWReg}, State); + +run({map, register, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, register, insert}, KeyGen, ValueGen, State); + +run({map, register, insert}, 
_KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWReg = map_register(FetchedMap, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWReg}, State); + _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, register, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + Map = riakc_map:new(), + MapWReg = map_register(Map, Val), + update_type(Pid, Bucket, MapKey, {map, MapWReg}, State); + +run({map, register, modify}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + MapFun = fun(M) -> map_register(M, Val) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, flag, insert_no_ctx}, _KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Map = riakc_map:new(), + MapWFlag = map_flag(Map), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWFlag}, State); + +run({map, flag, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, flag, insert}, KeyGen, ValueGen, State); + +run({map, flag, insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWFlag = map_flag(FetchedMap), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWFlag}, State); + _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, flag, insert}, KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Map = riakc_map:new(), + MapWFlag = map_flag(Map), + update_type(Pid, Bucket, MapKey, {map, MapWFlag}, State); + +run({map, flag, modify}, KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + 
MapFun = fun(M) -> map_flag(M) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, counter, insert_no_ctx}, _KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Map = riakc_map:new(), + MapWCnt = map_counter(Map), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWCnt}, State); + +run({map, counter, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, counter, insert}, KeyGen, ValueGen, State); + +run({map, counter, insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWCnt = map_counter(FetchedMap), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWCnt}, State); + _ -> + FetchedMap %% return ok or error state from fetch call + end; + +run({map, counter, insert}, KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Map = riakc_map:new(), + MapWCnt = map_counter(Map), + update_type(Pid, Bucket, MapKey, {map, MapWCnt}, State); + +run({map, counter, modify}, KeyGen, _ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + MapFun = fun(M) -> map_counter(M) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, map, insert_no_ctx}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + Map = riakc_map:new(), + MapWMap = map_map(Map, Val), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWMap}, State); + +run({map, map, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, map, insert}, KeyGen, ValueGen, State); + +run({map, map, insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + Val = ValueGen(), + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWMap = map_map(FetchedMap, Val), + update_type(Pid, Bucket, 
?DEFAULT_MAP_KEY, {map, MapWMap}, State); + _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, map, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + Map = riakc_map:new(), + MapWMap = map_map(Map, Val), + update_type(Pid, Bucket, MapKey, {map, MapWMap}, State); + +run({map, map, modify}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + MapFun = fun(M) -> map_map(M, Val) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, multi_ops, insert_no_ctx}, KeyGen, ValueGen, #state{map_multi_ops=[]}=State) -> + run({map, multi_ops, insert}, KeyGen, ValueGen, State); + +run({map, multi_ops, insert_no_ctx}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, run_one_map=true, map_multi_ops=Ops}=State) -> + Val = ValueGen(), + Map = riakc_map:new(), + MapWOps = map_multi(Map, Val, Ops), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWOps}, State); + +run({map, multi_ops, insert_no_ctx}, KeyGen, ValueGen, State) -> + run({map, multi_ops, insert}, KeyGen, ValueGen, State); + +run({map, multi_ops, insert}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + map_multi_ops=[], + run_one_map=true}=State) -> + %% If No Ops, treat as read + fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, read); + +run({map, multi_ops, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + map_multi_ops=[]}=State) -> + %% If No Ops, treat as read + MapKey = KeyGen(), + fetch_action(Pid, Bucket, MapKey, ValueGen, State, read); + +run({map, multi_ops, insert}, _KeyGen, ValueGen, + #state{pid=Pid, bucket=Bucket, + map_multi_ops=Ops, + run_one_map=true}=State) -> + Val = ValueGen(), + FetchedMap = fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, + return_dt), + case element(1, FetchedMap) of + map -> + MapWOps = map_multi(FetchedMap, Val, Ops), + update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, MapWOps}, State); 
+ _ -> FetchedMap %% return ok or error state from fetch call + end; + +run({map, multi_ops, insert}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + map_multi_ops=Ops}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + Map = riakc_map:new(), + MapWOps = map_multi(Map, Val, Ops), + update_type(Pid, Bucket, MapKey, {map, MapWOps}, State); + +run({map, multi_ops, modify}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + map_multi_ops=Ops}=State) -> + MapKey = KeyGen(), + Val = ValueGen(), + MapFun = fun(M) -> map_multi(M, Val, Ops) end, + modify_type(Pid, Bucket, MapKey, MapFun, State); + +run({map, read}, _KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket, + run_one_map=true}=State) -> + fetch_action(Pid, Bucket, ?DEFAULT_MAP_KEY, ValueGen, State, read); + +run({map, read}, KeyGen, ValueGen, #state{pid=Pid, bucket=Bucket}=State) -> + MapKey = KeyGen(), + fetch_action(Pid, Bucket, MapKey, ValueGen, State, read). + + +%%%=================================================================== +%%% Private +%%%=================================================================== + +get_connect_options() -> + basho_bench_config:get(pb_connect_options, [{auto_reconnect, true}]). + +%% @private preload and update riak with an N-number of set keys, +%% named in range <<"1..Nset">>. +preload_sets(Pid, PreloadKeys, Bucket, StartBinSize) -> + [begin + case riakc_pb_socket:fetch_type(Pid, Bucket, SetKey) of + {error, _} -> + Set0 = riakc_set:new(), + Set1 = riakc_set:add_element(crypto:rand_bytes(StartBinSize), + Set0), + case update_type(Pid, Bucket, SetKey, {set, Set1}) of + {ok, _} -> + ?INFO("~p created", [SetKey]), + SetKey; + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [SetKey, Reason]), + error + end; + _ -> + ?INFO("~p Already Loaded", [SetKey]), + SetKey + end + end || SetKey <- PreloadKeys]. + +%% @private check by fetching single-set run key before creating a new one. 
+run_one_set(Pid, Bucket, StartBinSize) -> + case riakc_pb_socket:fetch_type(Pid, Bucket, ?DEFAULT_SET_KEY) of + {error, _} -> + Set0 = riakc_set:new(), + + %% can't be unmodified + Set1 = riakc_set:add_element(crypto:rand_bytes(StartBinSize), Set0), + case update_type(Pid, Bucket, ?DEFAULT_SET_KEY, {set, Set1}) of + {ok, _} -> + ?INFO("~p created", [?DEFAULT_SET_KEY]); + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [?DEFAULT_SET_KEY, Reason]) + end; + _ -> + ?INFO("~p Already Loaded", [?DEFAULT_SET_KEY]) + end. + +%% @private check by fetching single-map run key before creating a new one. +run_one_map(Pid, Bucket) -> + case riakc_pb_socket:fetch_type(Pid, Bucket, ?DEFAULT_MAP_KEY) of + {error, _} -> + Map0 = riakc_map:new(), + + %% can't be unmodified + Map1 = map_counter(Map0), + case update_type(Pid, Bucket, ?DEFAULT_MAP_KEY, {map, Map1}) of + {ok, _} -> + ?INFO("~p created", [?DEFAULT_MAP_KEY]); + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [?DEFAULT_MAP_KEY, Reason]) + end; + _ -> + ?INFO("~p Already Loaded", [?DEFAULT_MAP_KEY]) + end. + +%% @private fetch helper for read, remove, is_element runs +fetch_action(Pid, Bucket, Key, ValueGen, State, Action) -> + case riakc_pb_socket:fetch_type(Pid, Bucket, Key) of + {ok, DT0} -> + %% If wanting to check if elem is a member of the set first + %% - M = riakc_map:value(M0), + %% - Members = proplists:get_value({<<"members">>, set}, M, []), + %% - case {Action, length(Members) > 0} of ... + case Action of + remove_element -> + Val = ValueGen(), + DT1 = riakc_set:del_element(Val, DT0), + update_type(Pid, Bucket, Key, {set, DT1}, State); + is_element -> + Val = ValueGen(), + riakc_set:is_element(Val, DT0), + {ok, State}; + return_dt -> + DT0; + _ -> + {ok, State} + end; + {error, {notfound, _}} -> + {ok, State}; + {error, Reason} -> + {error, {Reason, Key}, State} + end. 
+ +%% @private riac_pb_socket:modify_type wrapper +modify_type(Pid, Bucket, _Key, Fun, #state{run_one_set=true}=State) -> + modify_type(Pid, Bucket, ?DEFAULT_SET_KEY, Fun, State, State); +modify_type(Pid, Bucket, _Key, Fun, #state{run_one_map=true}=State) -> + modify_type(Pid, Bucket, ?DEFAULT_MAP_KEY, Fun, State, State); +modify_type(Pid, Bucket, Key, Fun, State) -> + modify_type(Pid, Bucket, Key, Fun, State, State). + +modify_type(Pid, Bucket, Key, Fun, State, Options) when is_list(Options) -> + modify_type(Pid, Bucket, Key, Fun, State, State, Options); +modify_type(Pid, Bucket, Key, Fun, State, State) -> + modify_type(Pid, Bucket, Key, Fun, State, State, [{create, true}]). + +modify_type(Pid, Bucket, Key, Fun, StateOnError, StateOnUpdate, Options) -> + case riakc_pb_socket:modify_type(Pid, Fun, Bucket, Key, Options) of + ok -> + {ok, StateOnUpdate}; + {ok, _} -> + {ok, StateOnUpdate}; + {ok, _, _} -> + {ok, StateOnUpdate}; + {error, {notfound, _}} -> + {ok, StateOnError}; + {error, Reason} -> + {error, {Reason, Key, modify_type}, StateOnError} + end. + +%% @private riac_pb_socket:update_type wrapper +update_type(Pid, Bucket, Key, ToOp) -> + update_type(Pid, Bucket, Key, ToOp, no_state). +update_type(Pid, Bucket, Key, ToOp, State) -> + update_type(Pid, Bucket, Key, ToOp, State, State). +update_type(Pid, Bucket, Key, ToOp, StateOnError, StateOnUpdate) -> + case riakc_pb_socket:update_type(Pid, Bucket, Key, to_op(ToOp)) of + ok -> + {ok, StateOnUpdate}; + {ok, _} -> + {ok, StateOnUpdate}; + {ok, _, _} -> + {ok, StateOnUpdate}; + {error, Reason} -> + {error, {Reason, Key, update_type}, StateOnError} + end. + +%% @private Case-specific for Op +to_op({set, ToOp}) -> + riakc_set:to_op(ToOp); +to_op({map, ToOp}) -> + riakc_map:to_op(ToOp). + +%% @private Helpers for map operations + +map_counter(Map, _) -> + map_counter(Map). +map_counter(Map) -> + riakc_map:update( + {<<"69">>, counter}, + fun(C) -> + riakc_counter:increment(C) + end, Map). 
+ +map_flag(Map, _) -> + map_flag(Map). +map_flag(Map) -> + riakc_map:update( + {<<"familymattersandfullhouses">>, flag}, + fun(F) -> + riakc_flag:enable(F) + end, Map). + +map_register(Map, Val) -> + riakc_map:update( + {<<"soup">>, register}, + fun(R) -> + riakc_register:set(Val, R) + end, Map). + +map_set(Map, Val) -> + riakc_map:update( + {<<"justbirdtome">>, set}, + fun(S) -> + riakc_set:add_element(Val, S) + end, Map). + +map_set_remove(Map, Val) -> + riakc_map:update( + {<<"justbirdtome">>, set}, + fun(S) -> + riakc_set:del_element(Val, S) + end, Map). + +map_map(Map, Val) -> + riakc_map:update( + {<<"bad hobbits die hard">>, map}, + %% start by adding a register to the nested map + fun(M) -> + map_register(M, Val) + end, Map). + +%% @doc Update top-level map w/ through-a/with-a list of operations before +%% sending the final map (of updates) to the server +map_multi(Map, Val, Ops) -> + lists:foldl(fun(F, AccM) -> + apply(basho_bench_driver_riakc_dt_pb, F, [AccM, Val]) + end, Map, Ops). + +filter_map_multi_ops(Ops, PossibleOps) -> + OpsS = sets:from_list(lists:filter(fun(Elem) -> lists:member(Elem, PossibleOps) + end, Ops)), + case length(Ops) == sets:size(OpsS) of + true -> + lager:info("Using all map operations ~p in a single update", [Ops]), + Ops; + false -> + UList = sets:to_list(OpsS), + lager:info("Using only unique/working map operations ~p in a single update", + [UList]), + UList + end. diff --git a/src/basho_bench_driver_riakclient_sets.erl b/src/basho_bench_driver_riakclient_sets.erl new file mode 100644 index 000000000..866c2d089 --- /dev/null +++ b/src/basho_bench_driver_riakclient_sets.erl @@ -0,0 +1,374 @@ +%% ------------------------------------------------------------------- +%% +%% basho_bench: Benchmarking suite for riak datatype sets using the +%% localclient. 
+%% +%% Copyright (c) 2009-2015 Basho Technologies +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +%% @doc copy of the basho_bench_driver_riakclient. Uses the internal +%% node client for dt sets (for comparison to basho_bench_driver_bigset) +%% @end + +-module(basho_bench_driver_riakclient_sets). + +-export([new/1, + run/4]). + +-include("basho_bench.hrl"). + +-define(DEFAULT_SET_KEY, <<"bench_set">>). +-define(SET_TYPE, riak_dt_orswot). + +-record(state, { client, + bucket, + last_key=undefined, + set_val_gen_name = undefined, + remove_set, %% The set name to perform a remove on + remove_ctx, %% The context of a get from `remove_set' + remove_value, %% a value from a get to `remove_set' + batch_size, %% in batch inserts, how many at once + preload, + preloaded_sets, + preloaded_sets_num, + var_bin_size, + last_preload_nth, + max_vals_for_preload, + run_one_set + })
+ +%% ==================================================================== +%% API +%% ==================================================================== + +new(Id) -> + %% Make sure the path is setup such that we can get at riak_client + case code:which(riak_client) of + non_existing -> + ?FAIL_MSG("~s requires riak_client module to be available on" ++ + " code path.\n", [?MODULE]); + _ -> + ok + end, + + Nodes = basho_bench_config:get(riakclient_nodes), + Cookie = basho_bench_config:get(riak_cookie, 'riak'), + MyNode = basho_bench_config:get(riakclient_mynode, [basho_bench, longnames]), + Bucket = basho_bench_config:get(riakclient_bucket, {<<"sets">>, <<"test">>}), + BatchSize = basho_bench_config:get(riakclient_sets_batchsize, 1000), + SetValGenName = basho_bench_config:get(riakclient_sets_valgen_name, undefined), + Preload = basho_bench_config:get(riakclient_preload_sets, false), + PreloadNum = basho_bench_config:get(riakclient_preload_sets_num, 10), + MaxValsForPreloadSet = basho_bench_config:get( + riakclient_max_vals_for_preload, 100), + RunOneSet = basho_bench_config:get(riakclient_run_one_set, false), + + ValueGenTup = basho_bench_config:get(value_generator), + StartBinSize = basho_bench_riak_dt_util:set_start_bin_size(ValueGenTup), + + PreloadedSets = if Preload -> + [begin X1 = integer_to_binary(X), + Y = <<"set">>, + <<X1/binary, Y/binary>> + end || X <- lists:seq(1, PreloadNum)]; + true -> [] + end, + + %% Try to spin up net_kernel + case net_kernel:start(MyNode) of + {ok, _} -> + ?INFO("Net kernel started as ~p\n", [node()]); + {error, {already_started, _}} -> + ok; + {error, Reason} -> + ?FAIL_MSG("Failed to start net_kernel for ~p: ~p\n", + [?MODULE, Reason]) + end, + + %% Initialize cookie for each of the nodes + [true = erlang:set_cookie(N, Cookie) || N <- Nodes], + + %% Try to ping each of the nodes + ping_each(Nodes), + + %% Choose the node using our ID as a modulus + TargetNode = lists:nth((Id rem length(Nodes)+1), Nodes), + ?INFO("Using target node ~p for worker
~p\n", [TargetNode, Id]), + + case riak:client_connect(TargetNode) of + {error, Reason2} -> + ?FAIL_MSG("Failed get a riak_client to ~p: ~p\n", + [TargetNode, Reason2]); + {ok, Client} -> + case {Preload, RunOneSet} of + {true, _} -> + preload_sets(Client, PreloadedSets, Bucket, + StartBinSize); + {_, true} -> + run_one_set(Client, Bucket, StartBinSize); + {_, _} -> + lager:info("No special pre-operations specified.") + end, + {ok, #state {client=Client, + bucket=Bucket, + batch_size=BatchSize, + set_val_gen_name=SetValGenName, + preload = Preload, + preloaded_sets = PreloadedSets, + preloaded_sets_num = PreloadNum, + last_preload_nth = 0, + max_vals_for_preload = MaxValsForPreloadSet, + run_one_set = RunOneSet + }} + end. + +run(read, _KeyGen, _ValueGen, #state{run_one_set=true}=State) -> + read_set(?DEFAULT_SET_KEY, State); +run(read, _KeyGen, _ValueGen, + #state{preload=true, + preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum}=State) -> + Key = lists:nth(random:uniform(PreloadedSetsNum), PreloadedSets), + read_set(Key, State); +run(read, KeyGen, _ValueGen, State) -> + Key = KeyGen(), + read_set(Key, State); + +run(insert_no_ctx, _KeyGen, ValueGen, #state{run_one_set=true}=State) -> + Member = ValueGen(), + add_element(?DEFAULT_SET_KEY, Member, State); +run(insert_no_ctx, KeyGen, ValueGen, State) -> + run(insert, KeyGen, ValueGen, State); + +run(insert, _KeyGen, ValueGen, #state{run_one_set=true, + client=C, bucket=B}=State) -> + case C:get(B, ?DEFAULT_SET_KEY, []) of + {ok, Res} -> + Member = ValueGen(), + {{Ctx, _Values}, _Stats} = riak_kv_crdt:value(Res, ?SET_TYPE), + add_element(?DEFAULT_SET_KEY, Member, Ctx, State); + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end; +run(insert, _KeyGen, ValueGen, + #state{client=C, bucket=B, preload=true, + preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum, + last_preload_nth=LastNth, + max_vals_for_preload=MaxValsForPreloadSet}=State) -> + NextNth 
= case LastNth >= PreloadedSetsNum of + true -> 1; + false -> LastNth + 1 + end, + Key = lists:nth(NextNth, PreloadedSets), + case C:get(B, Key, []) of + {ok, Res} -> + Member = ValueGen(), + {{Ctx, Values}, _Stats} = riak_kv_crdt:value(Res, ?SET_TYPE), + SetSize = length(Values), + if SetSize < MaxValsForPreloadSet -> + add_element(Key, Member, Ctx, State); + true -> {ok, State#state{last_preload_nth=NextNth}} + end; + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, {Reason, Key}, State} + end; +run(insert, KeyGen, ValueGen, State) -> + Member = ValueGen(), + Set = KeyGen(), + add_element(Set, Member, State); + +run(insert_pl, KeyGen, ValueGen, State) -> + #state{last_key=LastKey0} = State, + {Member, LastKey} = try + {ValueGen(), LastKey0} + catch + throw:{stop, empty_keygen} -> + ?DEBUG("Empty keygen, reset~n", []), + basho_bench_keygen:reset_sequential_int_state(), + {ValueGen(), undefined} + end, + + Set = case LastKey of + undefined -> + Key = KeyGen(), + ?DEBUG("New set ~p~n", [Key]), + Key; + Bin when is_binary(Bin) -> + Bin + end, + State2 = State#state{last_key=Set}, + add_element(Set, Member, State2); + +run(batch_insert, KeyGen, ValueGen, State) -> + #state{client=C, bucket=B, batch_size=BatchSize, + last_key=LastKey, + set_val_gen_name=SVGN} = State, + {Set, Members} = basho_bench_riak_dt_util:gen_set_batch(KeyGen, + ValueGen, + LastKey, + BatchSize, + SVGN), + State2 = State#state{last_key=Set}, + + O = riak_kv_crdt:new(B, Set, ?SET_TYPE), + Opp = riak_kv_crdt:operation(?SET_TYPE, {add_all, Members}, undefined), + Options1 = [{crdt_op, Opp}], + case C:put(O, Options1) of + ok -> + {ok, State2}; + {error, Reason} -> + {error, Reason, State2} + end; + +run(remove, _KeyGen, ValueGen, #state{run_one_set=true}=State) -> + Member= ValueGen(), + remove_element(?DEFAULT_SET_KEY, Member, State); +run(remove, _KeyGen, ValueGen, + #state{preload=true, + preloaded_sets=PreloadedSets, + preloaded_sets_num=PreloadedSetsNum}=State) -> + Key = 
lists:nth(random:uniform(PreloadedSetsNum), PreloadedSets), + Member = ValueGen(), + remove_element(Key, Member, State); +run(remove, KeyGen, ValueGen, State) -> + Key = KeyGen(), + Member = ValueGen(), + remove_element(Key, Member, State); + +run(remove_last_read, KeyGen, ValueGen, #state{remove_set=undefined}=State) -> + Key = KeyGen(), + Member = ValueGen(), + remove_element(Key, ValueGen, State); +run(remove_last_read, _KeyGen, _ValueGen, State) -> + #state{remove_set=Key, remove_ctx=Ctx, + remove_value=RemoveVal}=State, + remove_element(Key, RemoveVal, Ctx, State). + +%% ==================================================================== +%% Internal functions +%% ==================================================================== + +%% @private preload and update riak with an N-number of set keys, +%% named in range <<"1..Nset">>. +preload_sets(C, PreloadKeys, Bucket, StartBinSize) -> + [begin + case C:get(Bucket, ?DEFAULT_SET_KEY, []) of + {ok, _} -> + ?INFO("~p Already Loaded", [SetKey]), + SetKey; + {error, notfound} -> + case add_element(SetKey, + crypto:rand_bytes(StartBinSize), + #state{client=C, bucket=Bucket}) of + {ok, _} -> + ?INFO("~p created", [SetKey]), + SetKey; + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [SetKey, + Reason]), + error + end; + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [SetKey, Reason]), + SetKey + end + end || SetKey <- PreloadKeys]. + +run_one_set(C, Bucket, StartBinSize) -> + case C:get(Bucket, ?DEFAULT_SET_KEY, []) of + {ok, _} -> + ?INFO("~p Already Loaded", [?DEFAULT_SET_KEY]); + {error, notfound} -> + case add_element(?DEFAULT_SET_KEY, crypto:rand_bytes(StartBinSize), + #state{client=C, bucket=Bucket}) of + {ok, _} -> + ?INFO("~p created", [?DEFAULT_SET_KEY]); + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [?DEFAULT_SET_KEY, Reason]) + end; + {error, Reason} -> + ?ERROR("~p not created b/c ~p", [?DEFAULT_SET_KEY, Reason]) + end. 
+ +ping_each([]) -> + ok; +ping_each([Node | Rest]) -> + case net_adm:ping(Node) of + pong -> + ping_each(Rest); + pang -> + ?FAIL_MSG("Failed to ping node ~p\n", [Node]) + end. + +add_element(Key, Val, State) -> + add_element(Key, Val, undefined, State). +add_element(Key, Val, Ctx, #state{client=C, bucket=B}=State) -> + O = riak_kv_crdt:new(B, Key, ?SET_TYPE), + Opp = riak_kv_crdt:operation(?SET_TYPE, {add, Val}, Ctx), + Options1 = [{crdt_op, Opp}], + case C:put(O, Options1) of + ok -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end. + +remove_element(Key, Val, #state{client=C, bucket=B}=State) -> + %% Force getting a context for remove. + case C:get(B, Key, []) of + {ok, Res} -> + {{Ctx, _Values}, _Stats} = riak_kv_crdt:value(Res, ?SET_TYPE), + remove_element(Key, Val, Ctx, State); + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end. +remove_element(Key, Val, undefined, State) -> + remove_element(Key, Val, State); +remove_element(Key, Val, Ctx, #state{client=C, bucket=B}=State) -> + O = riak_kv_crdt:new(B, Key, ?SET_TYPE), + Opp = riak_kv_crdt:operation(?SET_TYPE, {remove, Val}, Ctx), + Options1 = [{crdt_op, Opp}], + case C:put(O, Options1) of + ok -> + {ok, State}; + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end. + +read_set(Key, #state{client=C, bucket=B}=State) -> + case C:get(B, Key, []) of + {ok, Res} -> + {{Ctx, Values}, _Stats} = riak_kv_crdt:value(Res, ?SET_TYPE), + RemoveVal = basho_bench_riak_dt_util:random_element(Values), + %% Store the latest Ctx/State for a remove + {ok, State#state{remove_set=Key, + remove_ctx=Ctx, + remove_value=RemoveVal}}; + {error, notfound} -> + {ok, State}; + {error, Reason} -> + {error, Reason, State} + end. 
diff --git a/src/basho_bench_keygen.erl b/src/basho_bench_keygen.erl index 943705437..d3c72856d 100644 --- a/src/basho_bench_keygen.erl +++ b/src/basho_bench_keygen.erl @@ -24,7 +24,9 @@ -export([new/2, dimension/1, sequential_int_generator/4]). --export([reset_sequential_int_state/0]). % Internal driver use only. +% Internal driver use only. +-export([reset_sequential_int_state/0, + reset_sequential_int_state/1]). -include("basho_bench.hrl"). @@ -89,6 +91,23 @@ new({sequential_int, MaxKey}, Id) DisableProgress = basho_bench_config:get(disable_sequential_int_progress_report, false), fun() -> sequential_int_generator(Ref, MaxKey, Id, DisableProgress) end; +new({named_sequential_int, Name, MaxKey}, Id) + when is_integer(MaxKey), MaxKey > 0 -> + ?WARN("Are you sure that you want to use 'sequential_int'?\n" + "For most use cases, 'partitioned_sequential_int' is the better choice.\n", []), + DisableProgress = + basho_bench_config:get(disable_sequential_int_progress_report, false), + fun() -> sequential_int_generator(Name, MaxKey, Id, DisableProgress) end; +new({sequential_int, StartKey, MaxKey}, Id) + when is_integer(StartKey), StartKey > 0, + is_integer(MaxKey), MaxKey > 0, + MaxKey > StartKey -> + ?WARN("Are you sure that you want to use 'sequential_int'?\n" + "For most use cases, 'partitioned_sequential_int' is the better choice.\n", []), + Ref = make_ref(), + DisableProgress = + basho_bench_config:get(disable_sequential_int_progress_report, false), + fun() -> sequential_int_generator(Ref, MaxKey-StartKey, Id, DisableProgress) + StartKey end; new({partitioned_sequential_int, MaxKey}, Id) -> new({partitioned_sequential_int, 0, MaxKey}, Id); new({partitioned_sequential_int, StartKey, NumKeys}, Id) @@ -104,6 +123,18 @@ new({partitioned_sequential_int, StartKey, NumKeys}, Id) basho_bench_config:get(disable_sequential_int_progress_report, false), ?DEBUG("ID ~p generating range ~p to ~p\n", [Id, MinValue, MaxValue]), fun() -> sequential_int_generator(Ref, MaxValue - 
MinValue, Id, DisableProgress) + MinValue end; +new({named_partitioned_sequential_int, Name, StartKey, NumKeys}, Id) + when is_integer(StartKey), is_integer(NumKeys), NumKeys > 0 -> + Workers = basho_bench_config:get(concurrent), + Range = NumKeys div Workers, + MinValue = StartKey + Range * (Id - 1), + MaxValue = StartKey + + % Last worker picks up remainder to include entire range + case Workers == Id of true-> NumKeys; false -> Range * Id end, + DisableProgress = + basho_bench_config:get(disable_sequential_int_progress_report, false), + ?DEBUG("ID ~p generating range ~p to ~p\n", [Id, MinValue, MaxValue]), + fun() -> sequential_int_generator(Name, MaxValue - MinValue, Id, DisableProgress) + MinValue end; new({uniform_int, MaxKey}, _Id) when is_integer(MaxKey), MaxKey > 0 -> fun() -> random:uniform(MaxKey) end; @@ -178,7 +209,7 @@ new({file_line_bin, Path, DoRepeat}, Id) -> Bin end end; -%% Adapt a value generator. The function keygen would work if Id was added as +%% Adapt a value generator. The function keygen would work if Id was added as %% the last parameter. But, alas, it is added as the first. new({valgen, ValGen}, Id) -> basho_bench_valgen:new(ValGen, Id); @@ -331,10 +362,29 @@ seq_gen_state_dir(Id) -> end. reset_sequential_int_state() -> - case [X || {{sigen, X}, _} <- element(2, process_info(self(), - dictionary))] of - [Ref] -> + case [{X, N} || {{sigen, X}, N} <- element(2, process_info(self(), + dictionary))] of + [{Ref, _Val}] -> erlang:put({sigen, Ref}, 0); + [{Ref1, Val1}, {Ref2, Val2}] -> + %% HORRID HACK for two*sigen (reset the higher!) + if Val1 > Val2 -> + ?DEBUG("Resetting val gen for ~p ~p~n", [self(), Ref1]), + erlang:put({sigen, Ref1}, 0); + true -> + ?DEBUG("Resetting val gen for ~p ~p~n", [self(), Ref1]), + erlang:put({sigen, Ref2}, 0) + end; [] -> ok end. 
+ +reset_sequential_int_state(Name) -> + Sigens = [{X, N} || {{sigen, X}, N} <- element(2, process_info(self(), + dictionary))], + case proplists:get_value(Name, Sigens) of + undefined -> ok; + _ -> + ?DEBUG("Resetting val gen for ~p ~p~n", [self(), Name]), + erlang:put({sigen, Name}, 0) + end. diff --git a/src/basho_bench_riak_dt_util.erl b/src/basho_bench_riak_dt_util.erl new file mode 100644 index 000000000..e19e38f69 --- /dev/null +++ b/src/basho_bench_riak_dt_util.erl @@ -0,0 +1,117 @@ +% ------------------------------------------------------------------- +%% +%% basho_bench_riak_dt_util: Utility functions for riak_dt related +%% bench-runs. +%% +%% Copyright (c) 2016 Basho Technologies +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(basho_bench_riak_dt_util). + +-export([gen_set_batch/5, + random_element/1, + set_start_bin_size/1]). + +-include("basho_bench.hrl"). + +%%%=================================================================== +%%% Public Util Funs +%%%=================================================================== + +random_element([]) -> + undefined; +random_element([E]) -> + E; +random_element(Vals) -> + Nth = crypto:rand_uniform(1, length(Vals)), + lists:nth(Nth, Vals). + +%% @doc starter binary size for set-preload.
+set_start_bin_size(VGTup) -> + StartBinSize = + case VGTup of + {var_bin_set, Lambda, _, poisson} -> + basho_bench_valgen:poisson(Lambda); + {var_bin_set, Lambda, LambdaThresh, _, poisson} -> + basho_bench_valgen:poisson(Lambda, LambdaThresh); + {var_bin_set, Min, Mean, _, exponential} -> + basho_bench_valgen:exponential(Min, Mean); + {var_bin_set, Min, Max, _} -> + crypto:rand_uniform(Min, Max+1); + {uniform_bin, Min, Max} -> + crypto:rand_uniform(Min, Max+1); + {exponential_bin, Min, Mean} -> + basho_bench_valgen:exponential(Min, Mean); + {fixed_bin_set, Size, _} -> + Size; + {_, Min, Max} when is_integer(Min), is_integer(Max) -> + crypto:rand_uniform(Min, Max+1); + {_, Size} when is_integer(Size) -> + Size; + _ -> 4 + end, + ?DEBUG("StartBinSize: ~p\n", [StartBinSize]), + StartBinSize. + +%% @doc generate a tuple w/ a set-key and a batch of members from the valgen +%% NOTE: to be used w/ non-sequential `key_generator` keygen, otherwise +%% the reset for the exhausted valgen will reset the key_generator +gen_set_batch(KeyGen, ValueGen, LastKey, BatchSize, SetValGenName) -> + case {LastKey, gen_members(BatchSize, ValueGen)} of + {_, []} -> + %% Exhausted value gen, new key + Key = KeyGen(), + ?DEBUG("New set ~p~n", [Key]), + case SetValGenName of + undefined -> + basho_bench_keygen:reset_sequential_int_state(); + _ -> + basho_bench_keygen:reset_sequential_int_state(SetValGenName) + end, + {Key, gen_members(BatchSize, ValueGen)}; + {undefined, List} -> + %% We have no active set, so generate a + %% key. First run maybe + Key = KeyGen(), + ?DEBUG("New set ~p~n", [Key]), + {Key, List}; + Else -> + Else + end. + +%%%=================================================================== +%%% Private +%%%=================================================================== + +%% @private generate as many elements as we can from the valgen, if it +%% exhausts, return the results we did get. +gen_members(BatchSize, ValueGen) -> + accumulate_members(BatchSize, ValueGen, []).
+ +%% @private generate as many elements as we can from the valgen, if it +%% exhausts, return the results we did get. +accumulate_members(0, _ValueGen, Acc) -> + lists:reverse(Acc); +accumulate_members(BS, Gen, Acc) -> + try + accumulate_members(BS-1, Gen, [Gen() | Acc]) + catch throw:{stop, empty_keygen} -> + ?DEBUG("ValGen exhausted~n", []), + lists:reverse(Acc) + end. + diff --git a/src/basho_bench_valgen.erl b/src/basho_bench_valgen.erl index a211701a5..591b8753c 100644 --- a/src/basho_bench_valgen.erl +++ b/src/basho_bench_valgen.erl @@ -24,8 +24,13 @@ -export([new/2, dimension/2]). +-compile(export_all). -include("basho_bench.hrl"). +%% Local machines tended to bug out with Lambdas > 700. +%% Can be modified, depending on where it's running. +-define(LAMBDA_APPROX_NORM_THRESHOLD, 700). + %% ==================================================================== %% API %% ==================================================================== @@ -38,18 +43,50 @@ new({fixed_bin, Size, Val}, _Id) when is_integer(Size), Size >= 0, is_integer(Val), Val >= 0, Val =< 255 -> Data = list_to_binary(lists:duplicate(Size, Val)), fun() -> Data end; +%% Create a set of binaries with elements of Size and a cardinality of Card +new({fixed_bin_set, Size, Card}, Id) when is_integer(Size), Size >= 0 -> + basho_bench_config:set(?VAL_GEN_SRC_SIZE, Size*Card), + Source = init_source(Id), + fun() -> data_block(fun aligned_offset/2, Source, Size) end; +new({var_bin_set, Lambda, Card, poisson}, Id) when Lambda > 0 -> + DistSet = [poisson(Lambda) || _ <- lists:seq(1, Card)], + MaxSize = lists:max(DistSet), + new({var_bin_set, DistSet, MaxSize, Card}, Id); +new({var_bin_set, Lambda, LambdaThresh, Card, poisson}, Id) when Lambda > 0 -> + DistSet = [poisson(Lambda, LambdaThresh) || _ <- lists:seq(1, Card)], + MaxSize = lists:max(DistSet), + new({var_bin_set, DistSet, MaxSize, Card}, Id); +new({var_bin_set, MinSize, Mean, Card, exponential}, Id) + when is_integer(MinSize), is_integer(Mean), 
Mean > MinSize -> + DistSet = [exponential(MinSize, Mean) || _ <- lists:seq(1, Card)], + MaxSize = lists:max(DistSet), + new({var_bin_set, DistSet, MaxSize, Card}, Id); +new({var_bin_set, DistSet, MaxSize, _Card}=ValGenInfo, Id) + when is_list(DistSet), is_integer(MaxSize) -> + var_bin_gen(ValGenInfo, Id); +new({var_bin_set, MinSize, MaxSize, _Card}=ValGenInfo, Id) + when is_integer(MinSize), is_integer(MaxSize), MinSize >= 0, MinSize =< MaxSize -> + var_bin_gen(ValGenInfo, Id); new({fixed_char, Size}, _Id) when is_integer(Size), Size >= 0 -> fun() -> list_to_binary(lists:map(fun (_) -> random:uniform(95)+31 end, lists:seq(1,Size))) end; new({exponential_bin, MinSize, Mean}, Id) when is_integer(MinSize), MinSize >= 0, is_number(Mean), Mean > 0 -> Source = init_source(Id), - fun() -> data_block(Source, MinSize + trunc(basho_bench_stats:exponential(1 / Mean))) end; -new({uniform_bin, MinSize, MaxSize}, Id) + fun() -> data_block(Source, exponential(MinSize, Mean)) end; +new({uniform_bin, MinSize, MaxSize}, Id) when is_integer(MinSize), is_integer(MaxSize), MinSize < MaxSize -> Source = init_source(Id), Diff = MaxSize - MinSize, fun() -> data_block(Source, MinSize + random:uniform(Diff)) end; +new({poisson_bin, Lambda}, Id) + when is_integer(Lambda), Lambda > 0 -> + Source = init_source(Id), + fun() -> data_block(Source, poisson(Lambda)) end; +new({poisson_bin, Lambda, LambdaThresh}, Id) + when is_integer(Lambda), Lambda > 0, LambdaThresh >= Lambda -> + Source = init_source(Id), + fun() -> data_block(Source, poisson(Lambda, LambdaThresh)) end; new({function, Module, Function, Args}, Id) when is_atom(Module), is_atom(Function), is_list(Args) -> case code:ensure_loaded(Module) of @@ -63,7 +100,14 @@ new({uniform_int, MaxVal}, _Id) fun() -> random:uniform(MaxVal) end; new({uniform_int, MinVal, MaxVal}, _Id) when is_integer(MinVal), is_integer(MaxVal), MaxVal > MinVal -> - fun() -> random:uniform(MinVal, MaxVal) end; + fun() -> crypto:rand_uniform(MinVal, MaxVal+1) end; 
+new({bin_seed, Size}, Id) when is_integer(Size), Size >= 0 -> + basho_bench_config:set(?VAL_GEN_SRC_SIZE, Size), + {_, _, Bytes} = init_source(Id), + fun() -> Bytes end; +new({keygen, KeyGen}, Id) -> + %% Use a KeyGen as a Value generator + basho_bench_keygen:new(KeyGen, Id); new(Other, _Id) -> ?FAIL_MSG("Invalid value generator requested: ~p\n", [Other]). @@ -72,13 +116,24 @@ dimension({fixed_bin, Size}, KeyDimension) -> dimension(_Other, _) -> 0.0. - - %% ==================================================================== %% Internal Functions %% ==================================================================== -define(TAB, valgen_bin_tab). +-define(BINS, valgen_var_bins). + +init_var_bins(1, {SizeOrSet, MaxSize, Source}) -> + VarBins = split_bin_blocks(SizeOrSet, MaxSize, Source), + try + ?BINS = ets:new(?BINS, [public, named_table]), + true = ets:insert(?BINS, {x, VarBins}) + catch _:_ -> rerunning_id_1_init_var_bins_table_already_exists + end, + VarBins; +init_var_bins(_, _) -> + [{_, VarBins}] = ets:lookup(?BINS, x), + VarBins. init_source(Id) -> init_source(Id, basho_bench_config:get(?VAL_GEN_BLOB_CFG, undefined)). @@ -104,10 +159,12 @@ init_source(Id, Path) -> end, {?VAL_GEN_BLOB_CFG, size(Bin), Bin}. -data_block({SourceCfg, SourceSz, Source}, BlockSize) -> +data_block(Source, BlockSize) -> + data_block(fun random_offset/2, Source, BlockSize). +data_block(OffsetFun, {SourceCfg, SourceSz, Source}, BlockSize) -> case SourceSz - BlockSize > 0 of true -> - Offset = random:uniform(SourceSz - BlockSize), + Offset = OffsetFun(SourceSz, BlockSize), <<_:Offset/bytes, Slice:BlockSize/bytes, _Rest/binary>> = Source, Slice; false -> @@ -115,3 +172,69 @@ data_block({SourceCfg, SourceSz, Source}, BlockSize) -> [SourceCfg, SourceSz, BlockSize]), Source end. + +random_offset(SourceSz, BlockSize) -> + random:uniform(SourceSz - BlockSize). + +aligned_offset(SourceSz, BlockSize) -> + (random:uniform(SourceSz - BlockSize) div BlockSize) * BlockSize. 
+ +var_bin_gen({_, SizeOrSet, MaxSize, Card}, Id) -> + basho_bench_config:set(?VAL_GEN_SRC_SIZE, MaxSize*Card), + Source = init_source(Id), + VarBins = init_var_bins(Id, {SizeOrSet, MaxSize, Source}), + %% Store list of varbins in ets for concurrent workers + fun() -> lists:nth(random:uniform(Card), VarBins) end. + +split_bin_blocks(SizeOrSet, MaxSize, {_SourceCfg, _SourceSz, Source}) -> + split_bin_blocks(SizeOrSet, MaxSize, Source, []). + +split_bin_blocks([], _MaxSize, _Source, Acc) -> + Acc; +split_bin_blocks([H|T], MaxSize, Source, Acc) -> + Size = H, + split_bin_blocks(T, Size, MaxSize, Source, Acc); + +split_bin_blocks(MinSize, MaxSize, Source, Acc) when is_integer(MinSize) -> + Size = crypto:rand_uniform(MinSize, MaxSize + 1), + split_bin_blocks(MinSize, Size, MaxSize, Source, Acc). +split_bin_blocks(X, Size, MaxSize, Source, Acc) -> + Padding = MaxSize - Size, + case Source of + <<>> -> + Acc; + <<Chunk:Size/binary, _Pad:Padding/binary, Rest/binary>> -> + split_bin_blocks(X, MaxSize, Rest, [Chunk|Acc]); + Bin -> + [Bin|Acc] + end. + +poisson(Lambda) -> + poisson(Lambda, ?LAMBDA_APPROX_NORM_THRESHOLD). +poisson(Lambda, ApproxNormThreshold) -> + %% Since this can be very slow, approximate a normal dist. if greater + %% than the threshold + case Lambda > ApproxNormThreshold of + true -> trunc(normal(Lambda, math:sqrt(Lambda))); + false -> + P = math:exp(-Lambda), + poisson(0, Lambda, P, P, random:uniform()) + end. + +poisson(X0, Lambda, P0, S, U) + when U > S -> + X1 = X0 + 1, + P1 = P0 * Lambda / X1, + poisson(X1, Lambda, P1, S + P1, U); +poisson(X, _, _, _, _) -> + X. + +exponential(MinSize, Mean) -> + MinSize + trunc(basho_bench_stats:exponential(1 / Mean)). + +%% @see https://github.com/basho/basho_stats/blob/develop/src/basho_stats_rv.erl#L54 +normal(Mean, Sigma) -> + Rv1 = random:uniform(), + Rv2 = random:uniform(), + Rho = math:sqrt(-2 * math:log(1-Rv2)), + Rho * math:cos(2 * math:pi() * Rv1) * Sigma + Mean.