From 004205d0372a7099071a0847c75d910f5d2c7bf1 Mon Sep 17 00:00:00 2001 From: Albert Schimpf <38429047+albsch@users.noreply.github.com> Date: Tue, 13 Jul 2021 19:09:55 +0000 Subject: [PATCH] New conventions (#95) * Made spec mandatory for priority queue. Removed unused ifndef. Small formatting. * Made staged_joins default to true * Made proxy routing mandatory * Made forwarding mandatory --- src/riak_core.erl | 2 +- src/riak_core_claimant.erl | 12 +++------ src/riak_core_handoff_sender.erl | 33 ++----------------------- src/riak_core_priority_queue.erl | 42 +++++++++++++++----------------- src/riak_core_vnode.erl | 26 ++++++-------------- src/riak_core_vnode_manager.erl | 12 --------- src/riak_core_vnode_master.erl | 23 +++-------------- src/riak_core_vnode_worker.erl | 38 +++++++++++++++++------------ test/eqc/bucket_eqc_utils.erl | 9 ------- 9 files changed, 59 insertions(+), 138 deletions(-) diff --git a/src/riak_core.erl b/src/riak_core.erl index f6c7870cc..c24e235c3 100644 --- a/src/riak_core.erl +++ b/src/riak_core.erl @@ -56,7 +56,7 @@ stop(Reason) -> %% @doc Join the ring found on the specified remote node %% join(Node) -> - join(Node, true). + join(Node, false). %% @doc Join the remote cluster without automatically claiming ring %% ownership. Used to stage a join in the newer plan/commit diff --git a/src/riak_core_claimant.erl b/src/riak_core_claimant.erl index b46b10834..61c9b6be8 100644 --- a/src/riak_core_claimant.erl +++ b/src/riak_core_claimant.erl @@ -929,15 +929,9 @@ are_joining_nodes(CState) -> %% @private auto_joining_nodes(CState) -> Joining = riak_core_ring:members(CState, [joining]), - case application:get_env(riak_core, staged_joins, false) of - false -> - Joining; - true -> - [Member || Member <- Joining, - riak_core_ring:get_member_meta(CState, - Member, - '$autojoin') == true] - end. +%% case application:get_env(riak_core, staged_joins, true) of false -> Joining; true -> + [Member || Member <- Joining, riak_core_ring:get_member_meta(CState, Member, '$autojoin') == true]. +%% end. %% @private maybe_handle_auto_joining(Node, CState) -> diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl index f51012ea3..ad0daa730 100644 --- a/src/riak_core_handoff_sender.erl +++ b/src/riak_core_handoff_sender.erl @@ -271,7 +271,7 @@ start_fold_(TargetNode, Module, Type, Opts, ParentPid, SrcNode, SrcPartition, Ta exit({shutdown, {error, ErrReason}}) end end. --ifndef('21.0'). + start_fold(TargetNode, Module, {Type, Opts}, ParentPid) -> SrcNode = node(), SrcPartition = get_src_partition(Opts), @@ -300,36 +300,7 @@ start_fold(TargetNode, Module, {Type, Opts}, ParentPid) -> [Err, Reason, Stacktrace]), gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason}) end. --else. -start_fold(TargetNode, Module, {Type, Opts}, ParentPid) -> - SrcNode = node(), - SrcPartition = get_src_partition(Opts), - TargetPartition = get_target_partition(Opts), - try - start_fold_(TargetNode, Module, Type, Opts, ParentPid, SrcNode, SrcPartition, TargetPartition) - catch - exit:{shutdown,max_concurrency} -> - %% Need to fwd the error so the handoff mgr knows - exit({shutdown, max_concurrency}); - exit:{shutdown, timeout} -> - %% A receive timeout during handoff - %% STATS -%% riak_core_stat:update(handoff_timeouts), - ?log_fail("because of TCP recv timeout", []), - exit({shutdown, timeout}); - exit:{shutdown, {error, Reason}} -> - ?log_fail("because of ~p", [Reason]), - gen_fsm_compat:send_event(ParentPid, {handoff_error, - fold_error, Reason}), - exit({shutdown, {error, Reason}}); - throw:{be_quiet, Err, Reason} -> - gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason}); - Err:Reason:Stack -> - ?log_fail("because of ~p:~p ~p", - [Err, Reason, Stack]), - gen_fsm_compat:send_event(ParentPid, {handoff_error, Err, Reason}) - end. --endif. + start_visit_item_timer() -> Ival = case application:get_env(riak_core, handoff_receive_timeout, undefined) of TO when is_integer(TO) -> diff --git a/src/riak_core_priority_queue.erl b/src/riak_core_priority_queue.erl index 29a4495a2..b2cea75e4 100644 --- a/src/riak_core_priority_queue.erl +++ b/src/riak_core_priority_queue.erl @@ -60,57 +60,48 @@ %%---------------------------------------------------------------------------- --ifdef(use_specs). - --type(priority() :: integer()). --type(squeue() :: {queue, [any()], [any()]}). --type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). - --spec(new/0 :: () -> pqueue()). --spec(is_queue/1 :: (any()) -> bool()). --spec(is_empty/1 :: (pqueue()) -> bool()). --spec(len/1 :: (pqueue()) -> non_neg_integer()). --spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). --spec(in/2 :: (any(), pqueue()) -> pqueue()). --spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). --spec(out/1 :: (pqueue()) -> {(empty | {value, any()}), pqueue()}). --spec(out/2 :: (priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}). --spec(pout/1 :: (pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}). --spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). +-type priority() :: integer(). +-type squeue() :: {queue, [any()], [any()]}. +-type pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}. --endif. %%---------------------------------------------------------------------------- -new() -> - {queue, [], []}. +-spec new() -> pqueue(). +new() -> {queue, [], []}. + +-spec is_queue(any()) -> boolean(). is_queue({queue, R, F}) when is_list(R), is_list(F) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> - lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, - Queues); + lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end, Queues); is_queue(_) -> false. +-spec is_empty(pqueue()) -> boolean(). is_empty({queue, [], []}) -> true; is_empty(_) -> false. +-spec len(pqueue()) -> non_neg_integer(). len({queue, R, F}) when is_list(R), is_list(F) -> length(R) + length(F); len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). +-spec to_list(pqueue()) -> [{priority(), any()}]. to_list({queue, In, Out}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)]. +-spec in(any(), pqueue()) -> pqueue(). in(Item, Q) -> in(Item, 0, Q). +-spec in(any(), priority(), pqueue()) -> pqueue(). in(X, 0, {queue, [_] = In, []}) -> {queue, [X], In}; in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> @@ -128,6 +119,7 @@ in(X, Priority, {pqueue, Queues}) -> lists:keysort(1, [{P, {queue, [X], []}} | Queues]) end}. +-spec out(pqueue()) -> {(empty | {value, any()}), pqueue()}. out({queue, [], []} = Q) -> {empty, Q}; out({queue, [V], []}) -> @@ -151,6 +143,7 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +-spec out(priority(), pqueue()) -> {(empty | {value, any()}), pqueue()}. out(_Priority, {queue, [], []} = Q) -> {empty, Q}; out(Priority, {queue, _, _} = Q) when Priority =< 0 -> @@ -162,6 +155,8 @@ out(Priority, {pqueue, [{P, _Q} | _Queues]} = Q) when Priority =< (-P) -> out(_Priority, {pqueue, [_|_]} = Q) -> {empty, Q}. + +-spec pout(pqueue()) -> {(empty | {value, any(), priority()}), pqueue()}. pout({queue, [], []} = Q) -> {empty, Q}; pout({queue, _, _} = Q) -> @@ -179,6 +174,7 @@ pout({pqueue, [{P, Q} | Queues]}) -> end, {{value, V, -P}, NewQ}. +-spec join(pqueue(), pqueue()) -> pqueue(). join(A, {queue, [], []}) -> A; join({queue, [], []}, B) -> @@ -291,7 +287,7 @@ merge_case() -> basic_test() -> simple_case(forward), - simple_case(reverse), +simple_case(reverse), simple_case(mixed), merge_case(), ok. diff --git a/src/riak_core_vnode.erl b/src/riak_core_vnode.erl index 8875f55a7..f1f2a6dd0 100644 --- a/src/riak_core_vnode.erl +++ b/src/riak_core_vnode.erl @@ -346,20 +346,20 @@ forward_or_vnode_command(Sender, Request, State=#state{forward=Forward, false -> undefined end, - Forwardable = is_request_forwardable(Request), - case {Forwardable, Forward, RequestHash} of - %% Not a forwardable command, handle request locally - {false, _, _} -> vnode_command(Sender, Request, State); + case {Forward, RequestHash} of %% typical vnode operation, no forwarding set, handle request locally - {_, undefined, _} -> vnode_command(Sender, Request, State); + {undefined, _} -> vnode_command(Sender, Request, State); + %% implicit forwarding after ownership transfer/hinted handoff - {_, F, _} when not is_list(F) -> + {F, _} when not is_list(F) -> vnode_forward(implicit, {Index, Forward}, Sender, Request, State), continue(State); + %% during resize we can't forward a request w/o request hash, always handle locally - {_, _, undefined} -> vnode_command(Sender, Request, State); + {_, undefined} -> vnode_command(Sender, Request, State); + %% possible forwarding during ring resizing - {_, _, _} -> + {_, _} -> {ok, R} = riak_core_ring_manager:get_my_ring(), FutureIndex = riak_core_ring:future_index(RequestHash, Index, R), vnode_resize_command(Sender, Request, FutureIndex, State) @@ -1062,16 +1062,6 @@ stop_manager_event_timer(#state{manager_event_timer=T}) -> _ = gen_fsm:cancel_timer(T), ok. -is_request_forwardable(#riak_core_fold_req_v2{forwardable=false}) -> - false; -is_request_forwardable(_) -> - %% Assume that all other vnode ops are forwardable. - %% - %% WARNING: The coding style used in this function means that special - %% care must be taken when adding #riak_core_fold_req_v3 and - %% v4 and v27 as well as any other vnode request type. - true. - mod_set_forwarding(_Forward, State=#state{modstate={deleted,_}}) -> State; mod_set_forwarding(Forward, State=#state{mod=Mod, modstate=ModState}) -> diff --git a/src/riak_core_vnode_manager.erl b/src/riak_core_vnode_manager.erl index cf62b2891..295e4a5c2 100644 --- a/src/riak_core_vnode_manager.erl +++ b/src/riak_core_vnode_manager.erl @@ -527,7 +527,6 @@ maybe_ensure_vnodes_started(Ring) -> ok end. --ifndef('21.0'). ensure_vnodes_started(Ring) -> spawn(fun() -> try @@ -537,17 +536,6 @@ ensure_vnodes_started(Ring) -> logger:error("~p", [{Type, Reason, Stacktrace}]) end end). --else. -ensure_vnodes_started(Ring) -> - spawn(fun() -> - try - riak_core_ring_handler:ensure_vnodes_started(Ring) - catch - Type:Reason:Stacktrace -> - logger:error("~p", [{Type, Reason, Stacktrace}]) - end - end). --endif. schedule_management_timer() -> ManagementTick = application:get_env(riak_core, diff --git a/src/riak_core_vnode_master.erl b/src/riak_core_vnode_master.erl index bf31e58f6..352d02886 100644 --- a/src/riak_core_vnode_master.erl +++ b/src/riak_core_vnode_master.erl @@ -119,17 +119,11 @@ coverage(Msg, {Index, Node}, Keyspaces, Sender, VMaster) -> make_coverage_request(Msg, Keyspaces, Sender, Index)). %% Send the command to an individual Index/Node combination, but also -%% return the pid for the vnode handling the request, as `{ok, -%% VnodePid}'. +%% return the pid for the vnode handling the request, as `{ok, VnodePid}'. command_return_vnode({Index,Node}, Msg, Sender, VMaster) -> Req = make_request(Msg, Sender, Index), - case application:get_env(riak_core, vnode_routing) of - {ok, legacy} -> - gen_server:call({VMaster, Node}, {return_vnode, Req}, ?LONG_TIMEOUT); - {ok, proxy} -> - Mod = vmaster_to_vmod(VMaster), - riak_core_vnode_proxy:command_return_vnode({Mod,Index,Node}, Req) - end. + Mod = vmaster_to_vmod(VMaster), + riak_core_vnode_proxy:command_return_vnode({Mod,Index,Node}, Req). %% Send a synchronous command to an individual Index/Node combination. %% Will not return until the vnode has returned @@ -195,16 +189,7 @@ proxy_cast(Who, Req) -> proxy_cast(Who, Req, normal). proxy_cast({VMaster, Node}, Req, How) -> - case application:get_env(riak_core, vnode_routing) of - {ok, legacy} -> - if How == normal -> - gen_server:cast({VMaster, Node}, Req); - How == unreliable -> - riak_core_send_msg:cast_unreliable({VMaster, Node}, Req) - end; - {ok, proxy} -> - do_proxy_cast({VMaster, Node}, Req, How) - end. + do_proxy_cast({VMaster, Node}, Req, How). do_proxy_cast({VMaster, Node}, Req=?VNODE_REQ{index=Idx}, How) -> Mod = vmaster_to_vmod(VMaster), diff --git a/src/riak_core_vnode_worker.erl b/src/riak_core_vnode_worker.erl index ae373c79f..de3cf4221 100644 --- a/src/riak_core_vnode_worker.erl +++ b/src/riak_core_vnode_worker.erl @@ -22,19 +22,20 @@ -include("riak_core_vnode.hrl"). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +% API -export([start_link/1, handle_work/3, handle_work/4]). -type mod_state() :: term(). -record(state, { - module :: atom(), - modstate :: mod_state() - }). + module :: atom(), + modstate :: mod_state() +}). --callback init_worker(partition(), Args :: term(), Props :: [{atom(), term()}]) -> - {ok, mod_state()}. +-callback init_worker(partition(), Args :: term(), Props :: [{atom(), term()}]) -> {ok, mod_state()}. -callback handle_work(Work :: term(), sender(), mod_state()) -> {reply, Reply :: term(), mod_state()} | {noreply, mod_state()}. @@ -44,34 +45,39 @@ start_link(Args) -> [VNodeIndex, WorkerArgs, WorkerProps, Caller] = proplists:get_value(worker_args, Args), gen_server:start_link(?MODULE, [WorkerMod, VNodeIndex, WorkerArgs, WorkerProps, Caller], []). + handle_work(Worker, Work, From) -> handle_work(Worker, Work, From, self()). + handle_work(Worker, Work, From, Caller) -> gen_server:cast(Worker, {work, Work, From, Caller}). + init([Module, VNodeIndex, WorkerArgs, WorkerProps, Caller]) -> {ok, WorkerState} = Module:init_worker(VNodeIndex, WorkerArgs, WorkerProps), %% let the pool queue manager know there might be a worker to checkout riak_core_vnode_worker_pool:worker_started(Caller), - {ok, #state{module=Module, modstate=WorkerState}}. + {ok, #state{module = Module, modstate = WorkerState}}. + handle_call(Event, _From, State) -> logger:debug("Vnode worker received synchronous event: ~p.", [Event]), {reply, ok, State}. + handle_cast({work, Work, WorkFrom, Caller}, - #state{module = Mod, modstate = ModState} = State) -> + #state{module = Mod, modstate = ModState} = State) -> NewModState = case Mod:handle_work(Work, WorkFrom, ModState) of - {reply, Reply, NS} -> - riak_core_vnode:reply(WorkFrom, Reply), - NS; - {noreply, NS} -> - NS - end, + {reply, Reply, NS} -> + riak_core_vnode:reply(WorkFrom, Reply), + NS; + {noreply, NS} -> + NS + end, %% check the worker back into the pool riak_core_vnode_worker_pool:checkin_worker(Caller, self()), - {noreply, State#state{modstate=NewModState}}; + {noreply, State#state{modstate = NewModState}}; handle_cast(_Event, State) -> {noreply, State}. diff --git a/test/eqc/bucket_eqc_utils.erl b/test/eqc/bucket_eqc_utils.erl index c06dce241..3a3d6459c 100644 --- a/test/eqc/bucket_eqc_utils.erl +++ b/test/eqc/bucket_eqc_utils.erl @@ -30,23 +30,14 @@ per_test_setup(DefaultBucketProps, TestFun) -> riak_core_test_util:stop_pid(whereis(riak_core_ring_events)), riak_core_test_util:stop_pid(whereis(riak_core_ring_manager)), application:set_env(riak_core, claimant_tick, 4294967295), - application:set_env(riak_core, broadcast_lazy_timer, 4294967295), - application:set_env(riak_core, broadcast_exchange_timer, 4294967295), - application:set_env(riak_core, metadata_hashtree_timer, 4294967295), application:set_env(riak_core, cluster_name, "eqc_test"), application:set_env(riak_core, default_bucket_props, DefaultBucketProps), {ok, RingEvents} = riak_core_ring_events:start_link(), {ok, RingMgr} = riak_core_ring_manager:start_link(test), {ok, Claimant} = riak_core_claimant:start_link(), - {ok, MetaMgr} = riak_core_metadata_manager:start_link([{data_dir, "./meta_temp"}]), - {ok, Hashtree} = riak_core_metadata_hashtree:start_link("./meta_temp/trees"), - {ok, Broadcast} = riak_core_broadcast:start_link(), Results = TestFun(), - riak_core_test_util:stop_pid(Broadcast), - riak_core_test_util:stop_pid(Hashtree), - riak_core_test_util:stop_pid(MetaMgr), riak_core_test_util:stop_pid(Claimant), unlink(RingMgr), riak_core_ring_manager:stop(),