Skip to content

Commit

Permalink
Send only a rumor head on tick
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Tihanyi committed Nov 28, 2023
1 parent 1cdf7fe commit 2f396cd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 20 deletions.
7 changes: 6 additions & 1 deletion src/simple_gossip.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,14 @@
data :: any(),
leader :: node(),
nodes = [] :: [node()],
max_gossip_per_period = 3 :: pos_integer(),
max_gossip_per_period = 1 :: pos_integer(),
gossip_period = 15000 :: pos_integer()
}).

-record(rumor_head, {gossip_version = 1 :: pos_integer(),
vector_clock = simple_gossip_vclock:vclock()
}).

-type rumor() :: #rumor{}.
-type rumor_head() :: #rumor_head{}.
-export_type([rumor/0]).
43 changes: 27 additions & 16 deletions src/simple_gossip_rumor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@

%% API
-export([new/0,
new/1,
add_node/2,
change_leader/1,
remove_node/2,
nodes/1,
pick_random_nodes/2,
check_node_exclude/1,
if_member/3,
if_not_member/3,
set_data/2,
data/1,
check_vector_clocks/2,
change_max_gossip_per_period/2,
change_gossip_interval/2,
calculate_new_leader/1,
calculate_new_leader/2]).
new/1,
head/1,
add_node/2,
change_leader/1,
remove_node/2,
nodes/1,
pick_random_nodes/2,
check_node_exclude/1,
if_member/3,
if_not_member/3,
set_data/2,
data/1,
check_vector_clocks/2,
change_max_gossip_per_period/2,
change_gossip_interval/2,
calculate_new_leader/1,
calculate_new_leader/2, version/1]).

-type manage_node_fun() :: fun(() -> rumor()).
-type if_leader_node_fun() :: fun((node()) -> rumor()).
Expand All @@ -50,6 +51,10 @@ new(#rumor{vector_clock = VectorClocks}) ->
gossip_version = 1,
vector_clock = VectorClocks}.

-spec head(rumor()) -> rumor_head().
head(#rumor{gossip_version = GossipVersion, vector_clock = VectorClock}) ->
#rumor_head{gossip_version = GossipVersion, vector_clock = VectorClock}.

-spec change_max_gossip_per_period(rumor(), pos_integer()) -> rumor().
change_max_gossip_per_period(#rumor{} = Rumor, Period) ->
increase_version(Rumor#rumor{max_gossip_per_period = Period}).
Expand Down Expand Up @@ -90,6 +95,12 @@ set_data(#rumor{} = Rumor, Data) ->
data(#rumor{data = Data}) ->
Data.

-spec version(rumor_head() | rumor()) -> pos_integer().
version(#rumor{gossip_version = Version}) ->
Version;
version(#rumor_head{gossip_version = Version}) ->
Version.

-spec remove_node(rumor(), node()) -> rumor().
remove_node(#rumor{nodes = Nodes} = Rumor, Node) ->
if_member(Rumor, Node,
Expand Down
26 changes: 23 additions & 3 deletions src/simple_gossip_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ handle_call({join_request, #rumor{gossip_version = InGossipVsn}}, {FromPid, _},
" Asking remote to upgrade", [Node, CurrentGossipVsn, InGossipVsn]),
simple_gossip_event:notify(NewRumor),
{reply, {upgrade, NewRumor}, reschedule_gossip(State#state{rumor = NewRumor})};
handle_call(reconcile, {FromPid, _FromRef}, #state{rumor = Rumor} = State) ->
gossip(Rumor, node(FromPid)),
{reply, ok, State};

handle_call(Command, From, State) ->
proxy_call(Command, From, State).
Expand Down Expand Up @@ -322,6 +325,11 @@ handle_command(whisper_your_gossip, _From,
{noreply, NewState :: state()} |
{noreply, NewState :: state(), timeout() | hibernate} |
{stop, Reason :: term(), NewState :: state()}).
handle_cast({reconcile, #rumor_head{gossip_version = InVersion}, SenderNode},
#state{rumor = #rumor{gossip_version = Version}} = State)
when InVersion > Version ->
gen_server:call({?SERVER, SenderNode}, reconcile),
{noreply, State};
handle_cast({reconcile,
#rumor{gossip_version = InVersion} = InRumor,
SenderNode},
Expand All @@ -341,9 +349,14 @@ handle_cast({reconcile,
?LOG_WARNING("Dropped newer gossip due to vclock check", []),
{noreply, State}
end;
handle_cast({reconcile, Rumor, SenderNode}, State) ->
handle_cast({reconcile, #rumor_head{gossip_version = InVersion}, _SenderNode},
#state{rumor = #rumor{gossip_version = Version}} = State)
when InVersion =:= Version ->
{noreply, State};
handle_cast({reconcile, InRumor, SenderNode}, #state{rumor = Rumor} = State) ->
?LOG_DEBUG("Gossip ~p dropped due to lower version, From ~p",
[Rumor#rumor.gossip_version, SenderNode]),
[simple_gossip_rumor:version(InRumor), SenderNode]),
gossip(Rumor, SenderNode),
{noreply, State};

handle_cast({get_gossip_version, Requester, Ref},
Expand Down Expand Up @@ -403,7 +416,10 @@ gossip_random_node(#rumor{nodes = []}) ->
ok;
gossip_random_node(Rumor) ->
NewNodes = remove_current_node(simple_gossip_rumor:nodes(Rumor)),
gossip(Rumor, simple_gossip_rumor:pick_random_nodes(NewNodes, 1)).
case simple_gossip_rumor:pick_random_nodes(NewNodes, 1) of
[] -> ok;
[Node] -> gossip_head(Rumor, Node)
end.

-spec remove_current_node([node()]) -> [node()].
remove_current_node(Nodes) ->
Expand All @@ -417,6 +433,10 @@ gossip(#rumor{max_gossip_per_period = Max} = Rumor, Nodes) when is_list(Nodes) -
gossip(Rumor, Node) ->
gen_server:cast({?MODULE, Node}, {reconcile, Rumor, node()}).

-spec gossip_head(rumor(), node()) -> ok.
gossip_head(Rumor, Node) ->
gen_server:cast({?MODULE, Node}, {reconcile, simple_gossip_rumor:head(Rumor), node()}).

-spec schedule_gossip(rumor()) -> reference().
schedule_gossip(#rumor{gossip_period = Period}) ->
erlang:send_after(Period + rand:uniform(100), ?SERVER, tick).
Expand Down

0 comments on commit 2f396cd

Please sign in to comment.