New conventions (#95)
* Made spec mandatory for priority queue. Removed unused ifndef. Small formatting.

* Made staged_joins default to true

* Made proxy routing mandatory

* Made forwarding mandatory
albsch authored Jul 13, 2021
1 parent 7318d53 commit 004205d
Showing 9 changed files with 59 additions and 138 deletions.
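
The first two bullets change the default join path: a plain riak_core:join/1 now stages the membership change instead of auto-claiming ring ownership, so the join has to be committed through the claimant. A minimal sketch of that workflow, assuming the usual riak_core_claimant:plan/0 and riak_core_claimant:commit/0 API and a hypothetical node name (neither is part of this diff):

%% Staged-join sketch (node name is hypothetical):
ok = riak_core:join('riak@10.0.0.1'),                    %% join/1 now stages the join (join/2 with false)
{ok, _Changes, _NextRings} = riak_core_claimant:plan(),  %% inspect the staged membership changes
ok = riak_core_claimant:commit().                        %% commit them; only then is ownership claimed

The previous behaviour corresponded to join(Node, true), which join/1 no longer calls.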
2 changes: 1 addition & 1 deletion src/riak_core.erl
@@ -56,7 +56,7 @@ stop(Reason) ->
%% @doc Join the ring found on the specified remote node
%%
join(Node) ->
join(Node, true).
join(Node, false).

%% @doc Join the remote cluster without automatically claiming ring
%% ownership. Used to stage a join in the newer plan/commit
12 changes: 3 additions & 9 deletions src/riak_core_claimant.erl
@@ -929,15 +929,9 @@ are_joining_nodes(CState) ->
%% @private
auto_joining_nodes(CState) ->
Joining = riak_core_ring:members(CState, [joining]),
case application:get_env(riak_core, staged_joins, false) of
false ->
Joining;
true ->
[Member || Member <- Joining,
riak_core_ring:get_member_meta(CState,
Member,
'$autojoin') == true]
end.
%% case application:get_env(riak_core, staged_joins, true) of false -> Joining; true ->
[Member || Member <- Joining, riak_core_ring:get_member_meta(CState, Member, '$autojoin') == true].
%% end.

%% @private
maybe_handle_auto_joining(Node, CState) ->
33 changes: 2 additions & 31 deletions src/riak_core_handoff_sender.erl
@@ -271,7 +271,7 @@ start_fold_(TargetNode, Module, Type, Opts, ParentPid, SrcNode, SrcPartition, Ta
exit({shutdown, {error, ErrReason}})
end
end.
-ifndef('21.0').

start_fold(TargetNode, Module, {Type, Opts}, ParentPid) ->
SrcNode = node(),
SrcPartition = get_src_partition(Opts),
@@ -300,36 +300,7 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid) ->
[Err, Reason, Stacktrace]),
gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason})
end.
-else.
start_fold(TargetNode, Module, {Type, Opts}, ParentPid) ->
SrcNode = node(),
SrcPartition = get_src_partition(Opts),
TargetPartition = get_target_partition(Opts),
try
start_fold_(TargetNode, Module, Type, Opts, ParentPid, SrcNode, SrcPartition, TargetPartition)
catch
exit:{shutdown,max_concurrency} ->
%% Need to fwd the error so the handoff mgr knows
exit({shutdown, max_concurrency});
exit:{shutdown, timeout} ->
%% A receive timeout during handoff
%% STATS
%% riak_core_stat:update(handoff_timeouts),
?log_fail("because of TCP recv timeout", []),
exit({shutdown, timeout});
exit:{shutdown, {error, Reason}} ->
?log_fail("because of ~p", [Reason]),
gen_fsm_compat:send_event(ParentPid, {handoff_error,
fold_error, Reason}),
exit({shutdown, {error, Reason}});
throw:{be_quiet, Err, Reason} ->
gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason});
Err:Reason:Stack ->
?log_fail("because of ~p:~p ~p",
[Err, Reason, Stack]),
gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason})
end.
-endif.

start_visit_item_timer() ->
Ival = case application:get_env(riak_core, handoff_receive_timeout, undefined) of
TO when is_integer(TO) ->
42 changes: 19 additions & 23 deletions src/riak_core_priority_queue.erl
@@ -60,57 +60,48 @@

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-type(priority() :: integer()).
-type(squeue() :: {queue, [any()], [any()]}).
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).

-spec(new/0 :: () -> pqueue()).
-spec(is_queue/1 :: (any()) -> bool()).
-spec(is_empty/1 :: (pqueue()) -> bool()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}).
-spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}).
-spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-type priority() :: integer().
-type squeue() :: {queue, [any()], [any()]}.
-type pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}.

-endif.

%%----------------------------------------------------------------------------

new() ->
{queue, [], []}.
-spec new() -> pqueue().
new() -> {queue, [], []}.


-spec is_queue(any()) -> boolean().
is_queue({queue, R, F}) when is_list(R), is_list(F) ->
true;
is_queue({pqueue, Queues}) when is_list(Queues) ->
lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
Queues);
lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, Queues);
is_queue(_) ->
false.

-spec is_empty(pqueue()) -> boolean().
is_empty({queue, [], []}) ->
true;
is_empty(_) ->
false.

-spec len(pqueue()) -> non_neg_integer().
len({queue, R, F}) when is_list(R), is_list(F) ->
length(R) + length(F);
len({pqueue, Queues}) ->
lists:sum([len(Q) || {_, Q} <- Queues]).

-spec to_list(pqueue()) -> [{priority(), any()}].
to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
[{0, V} || V <- Out ++ lists:reverse(In, [])];
to_list({pqueue, Queues}) ->
[{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].

-spec in(any(), pqueue()) -> pqueue().
in(Item, Q) ->
in(Item, 0, Q).

-spec in(any(), priority(), pqueue()) -> pqueue().
in(X, 0, {queue, [_] = In, []}) ->
{queue, [X], In};
in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
@@ -128,6 +119,7 @@ in(X, Priority, {pqueue, Queues}) ->
lists:keysort(1, [{P, {queue, [X], []}} | Queues])
end}.

-spec out(pqueue()) -> {(empty | {value, any()}), pqueue()}.
out({queue, [], []} = Q) ->
{empty, Q};
out({queue, [V], []}) ->
@@ -151,6 +143,7 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.

-spec out(priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}.
out(_Priority, {queue, [], []} = Q) ->
{empty, Q};
out(Priority, {queue, _, _} = Q) when Priority =< 0 ->
@@ -162,6 +155,8 @@ out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) ->
out(_Priority, {pqueue, [_|_]} = Q) ->
{empty, Q}.


-spec pout(pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}.
pout({queue, [], []} = Q) ->
{empty, Q};
pout({queue, _, _} = Q) ->
@@ -179,6 +174,7 @@ pout({pqueue, [{P, Q} | Queues]}) ->
end,
{{value, V, -P}, NewQ}.

-spec join(pqueue(), pqueue()) -> pqueue().
join(A, {queue, [], []}) ->
A;
join({queue, [], []}, B) ->
@@ -291,7 +287,7 @@ merge_case() ->

basic_test() ->
simple_case(forward),
simple_case(reverse),
simple_case(reverse),
simple_case(mixed),
merge_case(),
ok.
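
With the use_specs conditional gone, the specs above document the whole riak_core_priority_queue API unconditionally. A small usage sketch of that API, assuming the usual semantics that larger priorities are dequeued first (priorities are stored negated internally, as to_list/1 shows); the wrapper function and values are illustrative only:

%% Usage sketch for riak_core_priority_queue:
priority_queue_sketch() ->
    Q0 = riak_core_priority_queue:new(),
    Q1 = riak_core_priority_queue:in(low, Q0),       %% in/2 enqueues at priority 0
    Q2 = riak_core_priority_queue:in(high, 10, Q1),  %% in/3 with a higher priority
    2 = riak_core_priority_queue:len(Q2),
    {{value, high}, Q3}   = riak_core_priority_queue:out(Q2),   %% higher priority comes out first
    {{value, low, 0}, Q4} = riak_core_priority_queue:pout(Q3),  %% pout/1 also returns the priority
    {empty, _}            = riak_core_priority_queue:out(Q4),
    ok.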
26 changes: 8 additions & 18 deletions src/riak_core_vnode.erl
@@ -346,20 +346,20 @@ forward_or_vnode_command(Sender, Request, State=#state{forward=Forward,
false ->
undefined
end,
Forwardable = is_request_forwardable(Request),
case {Forwardable, Forward, RequestHash} of
%% Not a forwardable command, handle request locally
{false, _, _} -> vnode_command(Sender, Request, State);
case {Forward, RequestHash} of
%% typical vnode operation, no forwarding set, handle request locally
{_, undefined, _} -> vnode_command(Sender, Request, State);
{undefined, _} -> vnode_command(Sender, Request, State);

%% implicit forwarding after ownership transfer/hinted handoff
{_, F, _} when not is_list(F) ->
{F, _} when not is_list(F) ->
vnode_forward(implicit, {Index, Forward}, Sender, Request, State),
continue(State);

%% during resize we can't forward a request w/o request hash, always handle locally
{_, _, undefined} -> vnode_command(Sender, Request, State);
{_, undefined} -> vnode_command(Sender, Request, State);

%% possible forwarding during ring resizing
{_, _, _} ->
{_, _} ->
{ok, R} = riak_core_ring_manager:get_my_ring(),
FutureIndex = riak_core_ring:future_index(RequestHash, Index, R),
vnode_resize_command(Sender, Request, FutureIndex, State)
@@ -1062,16 +1062,6 @@ stop_manager_event_timer(#state{manager_event_timer=T}) ->
_ = gen_fsm:cancel_timer(T),
ok.

is_request_forwardable(#riak_core_fold_req_v2{forwardable=false}) ->
false;
is_request_forwardable(_) ->
%% Assume that all other vnode ops are forwardable.
%%
%% WARNING: The coding style used in this function means that special
%% care must be taken when adding #riak_core_fold_req_v3 and
%% v4 and v27 as well as any other vnode request type.
true.

mod_set_forwarding(_Forward, State=#state{modstate={deleted,_}}) ->
State;
mod_set_forwarding(Forward, State=#state{mod=Mod, modstate=ModState}) ->
12 changes: 0 additions & 12 deletions src/riak_core_vnode_manager.erl
@@ -527,7 +527,6 @@ maybe_ensure_vnodes_started(Ring) ->
ok
end.

-ifndef('21.0').
ensure_vnodes_started(Ring) ->
spawn(fun() ->
try
@@ -537,17 +536,6 @@ ensure_vnodes_started(Ring) ->
logger:error("~p", [{Type, Reason, Stacktrace}])
end
end).
-else.
ensure_vnodes_started(Ring) ->
spawn(fun() ->
try
riak_core_ring_handler:ensure_vnodes_started(Ring)
catch
Type:Reason:Stacktrace ->
logger:error("~p", [{Type, Reason, Stacktrace}])
end
end).
-endif.

schedule_management_timer() ->
ManagementTick = application:get_env(riak_core,
23 changes: 4 additions & 19 deletions src/riak_core_vnode_master.erl
@@ -119,17 +119,11 @@ coverage(Msg, {Index, Node}, Keyspaces, Sender, VMaster) ->
make_coverage_request(Msg, Keyspaces, Sender, Index)).

%% Send the command to an individual Index/Node combination, but also
%% return the pid for the vnode handling the request, as `{ok,
%% VnodePid}'.
%% return the pid for the vnode handling the request, as `{ok, VnodePid}'.
command_return_vnode({Index,Node}, Msg, Sender, VMaster) ->
Req = make_request(Msg, Sender, Index),
case application:get_env(riak_core, vnode_routing) of
{ok, legacy} ->
gen_server:call({VMaster, Node}, {return_vnode, Req}, ?LONG_TIMEOUT);
{ok, proxy} ->
Mod = vmaster_to_vmod(VMaster),
riak_core_vnode_proxy:command_return_vnode({Mod,Index,Node}, Req)
end.
Mod = vmaster_to_vmod(VMaster),
riak_core_vnode_proxy:command_return_vnode({Mod,Index,Node}, Req).

%% Send a synchronous command to an individual Index/Node combination.
%% Will not return until the vnode has returned
@@ -195,16 +189,7 @@ proxy_cast(Who, Req) ->
proxy_cast(Who, Req, normal).

proxy_cast({VMaster, Node}, Req, How) ->
case application:get_env(riak_core, vnode_routing) of
{ok, legacy} ->
if How == normal ->
gen_server:cast({VMaster, Node}, Req);
How == unreliable ->
riak_core_send_msg:cast_unreliable({VMaster, Node}, Req)
end;
{ok, proxy} ->
do_proxy_cast({VMaster, Node}, Req, How)
end.
do_proxy_cast({VMaster, Node}, Req, How).

do_proxy_cast({VMaster, Node}, Req=?VNODE_REQ{index=Idx}, How) ->
Mod = vmaster_to_vmod(VMaster),
38 changes: 22 additions & 16 deletions src/riak_core_vnode_worker.erl
@@ -22,19 +22,20 @@

-include("riak_core_vnode.hrl").

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

% API
-export([start_link/1, handle_work/3, handle_work/4]).

-type mod_state() :: term().

-record(state, {
module :: atom(),
modstate :: mod_state()
}).
module :: atom(),
modstate :: mod_state()
}).

-callback init_worker(partition(), Args :: term(), Props :: [{atom(), term()}]) ->
{ok, mod_state()}.
-callback init_worker(partition(), Args :: term(), Props :: [{atom(), term()}]) -> {ok, mod_state()}.
-callback handle_work(Work :: term(), sender(), mod_state()) ->
{reply, Reply :: term(), mod_state()} |
{noreply, mod_state()}.
@@ -44,34 +45,39 @@ start_link(Args) ->
[VNodeIndex, WorkerArgs, WorkerProps, Caller] = proplists:get_value(worker_args, Args),
gen_server:start_link(?MODULE, [WorkerMod, VNodeIndex, WorkerArgs, WorkerProps, Caller], []).


handle_work(Worker, Work, From) ->
handle_work(Worker, Work, From, self()).


handle_work(Worker, Work, From, Caller) ->
gen_server:cast(Worker, {work, Work, From, Caller}).


init([Module, VNodeIndex, WorkerArgs, WorkerProps, Caller]) ->
{ok, WorkerState} = Module:init_worker(VNodeIndex, WorkerArgs, WorkerProps),
%% let the pool queue manager know there might be a worker to checkout
riak_core_vnode_worker_pool:worker_started(Caller),
{ok, #state{module=Module, modstate=WorkerState}}.
{ok, #state{module = Module, modstate = WorkerState}}.


handle_call(Event, _From, State) ->
logger:debug("Vnode worker received synchronous event: ~p.", [Event]),
{reply, ok, State}.


handle_cast({work, Work, WorkFrom, Caller},
#state{module = Mod, modstate = ModState} = State) ->
#state{module = Mod, modstate = ModState} = State) ->
NewModState = case Mod:handle_work(Work, WorkFrom, ModState) of
{reply, Reply, NS} ->
riak_core_vnode:reply(WorkFrom, Reply),
NS;
{noreply, NS} ->
NS
end,
{reply, Reply, NS} ->
riak_core_vnode:reply(WorkFrom, Reply),
NS;
{noreply, NS} ->
NS
end,
%% check the worker back into the pool
riak_core_vnode_worker_pool:checkin_worker(Caller, self()),
{noreply, State#state{modstate=NewModState}};
{noreply, State#state{modstate = NewModState}};

handle_cast(_Event, State) -> {noreply, State}.

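
The -callback attributes above make riak_core_vnode_worker an explicit behaviour. A minimal implementation sketch; the module name and the shape of the work term are hypothetical and not part of riak_core:

-module(sample_vnode_worker).
-behaviour(riak_core_vnode_worker).

-export([init_worker/3, handle_work/3]).

%% Any term can serve as the worker state (mod_state()); a map is used here.
init_worker(_VNodeIndex, _WorkerArgs, _WorkerProps) ->
    {ok, #{}}.

%% Work that produces a result: the worker gen_server replies to the
%% original sender via riak_core_vnode:reply/2 on our behalf.
handle_work({compute, Fun}, _Sender, State) when is_function(Fun, 0) ->
    {reply, Fun(), State};
%% Fire-and-forget work: nothing to send back.
handle_work(_Other, _Sender, State) ->
    {noreply, State}.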
9 changes: 0 additions & 9 deletions test/eqc/bucket_eqc_utils.erl
@@ -30,23 +30,14 @@ per_test_setup(DefaultBucketProps, TestFun) ->
riak_core_test_util:stop_pid(whereis(riak_core_ring_events)),
riak_core_test_util:stop_pid(whereis(riak_core_ring_manager)),
application:set_env(riak_core, claimant_tick, 4294967295),
application:set_env(riak_core, broadcast_lazy_timer, 4294967295),
application:set_env(riak_core, broadcast_exchange_timer, 4294967295),
application:set_env(riak_core, metadata_hashtree_timer, 4294967295),
application:set_env(riak_core, cluster_name, "eqc_test"),
application:set_env(riak_core, default_bucket_props, DefaultBucketProps),
{ok, RingEvents} = riak_core_ring_events:start_link(),
{ok, RingMgr} = riak_core_ring_manager:start_link(test),
{ok, Claimant} = riak_core_claimant:start_link(),
{ok, MetaMgr} = riak_core_metadata_manager:start_link([{data_dir, "./meta_temp"}]),
{ok, Hashtree} = riak_core_metadata_hashtree:start_link("./meta_temp/trees"),
{ok, Broadcast} = riak_core_broadcast:start_link(),

Results = TestFun(),

riak_core_test_util:stop_pid(Broadcast),
riak_core_test_util:stop_pid(Hashtree),
riak_core_test_util:stop_pid(MetaMgr),
riak_core_test_util:stop_pid(Claimant),
unlink(RingMgr),
riak_core_ring_manager:stop(),
