From 2f396cdfafca756f550a5c54d9442e8b55f8cb94 Mon Sep 17 00:00:00 2001 From: Peter Tihanyi Date: Tue, 28 Nov 2023 22:06:11 +0100 Subject: [PATCH] Send only a rumor head on tick --- src/simple_gossip.hrl | 7 +++++- src/simple_gossip_rumor.erl | 43 ++++++++++++++++++++++-------------- src/simple_gossip_server.erl | 26 +++++++++++++++++++--- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/src/simple_gossip.hrl b/src/simple_gossip.hrl index 01fb0c0..96cf50c 100644 --- a/src/simple_gossip.hrl +++ b/src/simple_gossip.hrl @@ -11,9 +11,14 @@ data :: any(), leader :: node(), nodes = [] :: [node()], - max_gossip_per_period = 3 :: pos_integer(), + max_gossip_per_period = 1 :: pos_integer(), gossip_period = 15000 :: pos_integer() }). +-record(rumor_head, {gossip_version = 1 :: pos_integer(), + vector_clock = simple_gossip_vclock:vclock() +}). + -type rumor() :: #rumor{}. +-type rumor_head() :: #rumor_head{}. -export_type([rumor/0]). diff --git a/src/simple_gossip_rumor.erl b/src/simple_gossip_rumor.erl index ef61d84..161caf0 100644 --- a/src/simple_gossip_rumor.erl +++ b/src/simple_gossip_rumor.erl @@ -14,22 +14,23 @@ %% API -export([new/0, - new/1, - add_node/2, - change_leader/1, - remove_node/2, - nodes/1, - pick_random_nodes/2, - check_node_exclude/1, - if_member/3, - if_not_member/3, - set_data/2, - data/1, - check_vector_clocks/2, - change_max_gossip_per_period/2, - change_gossip_interval/2, - calculate_new_leader/1, - calculate_new_leader/2]). + new/1, + head/1, + add_node/2, + change_leader/1, + remove_node/2, + nodes/1, + pick_random_nodes/2, + check_node_exclude/1, + if_member/3, + if_not_member/3, + set_data/2, + data/1, + check_vector_clocks/2, + change_max_gossip_per_period/2, + change_gossip_interval/2, + calculate_new_leader/1, + calculate_new_leader/2, version/1]). -type manage_node_fun() :: fun(() -> rumor()). -type if_leader_node_fun() :: fun((node()) -> rumor()). @@ -50,6 +51,10 @@ new(#rumor{vector_clock = VectorClocks}) -> gossip_version = 1, vector_clock = VectorClocks}. +-spec head(rumor()) -> rumor_head(). +head(#rumor{gossip_version = GossipVersion, vector_clock = VectorClock}) -> + #rumor_head{gossip_version = GossipVersion, vector_clock = VectorClock}. + -spec change_max_gossip_per_period(rumor(), pos_integer()) -> rumor(). change_max_gossip_per_period(#rumor{} = Rumor, Period) -> increase_version(Rumor#rumor{max_gossip_per_period = Period}). @@ -90,6 +95,12 @@ set_data(#rumor{} = Rumor, Data) -> data(#rumor{data = Data}) -> Data. +-spec version(rumor_head() | rumor()) -> pos_integer(). +version(#rumor{gossip_version = Version}) -> + Version; +version(#rumor_head{gossip_version = Version}) -> + Version. + -spec remove_node(rumor(), node()) -> rumor(). remove_node(#rumor{nodes = Nodes} = Rumor, Node) -> if_member(Rumor, Node, diff --git a/src/simple_gossip_server.erl b/src/simple_gossip_server.erl index 2b8af1b..30759ba 100644 --- a/src/simple_gossip_server.erl +++ b/src/simple_gossip_server.erl @@ -218,6 +218,9 @@ handle_call({join_request, #rumor{gossip_version = InGossipVsn}}, {FromPid, _}, " Asking remote to upgrade", [Node, CurrentGossipVsn, InGossipVsn]), simple_gossip_event:notify(NewRumor), {reply, {upgrade, NewRumor}, reschedule_gossip(State#state{rumor = NewRumor})}; +handle_call(reconcile, {FromPid, _FromRef}, #state{rumor = Rumor} = State) -> + gossip(Rumor, node(FromPid)), + {reply, ok, State}; handle_call(Command, From, State) -> proxy_call(Command, From, State). @@ -322,6 +325,11 @@ handle_command(whisper_your_gossip, _From, {noreply, NewState :: state()} | {noreply, NewState :: state(), timeout() | hibernate} | {stop, Reason :: term(), NewState :: state()}). +handle_cast({reconcile, #rumor_head{gossip_version = InVersion}, SenderNode}, + #state{rumor = #rumor{gossip_version = Version}} = State) + when InVersion > Version -> + gen_server:call({?SERVER, SenderNode}, reconcile), + {noreply, State}; handle_cast({reconcile, #rumor{gossip_version = InVersion} = InRumor, SenderNode}, @@ -341,9 +349,14 @@ handle_cast({reconcile, ?LOG_WARNING("Dropped newer gossip due to vclock check", []), {noreply, State} end; -handle_cast({reconcile, Rumor, SenderNode}, State) -> +handle_cast({reconcile, #rumor_head{gossip_version = InVersion}, _SenderNode}, + #state{rumor = #rumor{gossip_version = Version}} = State) + when InVersion =:= Version -> + {noreply, State}; +handle_cast({reconcile, InRumor, SenderNode}, #state{rumor = Rumor} = State) -> ?LOG_DEBUG("Gossip ~p dropped due to lower version, From ~p", - [Rumor#rumor.gossip_version, SenderNode]), + [simple_gossip_rumor:version(InRumor), SenderNode]), + gossip(Rumor, SenderNode), {noreply, State}; handle_cast({get_gossip_version, Requester, Ref}, @@ -403,7 +416,10 @@ gossip_random_node(#rumor{nodes = []}) -> ok; gossip_random_node(Rumor) -> NewNodes = remove_current_node(simple_gossip_rumor:nodes(Rumor)), - gossip(Rumor, simple_gossip_rumor:pick_random_nodes(NewNodes, 1)). + case simple_gossip_rumor:pick_random_nodes(NewNodes, 1) of + [] -> ok; + [Node] -> gossip_head(Rumor, Node) + end. -spec remove_current_node([node()]) -> [node()]. remove_current_node(Nodes) -> @@ -417,6 +433,10 @@ gossip(#rumor{max_gossip_per_period = Max} = Rumor, Nodes) when is_list(Nodes) - gossip(Rumor, Node) -> gen_server:cast({?MODULE, Node}, {reconcile, Rumor, node()}). +-spec gossip_head(rumor(), node()) -> ok. +gossip_head(Rumor, Node) -> + gen_server:cast({?MODULE, Node}, {reconcile, simple_gossip_rumor:head(Rumor), node()}). + -spec schedule_gossip(rumor()) -> reference(). schedule_gossip(#rumor{gossip_period = Period}) -> erlang:send_after(Period + rand:uniform(100), ?SERVER, tick).