Skip to content

Commit

Permalink
Requests keys and hashes off loop (#818)
Browse files Browse the repository at this point in the history
See #817

Look to resolve deadlock by handling request in sender loop rather than blocking riak_repl_aae_sink waiting for this result.
  • Loading branch information
martinsumner committed Dec 19, 2022
1 parent 3c932a8 commit bf74f36
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 24 deletions.
18 changes: 14 additions & 4 deletions src/riak_repl_aae_sink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ process_msg(?MSG_GET_AAE_BUCKET, {Level,BucketNum,IndexN}, State=#state{tree_pid
ResponseMsg = riak_kv_index_hashtree:exchange_bucket(IndexN, Level, BucketNum, TreePid),
send_reply(ResponseMsg, State);

process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=TreePid}) ->
ResponseMsg = riak_kv_index_hashtree:exchange_segment(IndexN, SegmentNum, TreePid),
send_reply(ResponseMsg, State);
process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State) ->
riak_repl_stats:aae_segments_requested(),
State#state.sender !
{?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, State#state.tree_pid}},
{noreply, State};

%% no reply
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
Expand Down Expand Up @@ -209,7 +211,15 @@ sender_init(Transport, Socket) ->
sender_loop({Transport, Socket}).

sender_loop(State={Transport, Socket}) ->
receive Msg ->
receive
{?MSG_GET_AAE_SEGMENT, {SegmentNum, IndexN, TreePid}} ->
KeysHashes =
riak_kv_index_hashtree:exchange_segment(
IndexN, SegmentNum, TreePid),
riak_repl_stats:keys_hashes_returned(length(KeysHashes)),
DataBin = term_to_binary(KeysHashes),
ok = Transport:send(Socket, <<?MSG_REPLY:8, DataBin/binary>>);
Msg ->
ok = Transport:send(Socket, Msg)
end,
?MODULE:sender_loop(State).
42 changes: 26 additions & 16 deletions src/riak_repl_aae_source.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ cancel_fullsync(Pid) ->
%%%===================================================================

init([Cluster, Client, Transport, Socket, Partition, OwnerPid, Proto]) ->
?LOG_DEBUG("AAE fullsync source worker started for partition ~p",
[Partition]),
?LOG_INFO(
"AAE fullsync source worker started for partition ~p", [Partition]),

Ver = riak_repl_util:deduce_wire_version_from_proto(Proto),
{_, ClientVer, _} = Proto,
Expand Down Expand Up @@ -287,7 +287,9 @@ update_trees(tree_built, State = #state{indexns=IndexNs}) ->
NeededBuilts ->
%% Trees built now we can estimate how many keys
{ok, EstimatedNrKeys} = riak_kv_index_hashtree:estimate_keys(State#state.tree_pid),
?LOG_DEBUG("EstimatedNrKeys ~p for partition ~p", [EstimatedNrKeys, State#state.index]),
?LOG_INFO(
"EstimatedNrKeys ~p for partition ~p",
[EstimatedNrKeys, State#state.index]),

?LOG_DEBUG("Moving to key exchange state"),
key_exchange(init, State#state{built=Built, estimated_nr_keys = EstimatedNrKeys});
Expand Down Expand Up @@ -317,13 +319,14 @@ key_exchange(init, State) ->
State2 = State#state{exchange=Exchange},
key_exchange(start_key_exchange, State2);
key_exchange(cancel_fullsync, State) ->
?LOG_INFO("AAE fullsync source cancelled for partition ~p", [State#state.index]),
?LOG_INFO(
"AAE fullsync source cancelled for partition ~p", [State#state.index]),
send_complete(State),
{stop, normal, State};
key_exchange(finish_fullsync, State=#state{owner=Owner}) ->
send_complete(State),
?LOG_DEBUG("AAE fullsync source completed partition ~p",
[State#state.index]),
?LOG_INFO(
"AAE fullsync source completed partition ~p", [State#state.index]),
riak_repl2_fssource:fullsync_complete(Owner),
%% TODO: Why stay in key_exchange? Should we stop instead?
{next_state, key_exchange, State};
Expand All @@ -343,8 +346,9 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster,
tree_pid=TreePid,
exchange=Exchange,
indexns=[IndexN|_IndexNs]}) ->
?LOG_DEBUG("Starting fullsync key exchange with ~p for ~p/~p",
[Cluster, Partition, IndexN]),
?LOG_INFO(
"Starting fullsync key exchange with ~p for ~p/~p",
[Cluster, Partition, IndexN]),

SourcePid = self(),

Expand Down Expand Up @@ -398,14 +402,20 @@ key_exchange(start_key_exchange, State=#state{cluster=Cluster,
end,

%% TODO: Add stats for AAE
?LOG_DEBUG("Starting compare for partition ~p", [Partition]),
spawn_link(fun() ->
StageStart=os:timestamp(),
Exchange2 = riak_kv_index_hashtree:compare(IndexN, Remote, AccFun, Exchange, TreePid),
?LOG_DEBUG("Full-sync with site ~p; fullsync difference generator for ~p complete (completed in ~p secs)",
[State#state.cluster, Partition, riak_repl_util:elapsed_secs(StageStart)]),
gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2})
end),
?LOG_INFO("Starting compare for partition ~p", [Partition]),
spawn_link(
fun() ->
StageStart = os:timestamp(),
Exchange2 =
riak_kv_index_hashtree:compare(
IndexN, Remote, AccFun, Exchange, TreePid),
?LOG_INFO(
"Full-sync with site ~p; fullsync difference generator for ~p completion_time=~p secs",
[State#state.cluster,
Partition,
riak_repl_util:elapsed_secs(StageStart)]),
gen_fsm:send_event(SourcePid, {'$aae_src', done, Exchange2})
end),

%% wait for differences from bloom_folder or to be done
{next_state, compute_differences, State}.
Expand Down
31 changes: 27 additions & 4 deletions src/riak_repl_stats.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
clear_rt_dirty/0,
touch_rt_dirty_file/0,
remove_rt_dirty_file/0,
is_rt_dirty/0]).
is_rt_dirty/0,
aae_segments_requested/0,
keys_hashes_returned/1]).

-define(APP, riak_repl).

Expand Down Expand Up @@ -109,6 +111,12 @@ elections_elected() ->
elections_leader_changed() ->
increment_counter(elections_leader_changed).

aae_segments_requested() ->
increment_counter(aae_segments_requested).

keys_hashes_returned(Length) ->
increment_counter(keys_hashes_returned, Length).

%% If any source errors are detected, write a file out to persist this status
%% across restarts
rt_source_errors() ->
Expand Down Expand Up @@ -214,7 +222,9 @@ stats() ->
{last_server_bytes_recv, gauge},
{rt_source_errors, counter},
{rt_sink_errors, counter},
{rt_dirty, counter}].
{rt_dirty, counter},
{aae_segments_requested, counter},
{keys_hashes_returned, counter}].

increment_counter(Name) ->
increment_counter(Name, 1).
Expand Down Expand Up @@ -429,11 +439,24 @@ test_check_stats() ->
{server_tx_kbps,[]},
{rt_source_errors,1},
{rt_sink_errors, 1},
{rt_dirty, 2}],
{rt_dirty, 2},
{aae_segments_requested, 0},
{keys_hashes_returned, 0}],
Result = get_stats(),

?assertEqual(Expected,
[{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]).
[{K1, V} || {K1, V} <- Result, {K2, _} <- Expected, K1 == K2]),

riak_repl_stats:aae_segments_requested(),
riak_repl_stats:keys_hashes_returned(100),

UpdResult = get_stats(),
?assertMatch(
{aae_segments_requested, 1},
lists:keyfind(aae_segments_requested, 1, UpdResult)),
?assertMatch(
{keys_hashes_returned, 100},
lists:keyfind(keys_hashes_returned, 1, UpdResult)).


test_report() ->
Expand Down

0 comments on commit bf74f36

Please sign in to comment.