diff --git a/docs/rack-awareness.md b/docs/rack-awareness.md
new file mode 100644
index 000000000..a10f456c6
--- /dev/null
+++ b/docs/rack-awareness.md
@@ -0,0 +1,82 @@
+# Rack Awareness / Availability zones / Location support
+
+The aim is to increase data safety and to make the cluster more resilient
+against the loss of a location/site/availability zone/rack.
+
+To achieve this, a location parameter has been introduced.
+It can be set at runtime for each Riak node.
+When claiming a new ring, the list of nodes is ordered taking the location of the
+individual nodes into consideration, so that adjacent nodes are preferably
+from different locations.
+
+In essence, it only changes the order of the nodes fed into the claiming algorithm.
+
+The default location is `undefined`. Every node with no location parameter set
+is treated as being in the same location.
+
+## Ring visualization
+
+![Riak Ring Location](ring-location.png)
+
+## Setting a node's location parameter
+
+Setting a node's location parameter is a staged operation, like
+other ring manipulations (join, leave, resize-ring, etc.).
+
+### Via riak admin
+Change the current node's location parameter:
+```bash
+riak admin cluster location rack_a
+```
+or specify a node:
+```bash
+riak admin cluster location site_b --node=dev2@127.0.0.1
+```
+
+### Via Erlang function call
+
+```erlang
+riak_core_claimant:set_node_location(node(), "location_a").
+```
+```erlang
+riak_core_claimant:plan(),
+riak_core_claimant:commit().
+```
+
+## Pitfalls
+There are circumstances in which the preferred node location assignment cannot be guaranteed.
+
+If at least one location parameter is set in the cluster when planning a cluster change, a warning
+message is displayed whenever not all nodes in a preflist are assigned to different locations.
+
+For example, if the default `n_val = 3` is used and there are only two distinct locations
+set in the cluster, the message `WARNING: Not all replicas will be on distinct locations` is shown.
+
+### Not enough distinct locations
+Occurs when the ring size is not evenly divisible by the number of distinct locations.
+
+### Tail violations
+Occurs when the ring size is not evenly divisible by the number of nodes.
+[claim-fixes](claim-fixes.md) covers this, but an improper distinct location count can still result in an undesirable location distribution within the ring.
+
+For example, suppose there are 8 nodes in 3 distinct locations.
+To ensure that every site/location holds a piece of the data, `n_val` must be at least 4.
+
+This can be checked as follows:
+
+Staged changes:
+```erlang
+PlannedRing = element(1, lists:last(element(3, riak_core_claimant:plan()))).
+riak_core_location:check_ring(PlannedRing, Nval = 4, MinimumNumberOfDistinctLocations = 3).
+```
+
+Actual ring:
+```erlang
+{ok, Ring} = riak_core_ring_manager:get_my_ring(),
+riak_core_location:check_ring(Ring, Nval = 4, MinimumNumberOfDistinctLocations = 3).
+```
+
+If `riak_core_location:check_ring/3` returns an empty list (`[]`), there is no location violation.
+
+### Won't optimize transfers between old and new ring
+When a location parameter change triggers a ring ownership change, the resulting transfers are currently not optimized.
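+
+Putting it together, here is a minimal sketch of the full staged workflow from an
+attached Erlang shell. The node names and locations are illustrative, and it
+assumes `riak_core_claimant:plan/0` returns `{ok, Changes, NextRings}` as used above:
+```erlang
+%% Stage a location for each node, then inspect the planned ring before committing.
+ok = riak_core_claimant:set_node_location('dev1@127.0.0.1', "site_a"),
+ok = riak_core_claimant:set_node_location('dev2@127.0.0.1', "site_b"),
+{ok, _Changes, NextRings} = riak_core_claimant:plan(),
+{PlannedRing, _} = lists:last(NextRings),
+case riak_core_location:check_ring(PlannedRing, 3, 2) of
+    []         -> riak_core_claimant:commit();   %% no location violations
+    Violations -> io:format("violations: ~p~n", [Violations])
+end.
+```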
diff --git a/docs/ring-location.png b/docs/ring-location.png
new file mode 100644
index 000000000..cca6bc1aa
Binary files /dev/null and b/docs/ring-location.png differ
diff --git a/src/riak_core_claim.erl b/src/riak_core_claim.erl
index e40247c0d..f205698db 100644
--- a/src/riak_core_claim.erl
+++ b/src/riak_core_claim.erl
@@ -187,7 +187,12 @@ wants_claim_v2(Ring, Node) ->
     Count = proplists:get_value(Node, Counts, 0),
     case Count < Avg of
         false ->
-            no;
+            case riak_core_ring:has_location_changed(Ring) of
+                true ->
+                    {yes, 1};
+                false ->
+                    no
+            end;
         true ->
             {yes, Avg - Count}
     end.
@@ -289,7 +294,8 @@ choose_claim_v2(Ring, Node) ->
    Params = default_choose_params(),
    choose_claim_v2(Ring, Node, Params).

-choose_claim_v2(Ring, Node, Params0) ->
+choose_claim_v2(RingOrig, Node, Params0) ->
+    Ring = riak_core_ring:clear_location_changed(RingOrig),
    Params = default_choose_params(Params0),
    %% Active::[node()]
    Active = riak_core_ring:claiming_members(Ring),
@@ -326,7 +332,8 @@ choose_claim_v2(Ring, Node, Params0) ->
    %% number of indices desired is less than the computed set.
    Padding = lists:duplicate(TargetN, undefined),
    Expanded = lists:sublist(Active ++ Padding, TargetN),
-    PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
+    ExpandedLocation = get_nodes_by_location(Expanded, Ring),
+    PreferredClaim = riak_core_claim:diagonal_stripe(Ring, ExpandedLocation),
    PreferredNth = [begin
                        {Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
                        Nth
@@ -343,8 +350,10 @@ choose_claim_v2(Ring, Node, Params0) ->
    Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
                                    TargetN, RingSize),
    %% Claim indices from the remaining candidate set
-    Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
-    Claim2 = lists:sublist(Claim, Want),
+    Claim2 = case select_indices(Owners, Deltas, Indices2, TargetN, RingSize) of
+                 [] -> [];
+                 Claim -> lists:sublist(Claim, Want)
+             end,
    NewRing = lists:foldl(fun(Idx, Ring0) ->
                              riak_core_ring:transfer_node(Idx, Node, Ring0)
                          end, Ring, Claim2),
@@ -622,7 +631,8 @@ claim_diagonal(Wants, Owners, Params) ->
                       riak_core_ring:riak_core_ring().
 sequential_claim(Ring0, Node, TargetN) ->
    Ring = riak_core_ring:upgrade(Ring0),
-    Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    Nodes = get_nodes_by_location(OrigNodes, Ring),
    NodeCount = length(Nodes),
    RingSize = riak_core_ring:num_partitions(Ring),

@@ -709,7 +719,8 @@ backfill_ring(RingSize, Nodes, Remaining, Acc) ->

 claim_rebalance_n(Ring0, Node) ->
    Ring = riak_core_ring:upgrade(Ring0),
-    Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
+    Nodes = get_nodes_by_location(OrigNodes, Ring),
    Zipped = diagonal_stripe(Ring, Nodes),

    lists:foldl(fun({P, N}, Acc) ->
@@ -1270,6 +1281,19 @@ indices_within_n([This | Indices], TN, Last, Q, Acc) ->
 circular_distance(I1, I2, Q) ->
    min((Q + I1 - I2) rem Q, (Q + I2 - I1) rem Q).

+%% @private
+%% Get active nodes, ordered by taking location parameters into account
+-spec get_nodes_by_location([node()|undefined], riak_core_ring:riak_core_ring()) ->
+    [node()|undefined].
+get_nodes_by_location(Nodes, Ring) ->
+    NodesLocations = riak_core_ring:get_nodes_locations(Ring),
+    case riak_core_location:has_location_set_in_cluster(NodesLocations) of
+        false ->
+            Nodes;
+        true ->
+            riak_core_location:stripe_nodes_by_location(Nodes, NodesLocations)
+    end.
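+
+%% For illustration (hypothetical nodes/locations, not part of the module):
+%% given Nodes = [n1, n2, n3, n4] where n1 and n2 are in location "a" and
+%% n3 and n4 in location "b", stripe_nodes_by_location/2 returns an
+%% interleaving such as [n1, n3, n2, n4], so that diagonal_stripe/2 places
+%% consecutive partitions on nodes from alternating locations.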
+
 %% ===================================================================
 %% Unit tests
 %% ===================================================================
diff --git a/src/riak_core_claimant.erl b/src/riak_core_claimant.erl
index 33ab44956..a02ecc0aa 100644
--- a/src/riak_core_claimant.erl
+++ b/src/riak_core_claimant.erl
@@ -42,7 +42,8 @@
          activate_bucket_type/1,
          get_bucket_type/2,
          get_bucket_type/3,
-         bucket_type_iterator/0]).
+         bucket_type_iterator/0,
+         set_node_location/2]).
 -export([reassign_indices/1]). % helpers for claim sim

 %% gen_server callbacks
@@ -52,7 +53,8 @@
 -type action() :: leave
                 | remove
                 | {replace, node()}
-                | {force_replace, node()}.
+                | {force_replace, node()}
+                | {set_location, string()}.

 -type riak_core_ring() :: riak_core_ring:riak_core_ring().

@@ -164,6 +166,11 @@ abort_resize() ->
 pending_close(Ring, RingID) ->
     gen_server:call(?MODULE, {pending_close, Ring, RingID}).

+%% @doc Stage a request to set a new location for the given node.
+-spec set_node_location(node(), string()) -> ok | {error, atom()}.
+set_node_location(Node, Location) ->
+    stage(Node, {set_location, Location}).
+
 %% @doc Clear the current set of staged transfers
 clear() ->
     gen_server:call(claimant(), clear, infinity).
@@ -446,8 +453,9 @@ maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) ->
         {_, _, false} ->
             {ignore, plan_changed};
         _ ->
-            NewRing = riak_core_ring:increment_vclock(Claimant, NextRing),
-            {new_ring, NewRing}
+            NewRing0 = riak_core_ring:clear_location_changed(NextRing),
+            NewRing1 = riak_core_ring:increment_vclock(Claimant, NewRing0),
+            {new_ring, NewRing1}
     end.

 %% @private
@@ -502,7 +510,9 @@ valid_request(Node, Action, Changes, Ring) ->
         {resize, NewRingSize} ->
             valid_resize_request(NewRingSize, Changes, Ring);
         abort_resize ->
-            valid_resize_abort_request(Ring)
+            valid_resize_abort_request(Ring);
+        {set_location, Location} ->
+            valid_set_location_request(Location, Node, Ring)
     end.

 %% @private
@@ -615,6 +625,20 @@ valid_resize_abort_request(Ring) ->
         false -> {error, not_resizing}
     end.

+%% @private
+%% Validating node member status
+valid_set_location_request(_Location, Node, Ring) ->
+    case riak_core_ring:member_status(Ring, Node) of
+        valid ->
+            true;
+        joining ->
+            true;
+        invalid ->
+            {error, not_member};
+        _ ->
+            true
+    end.
+
 %% @private
 %% @doc Filter out any staged changes that are no longer valid. Changes
 %% can become invalid based on other staged changes, or by cluster
@@ -1094,7 +1118,9 @@ change({{force_replace, NewNode}, Node}, Ring) ->
 change({{resize, NewRingSize}, _Node}, Ring) ->
     riak_core_ring:resize(Ring, NewRingSize);
 change({abort_resize, _Node}, Ring) ->
-    riak_core_ring:set_pending_resize_abort(Ring).
+    riak_core_ring:set_pending_resize_abort(Ring);
+change({{set_location, Location}, Node}, Ring) ->
+    riak_core_ring:set_node_location(Node, Location, Ring).

 internal_ring_changed(Node, CState) ->
     {Changed, CState5} = do_claimant(Node, CState, fun log/2),
diff --git a/src/riak_core_cluster_cli.erl b/src/riak_core_cluster_cli.erl
index 81bd929ef..09cec463d 100644
--- a/src/riak_core_cluster_cli.erl
+++ b/src/riak_core_cluster_cli.erl
@@ -46,12 +46,13 @@ register_all_usage() ->
     clique:register_usage(["riak-admin", "cluster", "status"], status_usage()),
     clique:register_usage(["riak-admin", "cluster", "partition"], partition_usage()),
     clique:register_usage(["riak-admin", "cluster", "partitions"], partitions_usage()),
-    clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()).
+ clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()), + clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()), + clique:register_usage(["riak-admin", "cluster", "location"], location_usage()). register_all_commands() -> lists:foreach(fun(Args) -> apply(clique, register_command, Args) end, [status_register(), partition_count_register(), - partitions_register(), partition_register()]). + partitions_register(), partition_register(), location_register()]). %%% %% Cluster status @@ -72,6 +74,7 @@ cluster_usage() -> " partition Map partition IDs to indexes\n", " partitions Display partitions on a node\n", " partition-count Display ring size or node partition count\n\n", + " location Set node location\n\n", " Use --help after a sub-command for more details.\n" ]. @@ -111,12 +114,20 @@ status(_CmdBase, [], []) -> [T0,T1,Table,T2]. format_status(Node, Status, Ring, RingStatus) -> - {Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus, - [{node, is_claimant(Node, Claimant)}, - {status, Status}, - {avail, node_availability(Node, Down, MarkedDown)}, - {ring, claim_percent(Ring, Node)}, - {pending, future_claim_percentage(Changes, Ring, Node)}]. + NodesLocations = riak_core_ring:get_nodes_locations(Ring), + HasLocationInCluster = riak_core_location:has_location_set_in_cluster(NodesLocations), + format_status(Node, Status, Ring, RingStatus, HasLocationInCluster, NodesLocations). + +format_status(Node, Status, Ring, RingStatus, false, _) -> + {Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus, + [{node, is_claimant(Node, Claimant)}, + {status, Status}, + {avail, node_availability(Node, Down, MarkedDown)}, + {ring, claim_percent(Ring, Node)}, + {pending, future_claim_percentage(Changes, Ring, Node)}]; +format_status(Node, Status, Ring, RingStatus, true, NodesLocations) -> + Row = format_status(Node, Status, Ring, RingStatus, false, NodesLocations), + Row ++ [{location, riak_core_location:get_node_location(Node, NodesLocations)}]. is_claimant(Node, Node) -> " (C) " ++ atom_to_list(Node) ++ " "; @@ -263,6 +274,45 @@ id_out1(id, Id, Ring, RingSize) when Id < RingSize -> id_out1(id, Id, _Ring, _RingSize) -> make_alert(["ERROR: Id ", integer_to_list(Id), " is invalid."]). + +%%% +%% Location +%%% +location_usage() -> + ["riak-admin cluster location [--node node]\n\n", + " Set the node location parameter\n\n", + "Options\n", + " -n , --node \n", + " Set node location for the specified node.\n" + ]. + +location_register() -> + [["riak-admin", "cluster", "location", '*'], % Cmd + [], % KeySpecs + [{node, [{shortname, "n"}, {longname, "node"}, + {typecast, fun clique_typecast:to_node/1}]}], % FlagSpecs + fun stage_set_location/3]. % Implementation callback + +stage_set_location([_, _, _, Location], _, Flags) -> + Node = proplists:get_value(node, Flags, node()), + try + case riak_core_claimant:set_node_location(Node, Location) of + ok -> + [clique_status:text( + io_lib:format("Success: staged changing location of node ~p to ~s~n", + [Node, Location]))]; + {error, not_member} -> + make_alert( + io_lib:format("Failed: ~p is not a member of the cluster.~n", [Node]) + ) + end + catch + Exception:Reason -> + lager:error("Setting node location failed ~p:~p", [Exception, Reason]), + make_alert("Setting node location failed, see log for details~n") + end. 
+
+
 %%%
 %% Internal
 %%%
diff --git a/src/riak_core_console.erl b/src/riak_core_console.erl
index cf8ff5400..fcb085799 100644
--- a/src/riak_core_console.erl
+++ b/src/riak_core_console.erl
@@ -67,6 +67,7 @@ print_member_status(Ring, LegacyGossip) ->
     io:format("~79..-s~n", [""]),
     AllStatus = lists:keysort(2, riak_core_ring:all_member_status(Ring)),
     IsPending = ([] /= riak_core_ring:pending_changes(Ring)),
+    Locations = riak_core_ring:get_nodes_locations(Ring),

     {Joining, Valid, Down, Leaving, Exiting} =
         lists:foldl(fun({Node, Status},
@@ -78,15 +79,16 @@ print_member_status(Ring, LegacyGossip) ->
                             {RingPercent, NextPercent} =
                                 pending_claim_percentage(Ring, Node),

+                            DisplayNode = maybe_add_location(Node, Locations),
                             case IsPending of
                                 true ->
-                                    io:format("~-8s  ~5.1f%    ~5.1f%    ~p~n",
+                                    io:format("~-8s  ~5.1f%    ~5.1f%    ~s~n",
                                               [StatusOut, RingPercent,
-                                               NextPercent, Node]);
+                                               NextPercent, DisplayNode]);
                                 false ->
-                                    io:format("~-8s  ~5.1f%      --      ~p~n",
-                                              [StatusOut, RingPercent, Node])
+                                    io:format("~-8s  ~5.1f%      --      ~s~n",
+                                              [StatusOut, RingPercent, DisplayNode])
                             end,
                             case Status of
                                 joining ->
@@ -106,6 +108,14 @@ print_member_status(Ring, LegacyGossip) ->
               [Valid, Leaving, Exiting, Joining, Down]),
     ok.

+maybe_add_location(Node, Locations) ->
+    case riak_core_location:get_node_location(Node, Locations) of
+        undefined ->
+            atom_to_list(Node);
+        Location ->
+            atom_to_list(Node) ++ " (" ++ Location ++ ")"
+    end.
+
 ring_status([]) ->
     {Claimant, RingReady, Down, MarkedDown, Changes} =
         riak_core_status:ring_status(),
@@ -649,7 +659,9 @@ print_plan(Changes, Ring, NextRings) ->
                       io:format("resize-ring ~p to ~p partitions~n",[CurrentSize,NewRingSize]);
                  ({_, abort_resize}) ->
                       CurrentSize = riak_core_ring:num_partitions(Ring),
-                      io:format("resize-ring abort. current size: ~p~n", [CurrentSize])
+                      io:format("resize-ring abort. current size: ~p~n", [CurrentSize]);
+                 ({Node, {set_location, Location}}) ->
+                      io:format("set-location ~p to ~s~n", [Node, Location])
               end, Changes),
     io:format("~79..-s~n", [""]),
     io:format("~n"),
@@ -697,6 +709,13 @@ output(Ring, NextRing) ->
             io:format("WARNING: Not all replicas will be on distinct nodes~n~n")
     end,

+    case riak_core_location:check_ring(FutureRing) of
+        [] ->
+            ok;
+        _ ->
+            io:format("WARNING: Not all replicas will be on distinct locations~n~n")
+    end,
+
     Owners1 = riak_core_ring:all_owners(Ring),
     Owners2 = riak_core_ring:all_owners(NextRing),
     Owners3 = lists:zip(Owners1, Owners2),
diff --git a/src/riak_core_location.erl b/src/riak_core_location.erl
new file mode 100644
index 000000000..6220b5019
--- /dev/null
+++ b/src/riak_core_location.erl
@@ -0,0 +1,111 @@
+-module(riak_core_location).
+
+%% API
+-export([get_node_location/2,
+         has_location_set_in_cluster/1,
+         stripe_nodes_by_location/2,
+         check_ring/1,
+         check_ring/2,
+         check_ring/3]).
+
+-spec get_node_location(node(), dict:dict()) -> string() | undefined.
+get_node_location(Node, Locations) ->
+  case dict:find(Node, Locations) of
+    error ->
+      undefined;
+    {ok, Location} ->
+      Location
+  end.
+
+-spec has_location_set_in_cluster(dict:dict()) -> boolean().
+has_location_set_in_cluster(NodesLocations) ->
+  0 =/= dict:size(NodesLocations).
+
+-spec get_location_nodes([node()|undefined], dict:dict()) -> dict:dict().
+get_location_nodes(Nodes, Locations) ->
+  lists:foldl(fun(undefined, Acc) ->
+                dict:append(undefined, undefined, Acc);
+                 (Node, Acc) ->
+                NodeLocation = get_node_location(Node, Locations),
+                dict:append(NodeLocation, Node, Acc)
+              end, dict:new(), Nodes).
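+
+%% For illustration (hypothetical input): with Locations mapping n1 => "a"
+%% and n2 => "b", get_location_nodes([n1, n2, n3], Locations) returns a dict
+%% of location => nodes: "a" => [n1], "b" => [n2], undefined => [n3]
+%% (n3 has no location entry).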
+
+%% Order the node list so that consecutive nodes are in different locations where possible
+-spec stripe_nodes_by_location([node()|undefined], dict:dict()) ->
+  [node()|undefined].
+stripe_nodes_by_location(Nodes, NodesLocations) ->
+  LocationsNodes = get_location_nodes(Nodes, NodesLocations),
+  stripe_nodes_by_location(get_locations(LocationsNodes), LocationsNodes, []).
+
+-spec get_locations(dict:dict()) -> [string()|undefined].
+get_locations(LocationsNodes) ->
+  LocationNodesSorted = sort_by_node_count(dict:to_list(LocationsNodes)),
+  lists:map(fun({Location, _}) -> Location end, LocationNodesSorted).
+
+-spec sort_by_node_count(list()) -> list().
+sort_by_node_count(LocationsNodes) ->
+  lists:sort(fun compare/2, LocationsNodes).
+
+-spec compare({any(), list()}, {any(), list()}) -> boolean().
+compare({_, NodesA}, {_, NodesB}) ->
+  LengthA = length(NodesA),
+  LengthB = length(NodesB),
+  case LengthA =:= LengthB of
+    true ->
+      lists:last(NodesA) >= lists:last(NodesB);
+    _ ->
+      LengthA >= LengthB
+  end.
+
+-spec stripe_nodes_by_location([string()|undefined], dict:dict(), [node()|undefined]) ->
+  [node()|undefined].
+stripe_nodes_by_location([], _LocationsNodes, NewNodeList) ->
+  lists:reverse(NewNodeList);
+stripe_nodes_by_location(Locations, LocationsNodes, NewNodeList) ->
+  Nth = (length(NewNodeList) rem length(Locations)) + 1,
+  CurrentLocation = lists:nth(Nth, Locations),
+  case dict:find(CurrentLocation, LocationsNodes) of
+    {ok, []} ->
+      NewLocations = lists:delete(CurrentLocation, Locations),
+      NewLocationsNodes = dict:erase(CurrentLocation, LocationsNodes),
+      stripe_nodes_by_location(NewLocations, NewLocationsNodes, NewNodeList);
+    {ok, [Node | RemNodes]} ->
+      NewLocationNodes = dict:store(CurrentLocation, RemNodes, LocationsNodes),
+      stripe_nodes_by_location(Locations, NewLocationNodes, [Node | NewNodeList])
+  end.
+
+-spec check_ring(riak_core_ring:riak_core_ring()) -> list().
+check_ring(Ring) ->
+  {ok, Props} = application:get_env(riak_core, default_bucket_props),
+  check_ring(Ring, proplists:get_value(n_val, Props)).
+
+-spec check_ring(riak_core_ring:riak_core_ring(), pos_integer()) -> list().
+check_ring(Ring, Nval) ->
+  check_ring(Ring, Nval, Nval).
+
+-spec check_ring(riak_core_ring:riak_core_ring(), pos_integer(), pos_integer()) -> list().
+check_ring(Ring, NVal, MinimumNumberOfDistinctLocations) ->
+  Locations = riak_core_ring:get_nodes_locations(Ring),
+  case has_location_set_in_cluster(Locations) of
+    true ->
+      check_ring(Ring, NVal, MinimumNumberOfDistinctLocations, Locations);
+    _ ->
+      [] % no locations set, nothing to check
+  end.
+
+-spec check_ring(riak_core_ring:riak_core_ring(), pos_integer(), pos_integer(), dict:dict()) ->
+  list().
+check_ring(Ring, Nval, MinimumNumberOfDistinctLocations, Locations) ->
+  lists:foldl(fun(PL, Acc) ->
+                case length(get_unique_locations(PL, Locations)) < MinimumNumberOfDistinctLocations of
+                  false -> Acc;
+                  _ ->
+                    Error = [{Idx, Node, get_node_location(Node, Locations)} || {Idx, Node} <- PL],
+                    [Error | Acc]
+                end
+              end, [], riak_core_ring:all_preflists(Ring, Nval)).
+
+-spec get_unique_locations(list(), dict:dict()) ->
+  list().
+get_unique_locations(PrefLists, Locations) ->
+  lists:usort([get_node_location(Node, Locations) || {_, Node} <- PrefLists]).
\ No newline at end of file
diff --git a/src/riak_core_ring.erl b/src/riak_core_ring.erl
index 318a10812..b79ef5c0a 100644
--- a/src/riak_core_ring.erl
+++ b/src/riak_core_ring.erl
@@ -81,6 +81,10 @@
          set_member/4,
          set_member/5,
          members/2,
+         has_location_changed/1,
+         clear_location_changed/1,
+         set_node_location/3,
+         get_nodes_locations/1,
          set_claimant/2,
          increment_vclock/2,
          ring_version/1,
@@ -326,6 +330,32 @@ all_members(?CHSTATE{members=Members}) ->
 members(?CHSTATE{members=Members}, Types) ->
     get_members(Members, Types).

+%% @doc Check whether a node location has changed since the flag was last cleared.
+-spec has_location_changed(chstate()) -> boolean().
+has_location_changed(State) ->
+    {ok, Value} = get_meta('$nodes_locations_changed', false, State),
+    Value.
+
+%% @doc Clear the location-changed flag.
+-spec clear_location_changed(chstate()) -> chstate().
+clear_location_changed(State) ->
+    update_meta('$nodes_locations_changed', false, State).
+
+%% @doc Store the given node's location and mark the locations as changed.
+-spec set_node_location(node(), string(), chstate()) -> chstate().
+set_node_location(Node, Location, State) ->
+    NodesLocations = get_nodes_locations(State),
+    NewNodesLocations = dict:store(Node, Location, NodesLocations),
+    NewState = update_meta('$nodes_locations_changed', true, State),
+    update_meta('$nodes_locations', NewNodesLocations, NewState).
+
+%% @doc Return the node-to-location dict, restricted to current members.
+-spec get_nodes_locations(chstate()) -> dict:dict().
+get_nodes_locations(?CHSTATE{members=Members} = ChState) ->
+    {ok, Value} = get_meta('$nodes_locations', dict:new(), ChState),
+    Nodes = get_members(Members),
+    dict:filter(fun(Node, _) -> lists:member(Node, Nodes) end, Value).
+
 %% @doc Produce a list of all active (not marked as down) cluster members
 active_members(?CHSTATE{members=Members}) ->
     get_members(Members, [joining, valid, leaving, exiting]).
diff --git a/test/rack_awareness_test.erl b/test/rack_awareness_test.erl
new file mode 100644
index 000000000..0fadbc880
--- /dev/null
+++ b/test/rack_awareness_test.erl
@@ -0,0 +1,224 @@
+%%% @author Peter Tihanyi
+%%% @copyright (C) 2021, Peter Tihanyi
+%%% @doc
+%%%
+%%% @end
+-module(rack_awareness_test).
+
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(RING_SIZES, [8, 64, 128, 256, 512]).
+-define(NODE_COUNTS, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 20]).
+
+ring_claim_test_() ->
+  {setup,
+   fun generate_rings/0,
+   fun (_) -> ok end,
+   fun(Rings) ->
+     RingsWithLocation = lists:flatten(lists:map(fun add_loc_to_ring/1, Rings)),
+     Threads = erlang:system_info(schedulers_online),
+     {inparallel, Threads, [{get_test_desc(Ring),
+                             fun() -> assert_vnode_location(claim(Ring)) end}
+                            || Ring <- RingsWithLocation]}
+   end
+  }.
+
+get_test_desc(Ring) ->
+  lists:flatten(
+    io_lib:format("ring_size: ~p node_count: ~p nodes with locations: ~p",
+                  [riak_core_ring:num_partitions(Ring),
+                   length(riak_core_ring:all_members(Ring)),
+                   dict:size(riak_core_ring:get_nodes_locations(Ring))])
+  ).
+
+add_loc_to_ring(Ring) ->
+  Nodes = riak_core_ring:all_members(Ring),
+  NodeCount = length(Nodes),
+  Locations = generate_site_names(NodeCount),
+  %Permutations = gen_loc_permutations(Locations, #{}, NodeCount),
+  Permutations = gen_loc_permutations_opt(Locations, NodeCount),
+  lists:map(fun(Permutation) ->
+              {_, NewRing} =
+                maps:fold(fun(Location, Count, {CNodes, CRing}) ->
+                            do_set_locations(CNodes, Location, Count, CRing)
+                          end, {Nodes, Ring}, Permutation),
+              NewRing
+            end, Permutations).
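+
+%% For illustration: with two generated site names and three nodes, the
+%% permutations include maps such as #{Site1 => 2, Site2 => 1}; for each
+%% such map, add_loc_to_ring/1 assigns that many nodes (in maps:fold order)
+%% to the given location before the ring is claimed.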
+
+do_set_locations(Nodes, _Location, 0, Ring) ->
+  {Nodes, Ring};
+do_set_locations([Node | RemNodes], Location, Count, Ring) ->
+  NewRing = riak_core_ring:set_node_location(Node, Location, Ring),
+  do_set_locations(RemNodes, Location, Count-1, NewRing).
+
+gen_loc_permutations_opt(Locations, Max) ->
+  InitPermutationMap = init_permutation_map(Locations),
+  HeadLoc = hd(Locations),
+  gen_loc_permutations_opt(HeadLoc, Locations, InitPermutationMap, Max,
+                           [InitPermutationMap]).
+
+gen_loc_permutations_opt(_, [], _Permutation, _Max, Acc) ->
+  Acc;
+gen_loc_permutations_opt(TopLoc, [HeadLoc | RestLoc] = Locations, Permutation, Max,
+                         Acc) ->
+  case get_location_combination_count(Permutation) =:= Max of
+    true ->
+      NewPermutations = increase(Permutation, TopLoc, -1),
+      TopLocCount = maps:get(TopLoc, NewPermutations),
+      HeadLocCount = maps:get(HeadLoc, NewPermutations),
+      case TopLocCount > HeadLocCount of
+        true ->
+          NewPermutations2 = increase(NewPermutations, HeadLoc, -1),
+          gen_loc_permutations_opt(TopLoc, Locations, NewPermutations2, Max, Acc);
+        _ ->
+          gen_loc_permutations_opt(HeadLoc, RestLoc, NewPermutations, Max, Acc)
+      end;
+    _ ->
+      NewPermutations = increase(Permutation, HeadLoc, 1),
+      gen_loc_permutations_opt(TopLoc, Locations, NewPermutations, Max,
+                               [NewPermutations | Acc])
+  end.
+
+
+init_permutation_map(Locations) ->
+  lists:foldl(fun(LocName, Acc) -> Acc#{LocName => 0} end, #{}, Locations).
+
+gen_loc_permutations([], Acc, _Max) ->
+  Acc;
+gen_loc_permutations(Loc, Acc, Max) when length(Loc) > 6 ->
+  LocLength = length(Loc),
+  NewLocations = lists:delete(lists:nth(rand:uniform(LocLength), Loc), Loc),
+  gen_loc_permutations(NewLocations, Acc, Max);
+gen_loc_permutations([LocationName | Rest] = _Full, Acc, Max) ->
+  Result = [gen_loc_permutations(Rest, increase(Acc, LocationName, Add), Max) ||
+             Add <- lists:seq(0, Max)],
+  lists:filter(fun(Item) ->
+                 get_location_combination_count(Item) =< Max
+               end, lists:flatten(Result)).
+
+increase(State, LocationName, Add) ->
+  State#{LocationName => maps:get(LocationName, State, 0)+Add}.
+
+get_location_combination_count(Locations) ->
+  maps:fold(fun(_, V, A) -> V+A end, 0, Locations).
+
+assert_vnode_location(Ring) ->
+  Locations = dict:to_list(riak_core_ring:get_nodes_locations(Ring)),
+  assert_vnode_location(Ring, Locations).
+
+assert_vnode_location(_Ring, []) ->
+  true;
+assert_vnode_location(Ring, Locations) ->
+  CheckResult = check(Ring, Locations),
+  case CheckResult of
+    false ->
+      Owners = riak_core_ring:all_owners(Ring),
+      Members = riak_core_ring:all_members(Ring),
+      Data = print_ring_partitions(Owners, Locations, ""),
+      RSize = io_lib:format("Ring size: ~p~n", [length(Owners)]),
+      LocationData = io_lib:format("Location data: ~p~n", [Locations]),
+      MemberCount = io_lib:format("Nodes: ~p~n", [length(Members)]),
+      RingErrors = io_lib:format("Check ring: ~p~n",
                                 [riak_core_location:check_ring(Ring, 3)]),
+      NewData = ["\r\n *********************** \r\n", RSize, MemberCount,
+                 LocationData, RingErrors, "\r\n" | Data],
+      file:write_file("ring_debug.txt", NewData, [append, write]);
+    _ ->
+      ok
+  end,
+  ?assert(CheckResult).
+
+check(Ring, Locations) ->
+  Owners = riak_core_ring:all_owners(Ring),
+  MemberCount = length(riak_core_ring:active_members(Ring)),
+  NVal = 3,
+  Rs = length(Owners),
+  UniqueLocCount = length(get_unique_locations(Locations)),
+
+  MaxError = get_max_error(NVal, Rs, MemberCount, UniqueLocCount),
+  case riak_core_location:check_ring(Ring, NVal) of
+    Errors when length(Errors) > MaxError ->
+      io:format("Max error; ~p > ~p | ~p~n",
+                [length(Errors), MaxError, {NVal, Rs, MemberCount,
+                                            UniqueLocCount}]),
+      false;
+    _ ->
+      true
+  end.
+
+% @TODO better evaluation
+get_max_error(NVal, RingSize, MemberCount, UniqueLocationCount)
+  when UniqueLocationCount > NVal andalso
+       MemberCount == UniqueLocationCount andalso
+       RingSize rem MemberCount == 0 ->
+  0;
+get_max_error(NVal, _RingSize, MemberCount, UniqueLocationCount)
+  when UniqueLocationCount > NVal andalso
+       MemberCount == UniqueLocationCount ->
+  NVal;
+get_max_error(NVal, RingSize, MemberCount, UniqueLocationCount)
+  when RingSize rem MemberCount == 0 -> % no tail violations
+  MaxNodesOnSameLoc = (MemberCount - UniqueLocationCount)+1,
+  case MaxNodesOnSameLoc > NVal of
+    true ->
+      RingSize;
+    false ->
+      (RingSize - ((NVal rem MaxNodesOnSameLoc))+1)
+  end;
+get_max_error(_NVal, RingSize, _MemberCount, _UniqueLocationCount) ->
+  RingSize.
+
+get_unique_locations(Locations) ->
+  lists:foldl(fun({_, ActualLoc}, Acc) ->
+                case lists:member(ActualLoc, Acc) of
+                  false ->
+                    [ActualLoc | Acc];
+                  _ ->
+                    Acc
+                end
+              end, [], Locations).
+
+print_ring_partitions(Owners, Locations) ->
+  io:format(user, "~s", [print_ring_partitions(Owners, Locations, [])]).
+
+print_ring_partitions([], _Locations, Acc) ->
+  lists:reverse(Acc);
+print_ring_partitions([{Idx, Node} | Rem], Locations, Acc) ->
+  Location = proplists:get_value(Node, Locations, not_set),
+  Line = io_lib:format("Node: ~p \t Loc: ~p \t idx: ~p ~n", [Node, Location, Idx]),
+  print_ring_partitions(Rem, Locations, [Line | Acc]).
+
+generate_rings() ->
+  generate_rings(?RING_SIZES, ?NODE_COUNTS).
+
+generate_rings(Sizes, NodeCounts) ->
+  [do_generate_ring(S, generate_node_names(N)) ||
+    S <- Sizes, N <- NodeCounts].
+
+do_generate_ring(Size, ContributorNodes) when length(ContributorNodes) > Size ->
+  do_generate_ring(Size, lists:sublist(ContributorNodes, 1, Size));
+do_generate_ring(Size, ContributorNodes) ->
+  [CNode] = generate_node_names(1),
+  Ring = riak_core_ring:fresh(Size, CNode),
+  NewRing = lists:foldl(fun(Node, CRing) ->
+                          riak_core_ring:add_member(CNode, CRing, Node)
+                        end, Ring, ContributorNodes),
+  claim(NewRing).
+
+claim(Ring) ->
+  WantsClaimFun = {riak_core_claim, default_wants_claim},
+  ChooseClaimFun = {riak_core_claim, default_choose_claim},
+  riak_core_claim:claim(Ring, WantsClaimFun, ChooseClaimFun).
+
+generate_site_names(Count) ->
+  lists:map(fun(Name) -> binary_to_list(Name) end,
+            tl(generate_name(<<"site_">>, Count+1))).
+
+generate_node_names(Count) ->
+  %% Append a host part so each generated name is a valid node name
+  %% (the "@127.0.0.1" suffix is an assumed placeholder).
+  lists:map(fun(Name) -> binary_to_atom(<<Name/binary, "@127.0.0.1">>, utf8) end,
+            generate_name(<<"node_">>, Count)).
+
+generate_name(Prefix, Count) when Count =< 25 andalso Count >= 1 ->
+  GenList = lists:seq(65, 65+Count-1),
+  [<<Prefix/binary, C>> || C <- GenList].