Add Rack awareness support #967

Merged · 6 commits · Apr 29, 2021
82 changes: 82 additions & 0 deletions docs/rack-awareness.md
@@ -0,0 +1,82 @@
# Rack Awareness / Availability zones / Location support

The aim is to increase data safety and make the cluster more resilient
against the loss of a location/site/availability zone/rack.

To achieve this, a location parameter has been introduced.
It can be set at runtime for each Riak node.
When claiming a new ring, the list of nodes is ordered with the location of the
individual nodes taken into account, so that adjacent nodes preferably come
from different locations.

Basically it only changes the order of the nodes fed into the claiming algorithm.

The default location is `undefined`. This means every node with no location parameter set
will be treated as being in the same location.
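
Concretely, the claim code asks `riak_core_location` for a location-aware ordering of the candidate nodes before striping them around the ring (see `get_nodes_by_location/2` in the diff below). The following is only a rough sketch of that idea (a hypothetical module, not the actual `riak_core_location:stripe_nodes_by_location/2` implementation): nodes are grouped by location and then taken round-robin, so consecutive entries tend to come from different locations.

```erlang
%% Hypothetical sketch of location-aware ordering (not the riak_core code):
%% group the nodes by location, then take one node from each group in turn.
-module(location_order_sketch).
-export([stripe/2]).

-spec stripe([node()], #{node() => term()}) -> [node()].
stripe(Nodes, Locations) ->
    %% Group the nodes by their location; nodes without a location
    %% fall into the 'undefined' group.
    Grouped = lists:foldl(
                fun(Node, Acc) ->
                        Loc = maps:get(Node, Locations, undefined),
                        maps:update_with(Loc, fun(L) -> L ++ [Node] end, [Node], Acc)
                end, #{}, Nodes),
    round_robin(maps:values(Grouped), []).

%% Take the head of every remaining group on each pass, so adjacent entries
%% in the result come from different locations whenever possible.
round_robin([], Acc) ->
    lists:append(lists:reverse(Acc));
round_robin(Groups, Acc) ->
    Heads = [H || [H | _] <- Groups],
    Tails = [T || [_ | T] <- Groups, T =/= []],
    round_robin(Tails, [Heads | Acc]).
```

For instance, four nodes split over two locations would come back interleaved, e.g. `[n1, n3, n2, n4]` when `n1`/`n2` share one location and `n3`/`n4` the other, rather than in plain sorted order.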

## Ring visualization

![RIAK Ring Location](ring-location.png)

## Setting a node's location parameter

Setting a node's location parameter is a staged operation, like
other ring manipulations (join, leave, resize-ring, etc.).

### via riak admin
Change the current node's location parameter:
```bash
riak admin cluster location rack_a
```
or specify a node:
```bash
riak admin cluster location site_b [email protected]
```

### via Erlang function call

```erlang
riak_core_claimant:set_node_location(node(), "location_a").
```
```erlang
riak_core_claimant:plan(),
riak_core_claimant:commit().
```
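
Since the location change is staged, it is committed together with any other staged cluster changes. As a sketch of the complete flow from an attached Erlang shell (the location `"rack_b"`, the `n_val` of 3 and the minimum of 2 distinct locations are illustrative values, not defaults):

```erlang
%% Stage the location change for the local node.
ok = riak_core_claimant:set_node_location(node(), "rack_b"),
%% Plan and pull the planned ring out of the plan result (same expression as
%% used in the "Pitfalls" section below).
PlannedRing = element(1, lists:last(element(3, riak_core_claimant:plan()))),
%% Assert that the planned ring has no location violations before committing.
[] = riak_core_location:check_ring(PlannedRing, 3, 2),
riak_core_claimant:commit().
```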

## Pitfalls
There are circumstances in which the preferred node location assignment cannot be guaranteed.

If at least one location parameter is set in the cluster when a cluster change is planned, a warning
message is displayed whenever not every node in a preflist is assigned to a distinct location.

For example, if the default `n_val = 3` is used and there are only two distinct locations set in the cluster,
the message `WARNING: Not all replicas will be on distinct locations` will be shown.

### Not enough distinct locations
This happens when the ring size is not evenly divisible by the number of distinct locations.

### Tail violations
This happens when the ring size is not divisible by the number of nodes.
[claim-fixes](claim-fixes.md) covers this, but an improper distinct location count can still result in an undesirable location distribution within the ring.

For example, consider 8 nodes spread over 3 distinct locations. Because the nodes cannot be striped evenly
across the locations, a preflist of 3 consecutive vnodes may miss one of the locations, so to ensure that every site/location holds a piece of the data, `n_val` must be at least 4.
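
A quick way to convince yourself (a standalone sketch only; the cyclic node ordering below is a hypothetical location-aware arrangement of 8 nodes over locations `a`, `b`, `c`, not output from the claim algorithm):

```erlang
%% Sketch: check whether every window of WindowSize consecutive entries in a
%% cyclic ordering covers all the distinct locations that occur in it.
-module(location_window_sketch).
-export([demo/0]).

covers_all_windows(Locs, WindowSize) ->
    N = length(Locs),
    Distinct = lists:usort(Locs),
    Wrapped = Locs ++ Locs,                      %% makes wrap-around windows easy
    lists:all(
      fun(I) ->
              Window = lists:sublist(Wrapped, I, WindowSize),
              lists:usort(Window) =:= Distinct
      end,
      lists:seq(1, N)).

demo() ->
    %% Hypothetical location-aware ordering of 8 nodes over 3 locations.
    Order = [c, a, b, a, c, b, a, b],
    {covers_all_windows(Order, 3),   %% false: a window of 3 can miss a location
     covers_all_windows(Order, 4)}.  %% true: every window of 4 sees all 3
```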

On a live cluster this can be checked with `riak_core_location:check_ring/3`.

Staged changes:
```erlang
PlannedRing = element(1, lists:last(element(3, riak_core_claimant:plan()))).
riak_core_location:check_ring(PlannedRing, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

Actual ring:
```erlang
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
riak_core_location:check_ring(Ring, Nval = 4, MinimumNumberOfDistinctLocations = 3).
```

If `riak_core_location:check_ring/3` returns an empty list (`[]`), there is no location violation.

### Won't optimize transfers between old and new ring
When a location parameter change triggers a ring ownership change, the resulting transfers between the old and the new ring are currently not optimized.
Binary file added docs/ring-location.png
38 changes: 31 additions & 7 deletions src/riak_core_claim.erl
@@ -187,7 +187,12 @@ wants_claim_v2(Ring, Node) ->
Count = proplists:get_value(Node, Counts, 0),
case Count < Avg of
false ->
no;
case riak_core_ring:has_location_changed(Ring) of
true ->
{yes, 1};
false ->
no
end;
true ->
{yes, Avg - Count}
end.
@@ -289,7 +294,8 @@ choose_claim_v2(Ring, Node) ->
Params = default_choose_params(),
choose_claim_v2(Ring, Node, Params).

choose_claim_v2(Ring, Node, Params0) ->
choose_claim_v2(RingOrig, Node, Params0) ->
Ring = riak_core_ring:clear_location_changed(RingOrig),
Params = default_choose_params(Params0),
%% Active::[node()]
Active = riak_core_ring:claiming_members(Ring),
@@ -326,7 +332,8 @@ choose_claim_v2(Ring, Node, Params0) ->
%% number of indices desired is less than the computed set.
Padding = lists:duplicate(TargetN, undefined),
Expanded = lists:sublist(Active ++ Padding, TargetN),
PreferredClaim = riak_core_claim:diagonal_stripe(Ring, Expanded),
ExpandedLocation = get_nodes_by_location(Expanded, Ring),
PreferredClaim = riak_core_claim:diagonal_stripe(Ring, ExpandedLocation),
PreferredNth = [begin
{Nth, Idx} = lists:keyfind(Idx, 2, AllIndices),
Nth
@@ -343,8 +350,10 @@ choose_claim_v2(Ring, Node, Params0) ->
Indices2 = prefilter_violations(Ring, Node, AllIndices, Indices,
TargetN, RingSize),
%% Claim indices from the remaining candidate set
Claim = select_indices(Owners, Deltas, Indices2, TargetN, RingSize),
Claim2 = lists:sublist(Claim, Want),
Claim2 = case select_indices(Owners, Deltas, Indices2, TargetN, RingSize) of
[] -> [];
Claim -> lists:sublist(Claim, Want)
end,
NewRing = lists:foldl(fun(Idx, Ring0) ->
riak_core_ring:transfer_node(Idx, Node, Ring0)
end, Ring, Claim2),
@@ -622,7 +631,8 @@ claim_diagonal(Wants, Owners, Params) ->
riak_core_ring:riak_core_ring().
sequential_claim(Ring0, Node, TargetN) ->
Ring = riak_core_ring:upgrade(Ring0),
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
Nodes = get_nodes_by_location(OrigNodes, Ring),
NodeCount = length(Nodes),
RingSize = riak_core_ring:num_partitions(Ring),

@@ -709,7 +719,8 @@ backfill_ring(RingSize, Nodes, Remaining, Acc) ->

claim_rebalance_n(Ring0, Node) ->
Ring = riak_core_ring:upgrade(Ring0),
Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
OrigNodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
Nodes = get_nodes_by_location(OrigNodes, Ring),
Zipped = diagonal_stripe(Ring, Nodes),

lists:foldl(fun({P, N}, Acc) ->
@@ -1270,6 +1281,19 @@ indices_within_n([This | Indices], TN, Last, Q, Acc) ->
circular_distance(I1, I2, Q) ->
min((Q + I1 - I2) rem Q, (Q + I2 - I1) rem Q).

%% @private
%% Get active nodes ordered by taking location parameters into account
-spec get_nodes_by_location([node()|undefined], riak_core_ring:riak_core_ring()) ->
[node()|undefined].
get_nodes_by_location(Nodes, Ring) ->
NodesLocations = riak_core_ring:get_nodes_locations(Ring),
case riak_core_location:has_location_set_in_cluster(NodesLocations) of
false ->
Nodes;
true ->
riak_core_location:stripe_nodes_by_location(Nodes, NodesLocations)
end.

%% ===================================================================
%% Unit tests
%% ===================================================================
38 changes: 32 additions & 6 deletions src/riak_core_claimant.erl
@@ -42,7 +42,8 @@
activate_bucket_type/1,
get_bucket_type/2,
get_bucket_type/3,
bucket_type_iterator/0]).
bucket_type_iterator/0,
set_node_location/2]).
-export([reassign_indices/1]). % helpers for claim sim

%% gen_server callbacks
@@ -52,7 +53,8 @@
-type action() :: leave
| remove
| {replace, node()}
| {force_replace, node()}.
| {force_replace, node()}
| {set_location, string()}.

-type riak_core_ring() :: riak_core_ring:riak_core_ring().

@@ -164,6 +166,11 @@ abort_resize() ->
pending_close(Ring, RingID) ->
gen_server:call(?MODULE, {pending_close, Ring, RingID}).

%% @doc Stage a request to set a new location for the given node.
-spec set_node_location(node(), string()) -> ok | {error, atom()}.
set_node_location(Node, Location) ->
stage(Node, {set_location, Location}).

%% @doc Clear the current set of staged transfers
clear() ->
gen_server:call(claimant(), clear, infinity).
@@ -446,8 +453,9 @@ maybe_commit_staged(Ring, NextRing, #state{next_ring=PlannedRing}) ->
{_, _, false} ->
{ignore, plan_changed};
_ ->
NewRing = riak_core_ring:increment_vclock(Claimant, NextRing),
{new_ring, NewRing}
NewRing0 = riak_core_ring:clear_location_changed(NextRing),
NewRing1 = riak_core_ring:increment_vclock(Claimant, NewRing0),
{new_ring, NewRing1}
end.

%% @private
@@ -502,7 +510,9 @@ valid_request(Node, Action, Changes, Ring) ->
{resize, NewRingSize} ->
valid_resize_request(NewRingSize, Changes, Ring);
abort_resize ->
valid_resize_abort_request(Ring)
valid_resize_abort_request(Ring);
{set_location, Location} ->
valid_set_location_request(Location, Node, Ring)
end.

%% @private
@@ -615,6 +625,20 @@ valid_resize_abort_request(Ring) ->
false -> {error, not_resizing}
end.

%% @private
%% Validating node member status
valid_set_location_request(_Location, Node, Ring) ->
case riak_core_ring:member_status(Ring, Node) of
valid ->
true;
joining ->
true;
invalid ->
{error, not_member};
_ ->
true
end.

%% @private
%% @doc Filter out any staged changes that are no longer valid. Changes
%% can become invalid based on other staged changes, or by cluster
@@ -1094,7 +1118,9 @@ change({{force_replace, NewNode}, Node}, Ring) ->
change({{resize, NewRingSize}, _Node}, Ring) ->
riak_core_ring:resize(Ring, NewRingSize);
change({abort_resize, _Node}, Ring) ->
riak_core_ring:set_pending_resize_abort(Ring).
riak_core_ring:set_pending_resize_abort(Ring);
change({{set_location, Location}, Node}, Ring) ->
riak_core_ring:set_node_location(Node, Location, Ring).

internal_ring_changed(Node, CState) ->
{Changed, CState5} = do_claimant(Node, CState, fun log/2),
66 changes: 58 additions & 8 deletions src/riak_core_cluster_cli.erl
@@ -46,12 +46,14 @@ register_all_usage() ->
clique:register_usage(["riak-admin", "cluster", "status"], status_usage()),
clique:register_usage(["riak-admin", "cluster", "partition"], partition_usage()),
clique:register_usage(["riak-admin", "cluster", "partitions"], partitions_usage()),
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()).
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
clique:register_usage(["riak-admin", "cluster", "partition_count"], partition_count_usage()),
clique:register_usage(["riak-admin", "cluster", "location"], location_usage()).

register_all_commands() ->
lists:foreach(fun(Args) -> apply(clique, register_command, Args) end,
[status_register(), partition_count_register(),
partitions_register(), partition_register()]).
partitions_register(), partition_register(), location_register()]).

%%%
%% Cluster status
@@ -72,6 +74,7 @@ cluster_usage() ->
" partition Map partition IDs to indexes\n",
" partitions Display partitions on a node\n",
" partition-count Display ring size or node partition count\n\n",
" location Set node location\n\n",
" Use --help after a sub-command for more details.\n"
].

@@ -111,12 +114,20 @@ status(_CmdBase, [], []) ->
[T0,T1,Table,T2].

format_status(Node, Status, Ring, RingStatus) ->
{Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
[{node, is_claimant(Node, Claimant)},
{status, Status},
{avail, node_availability(Node, Down, MarkedDown)},
{ring, claim_percent(Ring, Node)},
{pending, future_claim_percentage(Changes, Ring, Node)}].
NodesLocations = riak_core_ring:get_nodes_locations(Ring),
HasLocationInCluster = riak_core_location:has_location_set_in_cluster(NodesLocations),
format_status(Node, Status, Ring, RingStatus, HasLocationInCluster, NodesLocations).

format_status(Node, Status, Ring, RingStatus, false, _) ->
{Claimant, _RingReady, Down, MarkedDown, Changes} = RingStatus,
[{node, is_claimant(Node, Claimant)},
{status, Status},
{avail, node_availability(Node, Down, MarkedDown)},
{ring, claim_percent(Ring, Node)},
{pending, future_claim_percentage(Changes, Ring, Node)}];
format_status(Node, Status, Ring, RingStatus, true, NodesLocations) ->
Row = format_status(Node, Status, Ring, RingStatus, false, NodesLocations),
Row ++ [{location, riak_core_location:get_node_location(Node, NodesLocations)}].

is_claimant(Node, Node) ->
" (C) " ++ atom_to_list(Node) ++ " ";
@@ -263,6 +274,45 @@ id_out1(id, Id, Ring, RingSize) when Id < RingSize ->
id_out1(id, Id, _Ring, _RingSize) ->
make_alert(["ERROR: Id ", integer_to_list(Id), " is invalid."]).


%%%
%% Location
%%%
location_usage() ->
["riak-admin cluster location <new_location> [--node node]\n\n",
" Set the node location parameter\n\n",
"Options\n",
" -n <node>, --node <node>\n",
" Set node location for the specified node.\n"
].

location_register() ->
[["riak-admin", "cluster", "location", '*'], % Cmd
[], % KeySpecs
[{node, [{shortname, "n"}, {longname, "node"},
{typecast, fun clique_typecast:to_node/1}]}], % FlagSpecs
fun stage_set_location/3]. % Implementation callback

stage_set_location([_, _, _, Location], _, Flags) ->
Node = proplists:get_value(node, Flags, node()),
try
case riak_core_claimant:set_node_location(Node, Location) of
ok ->
[clique_status:text(
io_lib:format("Success: staged changing location of node ~p to ~s~n",
[Node, Location]))];
{error, not_member} ->
make_alert(
io_lib:format("Failed: ~p is not a member of the cluster.~n", [Node])
)
end
catch
Exception:Reason ->
lager:error("Setting node location failed ~p:~p", [Exception, Reason]),
make_alert("Setting node location failed, see log for details~n")
end.


%%%
%% Internal
%%%