diff --git a/src/riak_core_ring.erl b/src/riak_core_ring.erl index eaecd191a..feafec328 100644 --- a/src/riak_core_ring.erl +++ b/src/riak_core_ring.erl @@ -188,15 +188,25 @@ set_chash(State, CHash) -> all_members(#chstate{members = Members}) -> get_members(Members). +%% @doc Produce a list of all nodes in the cluster with the given types +-spec members(State :: chstate(), + Types :: [member_status()]) -> [Node :: term()]. + members(#chstate{members = Members}, Types) -> get_members(Members, Types). %% @doc Produce a list of all active (not marked as down) cluster members +-spec active_members(State :: chstate()) -> [Node :: + term()]. + active_members(#chstate{members = Members}) -> get_members(Members, [joining, valid, leaving, exiting]). %% @doc Returns a list of members guaranteed safe for requests +-spec ready_members(State :: chstate()) -> [Node :: + term()]. + ready_members(#chstate{members = Members}) -> get_members(Members, [valid, leaving]). @@ -381,6 +391,9 @@ random_other_index(State) -> _ -> lists:nth(rand:uniform(length(L)), L) end. +%% @doc Return a partition index not owned by the node executing this function +%% or contained in the exclude list. +%% If there are no feasible index return no_indices. -spec random_other_index(State :: chstate(), Exclude :: [term()]) -> chash:index_as_int() | no_indices. @@ -546,6 +559,12 @@ future_index(CHashKey, OrigIdx, NValCheck, OrigCount, (NextOwner + NextInc * OrigDist) rem RingTop end. +%% @doc Check if the index is either out of bounds of the ring size or the n +%% value +-spec check_invalid_future_index(non_neg_integer(), + pos_integer(), + integer() | undefined) -> boolean(). + check_invalid_future_index(OrigDist, NextCount, NValCheck) -> OverRingSize = OrigDist >= NextCount, @@ -616,6 +635,10 @@ remove_meta(Key, State) -> claimant(#chstate{claimant = Claimant}) -> Claimant. +%% @doc Set the new claimant. +-spec set_claimant(State :: chstate(), + Claimant :: node()) -> NState :: chstate(). + set_claimant(State, Claimant) -> State#chstate{claimant = Claimant}. @@ -625,9 +648,17 @@ set_claimant(State, Claimant) -> cluster_name(State) -> State#chstate.clustername. %% @doc Sets the unique identifer for this cluster. +-spec set_cluster_name(State :: chstate(), + Name :: {term(), term()}) -> chstate(). + set_cluster_name(State, Name) -> State#chstate{clustername = Name}. +%% @doc Mark the cluster names as undefined if at least one is undefined. +%% Else leave the names unchanged. +-spec reconcile_names(RingA :: chstate(), + RingB :: chstate()) -> {chstate(), chstate()}. + reconcile_names(RingA = #chstate{clustername = NameA}, RingB = #chstate{clustername = NameB}) -> case (NameA =:= undefined) or (NameB =:= undefined) of @@ -637,12 +668,24 @@ reconcile_names(RingA = #chstate{clustername = NameA}, false -> {RingA, RingB} end. +%% @doc Increment the vector clock and return the new state. +-spec increment_vclock(Node :: node(), + State :: chstate()) -> chstate(). + increment_vclock(Node, State) -> VClock = vclock:increment(Node, State#chstate.vclock), State#chstate{vclock = VClock}. +%% @doc Return the current ring version. +-spec ring_version(chstate()) -> vclock:vclock() | + undefined. + ring_version(#chstate{rvsn = RVsn}) -> RVsn. +%% @doc Increment the ring version and return the new state. +-spec increment_ring_version(node(), + chstate()) -> chstate(). + increment_ring_version(Node, State) -> RVsn = vclock:increment(Node, State#chstate.rvsn), State#chstate{rvsn = RVsn}. @@ -668,6 +711,11 @@ all_member_status(#chstate{members = Members}) -> || {Node, {Status, _VC, _}} <- Members, Status /= invalid]. +%% @doc return the member's meta value for the given key or undefined if the +%% member or key cannot be found. +-spec get_member_meta(chstate(), node(), + atom()) -> term() | undefined. + get_member_meta(State, Member, Key) -> case orddict:find(Member, State#chstate.members) of error -> undefined; @@ -679,12 +727,19 @@ get_member_meta(State, Member, Key) -> end. %% @doc Set a key in the member metadata orddict +-spec update_member_meta(node(), chstate(), node(), + atom(), term()) -> chstate(). + update_member_meta(Node, State, Member, Key, Val) -> VClock = vclock:increment(Node, State#chstate.vclock), State2 = update_member_meta(Node, State, Member, Key, Val, same_vclock), State2#chstate{vclock = VClock}. +%% @see update_member_meta/5. +-spec update_member_meta(node(), chstate(), node(), + atom(), term(), same_vclock) -> chstate(). + update_member_meta(Node, State, Member, Key, Val, same_vclock) -> Members = State#chstate.members, @@ -700,6 +755,10 @@ update_member_meta(Node, State, Member, Key, Val, false -> State end. +%% @doc Remove the meta entries for the given member. +-spec clear_member_meta(node(), chstate(), + node()) -> chstate(). + clear_member_meta(Node, State, Member) -> Members = State#chstate.members, case orddict:is_key(Member, Members) of @@ -714,22 +773,46 @@ clear_member_meta(Node, State, Member) -> false -> State end. +%% @doc Mark a member as joining +-spec add_member(node(), chstate(), + node()) -> chstate(). + add_member(PNode, State, Node) -> set_member(PNode, State, Node, joining). +%% @doc Mark a member as invalid +-spec remove_member(node(), chstate(), + node()) -> chstate(). + remove_member(PNode, State, Node) -> State2 = clear_member_meta(PNode, State, Node), set_member(PNode, State2, Node, invalid). +%% @doc Mark a member as leaving +-spec leave_member(node(), chstate(), + node()) -> chstate(). + leave_member(PNode, State, Node) -> set_member(PNode, State, Node, leaving). +%% @doc Mark a member as exiting +-spec exit_member(node(), chstate(), + node()) -> chstate(). + exit_member(PNode, State, Node) -> set_member(PNode, State, Node, exiting). +%% @doc Mark a member as down +-spec down_member(node(), chstate(), + node()) -> chstate(). + down_member(PNode, State, Node) -> set_member(PNode, State, Node, down). +%% @doc Mark a member with the given status +-spec set_member(node(), chstate(), node(), + member_status()) -> chstate(). + set_member(Node, CState, Member, Status) -> VClock = vclock:increment(Node, CState#chstate.vclock), CState2 = set_member(Node, CState, Member, Status, @@ -784,6 +867,8 @@ indices(State, Node) -> future_indices(State, Node) -> indices(future_ring(State), Node). +%% @doc Return all node entries that will exist after the pending changes are +%% applied. -spec all_next_owners(chstate()) -> [{integer(), term()}]. @@ -792,6 +877,10 @@ all_next_owners(CState) -> [{Idx, NextOwner} || {Idx, _, NextOwner, _, _} <- Next]. %% @private +%% Change the owner of the indices to the new owners. +-spec change_owners(chstate(), + [{integer(), node()}]) -> chstate(). + change_owners(CState, Reassign) -> lists:foldl(fun ({Idx, NewOwner}, CState0) -> %% if called for indexes not in the current ring (during resizing) @@ -804,6 +893,9 @@ change_owners(CState, Reassign) -> CState, Reassign). %% @doc Return all indices that a node is scheduled to give to another. +-spec disowning_indices(chstate(), + node()) -> [integer()]. + disowning_indices(State, Node) -> case is_resizing(State) of false -> @@ -817,6 +909,10 @@ disowning_indices(State, Node) -> disowned_during_resize(State, Idx, Owner)] end. +%% @doc Check if the owner of the index changes during resize. +-spec disowned_during_resize(chstate(), integer(), + node()) -> boolean(). + disowned_during_resize(CState, Idx, Owner) -> %% catch error when index doesn't exist, we are disowning it if its going away NextOwner = try future_owner(CState, Idx) catch @@ -828,10 +924,18 @@ disowned_during_resize(CState, Idx, Owner) -> end. %% @doc Returns a list of all pending ownership transfers. +-spec pending_changes(chstate()) -> [{integer(), term(), + term(), [module()], awaiting | complete}]. + pending_changes(State) -> %% For now, just return next directly. State#chstate.next. +%% @doc Set the transfers as pending changes +-spec set_pending_changes(chstate(), + [{integer(), term(), term(), [module()], + awaiting | complete}]) -> chstate(). + set_pending_changes(State, Transfers) -> State#chstate{next = Transfers}. @@ -872,6 +976,8 @@ set_pending_resize(Resizing, Orig) -> SortedNext), FutureCHash). +%% @doc Abort the resizing procedure if possible and return true on a succesfull +%% abort. -spec maybe_abort_resize(chstate()) -> {boolean(), chstate()}. @@ -890,11 +996,13 @@ maybe_abort_resize(State) -> false -> {false, State} end. +%% @doc Set the resize abort value to true. -spec set_pending_resize_abort(chstate()) -> chstate(). set_pending_resize_abort(State) -> update_meta('$resized_ring_abort', true, State). +%% @doc Add the transfar from source to target to the scheduled transfers. -spec schedule_resize_transfer(chstate(), {integer(), term()}, integer() | {integer(), term()}) -> chstate(). @@ -934,6 +1042,10 @@ reschedule_resize_transfers(State = #chstate{next = State, Next), NewState#chstate{next = NewNext}. +%% @doc Reset the status of a resize operation +-spec reschedule_resize_operation(pos_integer(), node(), + term(), chstate()) -> {term(), chstate()}. + reschedule_resize_operation(N, NewNode, {Idx, N, '$resize', _Mods, _Status}, State) -> NewEntry = {Idx, NewNode, '$resize', ordsets:new(), @@ -955,6 +1067,12 @@ reschedule_resize_operation(Node, NewNode, false -> {Entry, State} end. +%% @see reschedule_resize_operation/4. +-spec reschedule_inbound_resize_transfers({integer(), + term()}, + node(), node(), + chstate()) -> {boolean(), chstate()}. + reschedule_inbound_resize_transfers(Source, Node, NewNode, State) -> F = fun (Transfer, Acc) ->