Skip to content

Commit

Permalink
Merge pull request #967 from systream/rack_awareness
Browse files Browse the repository at this point in the history
Add Rack awareness support
  • Loading branch information
martinsumner authored Apr 29, 2021
2 parents e817ee7 + c196b6e commit 1f7e204
Show file tree
Hide file tree
Showing 9 changed files with 588 additions and 26 deletions.
82 changes: 82 additions & 0 deletions docs/rack-awareness.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Rack Awareness / Availability zones / Location support

The aim is to be able to increase data safety, and make the cluster more resilient
against a location/site/availability zone/rack loss.

To achieve this, a location parameter has been introduced.
It can be set at runtime for each RIAK node.
When claiming a new ring, the list of nodes is ordered taking into consideration the
location of the individual nodes, in a manner that adjacent nodes are preferably
from different locations.

Basically it only changes the order of the nodes fed into the claiming algorithm.

The default location is `undefined`. This means every node with no location parameter set
will be handled as being in the same location.

## Ring visualization

![RIAK Ring Location](ring-location.png)

## Setup node's location parameter

Setting up nodes’ location parameter is a staged operation like
other ring manipulations (join, leave, resize-ring, etc).

### via riak admin
Change current node location parameter:
```bash
riak admin cluster location rack_a
```
or specify a node:
```bash
riak admin cluster location site_b [email protected]
```

#### by erlang function call

```erlang
riak_core_claimant:set_node_location(node(), "location_a"),
```
```erlang
riak_core_claimant:plan(),
riak_core_claimant:comit().
```

## Pitfalls
There are circumstances in which the preferable node location assignment cannot be guaranteed.

If at least one location parameter is set in the cluster when planning a cluster change, a warning
message will be displayed when not all nodes in a preflist are assigned to a different location.

For example, if the default `n_val = 3` is specified and there are only `two distinct locations` set in the cluster,
the message `WARNING: Not all replicas will be on distinct locations` will be shown.

### Not enough distinct locations
When Distinct Location Count is not divisible by Ring size.

### Tail violations
When Ring Size not divisible by Count Of Nodes.
[claim-fixes](claim-fixes.md) cover this, but improper distinct location count could result in undesirable location distribution within the ring.

For example, there are 8 nodes on 3 distinct locations.
To ensure that every site/location has a piece of data, n_val must be at least 4.

It can be checked:

Stages changes:
```erlang
PlannedRing = element(1, lists:last(element(3, riak_core_claimant:plan()))).
riak_core_location:check_ring(PlannedRing, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

Actual ring:
```erlang
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_location:check_ring(Ring, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

If `riak_core_location:check_ring/3` returns with an empty list `[]`, there is no location violation.

### Won't optimize transfers between old and new ring
When location parameter change triggers ring ownership change, it currently does not optimize transfers.
Binary file added docs/ring-location.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
38 changes: 31 additions & 7 deletions src/riak_core_claim.erl
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ wants_claim_v2(Ring, Node) ->
Count = proplists:get_value(Node, Counts, 0),
case Count < Avg of
false ->
no;
case riak_core_ring:has_location_changed(Ring) of
true ->
{yes, 1};
false ->
no
end;
true ->
{yes, Avg - Count}
end.
Expand Down Expand Up @@ -289,7 +294,8 @@ choose_claim_v2(Ring, Node) ->
Params = default_choose_params(),
choose_claim_v2(Ring, Node, Params).

choose_claim_v2(Ring, Node, Params0) ->
choose_claim_v2(RingOrig, Node, Params0) ->
Ring = riak_core_ring:clear_location_changed(RingOrig),
Params = default_choose_params(Params0),
%% Active::[node()]
Active = riak_core_ring:claiming_members(Ring),
Expand Down Expand Up @@ -326,7 +332,8 @@ choose_claim_v2(Ring, Node, Params0) ->
%% number of indices desired is less than the computed set.
Padding = lists:duplicate(TargetN, undefined),
Expanded = lists:sublist(Active ++ Padding, TargetN),
PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
ExpandedLocation = get_nodes_by_location(Expanded, Ring),
PreferredClaim = riak_core_claim:diagonal_stripe(Ring, ExpandedLocation),
PreferredNth = [begin
{Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
Nth
Expand All @@ -343,8 +350,10 @@ choose_claim_v2(Ring, Node, Params0) ->
Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
TargetN, RingSize),
%% Claim indices from the remaining candidate set
Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
Claim2 = lists:sublist(Claim, Want),
Claim2 = case select_indices(Owners, Deltas, Indices2, TargetN, RingSize) of
[] -> [];
Claim -> lists:sublist(Claim, Want)
end,
NewRing = lists:foldl(fun(Idx, Ring0) ->
riak_core_ring:transfer_node(Idx, Node, Ring0)
end, Ring, Claim2),
Expand Down Expand Up @@ -622,7 +631,8 @@ claim_diagonal(Wants, Owners, Params) ->
riak_core_ring:riak_core_ring().
sequential_claim(Ring0, Node, TargetN) ->
Ring = riak_core_ring:upgrade(Ring0),
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
Nodes = get_nodes_by_location(OrigNodes, Ring),
NodeCount = length(Nodes),
RingSize = riak_core_ring:num_partitions(Ring),

Expand Down Expand Up @@ -709,7 +719,8 @@ backfill_ring(RingSize, Nodes, Remaining, Acc) ->

claim_rebalance_n(Ring0, Node) ->
Ring = riak_core_ring:upgrade(Ring0),
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
Nodes = get_nodes_by_location(OrigNodes, Ring),
Zipped = diagonal_stripe(Ring, Nodes),

lists:foldl(fun({P, N}, Acc) ->
Expand Down Expand Up @@ -1270,6 +1281,19 @@ indices_within_n([This | Indices], TN, Last, Q, Acc) ->
circular_distance(I1, I2, Q) ->
min((Q + I1 - I2) rem Q, (Q + I2 - I1) rem Q).

%% @private
%% Get active nodes ordered by take location parameters into account
-spec get_nodes_by_location([node()|undefined], riak_core_ring:riak_core_ring()) ->
[node()|undefined].
get_nodes_by_location(Nodes, Ring) ->
NodesLocations = riak_core_ring:get_nodes_locations(Ring),
case riak_core_location:has_location_set_in_cluster(NodesLocations) of
false ->
Nodes;
true ->
riak_core_location:stripe_nodes_by_location(Nodes, NodesLocations)
end.

%% ===================================================================
%% Unit tests
%% ===================================================================
Expand Down
38 changes: 32 additions & 6 deletions src/riak_core_claimant.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
activate_bucket_type/1,
get_bucket_type/2,
get_bucket_type/3,
bucket_type_iterator/0]).
bucket_type_iterator/0,
set_node_location/2]).
-export([reassign_indices/1]). % helpers for claim sim

%% gen_server callbacks
Expand All @@ -52,7 +53,8 @@
-type action() :: leave
| remove
| {replace, node()}
| {force_replace, node()}.
| {force_replace, node()}
| {set_location, string()}.

-type riak_core_ring() :: riak_core_ring:riak_core_ring().

Expand Down Expand Up @@ -164,6 +166,11 @@ abort_resize() ->
pending_close(Ring, RingID) ->
gen_server:call(?MODULE, {pending_close, Ring, RingID}).

%% @doc Stage a request to set a new location for the given node.
-spec set_node_location(node(), string()) -> ok | {error, atom()}.
set_node_location(Node, Location) ->
stage(Node, {set_location, Location}).

%% @doc Clear the current set of staged transfers
clear() ->
gen_server:call(claimant(), clear, infinity).
Expand Down Expand Up @@ -446,8 +453,9 @@ maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) ->
{_, _, false} ->
{ignore, plan_changed};
_ ->
NewRing = riak_core_ring:increment_vclock(Claimant, NextRing),
{new_ring, NewRing}
NewRing0 = riak_core_ring:clear_location_changed(NextRing),
NewRing1 = riak_core_ring:increment_vclock(Claimant, NewRing0),
{new_ring, NewRing1}
end.

%% @private
Expand Down Expand Up @@ -502,7 +510,9 @@ valid_request(Node, Action, Changes, Ring) ->
{resize, NewRingSize} ->
valid_resize_request(NewRingSize, Changes, Ring);
abort_resize ->
valid_resize_abort_request(Ring)
valid_resize_abort_request(Ring);
{set_location, Location} ->
valid_set_location_request(Location, Node, Ring)
end.

%% @private
Expand Down Expand Up @@ -615,6 +625,20 @@ valid_resize_abort_request(Ring) ->
false -> {error, not_resizing}
end.

%% @private
%% Validating node member status
valid_set_location_request(_Location, Node, Ring) ->
case riak_core_ring:member_status(Ring, Node) of
valid ->
true;
joining ->
true;
invalid ->
{error, not_member};
_ ->
true
end.

%% @private
%% @doc Filter out any staged changes that are no longer valid. Changes
%% can become invalid based on other staged changes, or by cluster
Expand Down Expand Up @@ -1094,7 +1118,9 @@ change({{force_replace, NewNode}, Node}, Ring) ->
change({{resize, NewRingSize}, _Node}, Ring) ->
riak_core_ring:resize(Ring, NewRingSize);
change({abort_resize, _Node}, Ring) ->
riak_core_ring:set_pending_resize_abort(Ring).
riak_core_ring:set_pending_resize_abort(Ring);
change({{set_location, Location}, Node}, Ring) ->
riak_core_ring:set_node_location(Node, Location, Ring).

internal_ring_changed(Node, CState) ->
{Changed, CState5} = do_claimant(Node, CState, fun log/2),
Expand Down
66 changes: 58 additions & 8 deletions src/riak_core_cluster_cli.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ register_all_usage() ->
clique:register_usage(["riak-admin", "cluster", "status"], status_usage()),
clique:register_usage(["riak-admin", "cluster", "partition"], partition_usage()),
clique:register_usage(["riak-admin", "cluster", "partitions"], partitions_usage()),
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()).
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
clique:register_usage(["riak-admin", "cluster", "location"], location_usage()).

register_all_commands() ->
lists:foreach(fun(Args) -> apply(clique, register_command, Args) end,
[status_register(), partition_count_register(),
partitions_register(), partition_register()]).
partitions_register(), partition_register(), location_register()]).

%%%
%% Cluster status
Expand All @@ -72,6 +74,7 @@ cluster_usage() ->
" partition Map partition IDs to indexes\n",
" partitions Display partitions on a node\n",
" partition-count Display ring size or node partition count\n\n",
" location Set node location\n\n",
" Use --help after a sub-command for more details.\n"
].

Expand Down Expand Up @@ -111,12 +114,20 @@ status(_CmdBase, [], []) ->
[T0,T1,Table,T2].

format_status(Node, Status, Ring, RingStatus) ->
{Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
[{node, is_claimant(Node, Claimant)},
{status, Status},
{avail, node_availability(Node, Down, MarkedDown)},
{ring, claim_percent(Ring, Node)},
{pending, future_claim_percentage(Changes, Ring, Node)}].
NodesLocations = riak_core_ring:get_nodes_locations(Ring),
HasLocationInCluster = riak_core_location:has_location_set_in_cluster(NodesLocations),
format_status(Node, Status, Ring, RingStatus, HasLocationInCluster, NodesLocations).

format_status(Node, Status, Ring, RingStatus, false, _) ->
{Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
[{node, is_claimant(Node, Claimant)},
{status, Status},
{avail, node_availability(Node, Down, MarkedDown)},
{ring, claim_percent(Ring, Node)},
{pending, future_claim_percentage(Changes, Ring, Node)}];
format_status(Node, Status, Ring, RingStatus, true, NodesLocations) ->
Row = format_status(Node, Status, Ring, RingStatus, false, NodesLocations),
Row ++ [{location, riak_core_location:get_node_location(Node, NodesLocations)}].

is_claimant(Node, Node) ->
" (C) " ++ atom_to_list(Node) ++ " ";
Expand Down Expand Up @@ -263,6 +274,45 @@ id_out1(id, Id, Ring, RingSize) when Id < RingSize ->
id_out1(id, Id, _Ring, _RingSize) ->
make_alert(["ERROR: Id ", integer_to_list(Id), " is invalid."]).


%%%
%% Location
%%%
location_usage() ->
["riak-admin cluster location <new_location> [--node node]\n\n",
" Set the node location parameter\n\n",
"Options\n",
" -n <node>, --node <node>\n",
" Set node location for the specified node.\n"
].

location_register() ->
[["riak-admin", "cluster", "location", '*'], % Cmd
[], % KeySpecs
[{node, [{shortname, "n"}, {longname, "node"},
{typecast, fun clique_typecast:to_node/1}]}], % FlagSpecs
fun stage_set_location/3]. % Implementation callback

stage_set_location([_, _, _, Location], _, Flags) ->
Node = proplists:get_value(node, Flags, node()),
try
case riak_core_claimant:set_node_location(Node, Location) of
ok ->
[clique_status:text(
io_lib:format("Success: staged changing location of node ~p to ~s~n",
[Node, Location]))];
{error, not_member} ->
make_alert(
io_lib:format("Failed: ~p is not a member of the cluster.~n", [Node])
)
end
catch
Exception:Reason ->
lager:error("Setting node location failed ~p:~p", [Exception, Reason]),
make_alert("Setting node location failed, see log for details~n")
end.


%%%
%% Internal
%%%
Expand Down
Loading

0 comments on commit 1f7e204

Please sign in to comment.