diff --git a/src/simple_gossip.hrl b/src/simple_gossip.hrl index 741f7d5..01fb0c0 100644 --- a/src/simple_gossip.hrl +++ b/src/simple_gossip.hrl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- -record(rumor, {gossip_version = 1 :: pos_integer(), - vector_clock = #{} :: #{node() => pos_integer()}, + vector_clock = simple_gossip_vclock:vclock(), data :: any(), leader :: node(), nodes = [] :: [node()], diff --git a/src/simple_gossip_rumor.erl b/src/simple_gossip_rumor.erl index 9d83efd..ef61d84 100644 --- a/src/simple_gossip_rumor.erl +++ b/src/simple_gossip_rumor.erl @@ -40,7 +40,8 @@ new() -> #rumor{leader = node(), nodes = [node()], - gossip_version = 1}. + gossip_version = 1, + vector_clock = simple_gossip_vclock:new()}. -spec new(rumor()) -> rumor(). new(#rumor{vector_clock = VectorClocks}) -> @@ -61,18 +62,13 @@ change_gossip_interval(#rumor{} = Rumor, Interval) -> increase_gossip_version(#rumor{gossip_version = GossipVersion} = Rumor) -> Rumor#rumor{gossip_version = GossipVersion+1}. --spec increase_vector_clock(rumor(), node()) -> rumor(). -increase_vector_clock(#rumor{vector_clock = VectorClock} = Rumor, Node) -> - NewVectorClock = VectorClock#{Node => maps:get(node(), VectorClock, 0)+1}, - Rumor#rumor{vector_clock = NewVectorClock}. +-spec increase_vector_clock(rumor()) -> rumor(). +increase_vector_clock(#rumor{vector_clock = VectorClock} = Rumor) -> + Rumor#rumor{vector_clock = simple_gossip_vclock:increment(VectorClock)}. -spec increase_version(rumor()) -> rumor(). -increase_version(Rumor) -> - increase_version(Rumor, node()). - --spec increase_version(rumor(), node()) -> rumor(). -increase_version(#rumor{} = Rumor, Node) -> - increase_vector_clock(increase_gossip_version(Rumor), Node). +increase_version(#rumor{} = Rumor) -> + increase_vector_clock(increase_gossip_version(Rumor)). -spec add_node(rumor(), node()) -> rumor(). add_node(#rumor{nodes = Nodes} = Rumor, Node) -> @@ -110,7 +106,7 @@ check_node_exclude(#rumor{} = Rumor) -> fun() -> ?LOG_INFO("New rumor created because" "the incoming rumor not contains the current node"), - increase_vector_clock(new(Rumor), node()) + increase_vector_clock(new(Rumor)) end). -spec pick_random_nodes([node()], non_neg_integer()) -> [node()]. @@ -168,5 +164,4 @@ if_leader(Rumor, _, _Fun) -> -spec check_vector_clocks(In :: rumor(), Current :: rumor()) -> boolean(). check_vector_clocks(#rumor{vector_clock = InVectorClocks}, #rumor{vector_clock = CurrentVectorClocks}) -> - Node = node(), - maps:get(Node, InVectorClocks, 0) >= maps:get(Node, CurrentVectorClocks, 0). + simple_gossip_vclock:descendant(InVectorClocks, CurrentVectorClocks). diff --git a/src/simple_gossip_vclock.erl b/src/simple_gossip_vclock.erl new file mode 100644 index 0000000..46640f1 --- /dev/null +++ b/src/simple_gossip_vclock.erl @@ -0,0 +1,55 @@ +%%%------------------------------------------------------------------- +%%% @author Peter Tihanyi +%%% @copyright (C) 2023, Systream +%%% @doc +%%% +%%% @end +%%%------------------------------------------------------------------- +-module(simple_gossip_vclock). + +%% API +-export([new/0, increment/1, descendant/2, increment/2]). + +-type vclock() :: map(). + +-export_type([vclock/0]). + +-spec new() -> vclock(). +new() -> + #{}. + +-spec increment(vclock()) -> vclock(). +increment(VClock) -> + increment(VClock, node(), timestamp()). + +-spec increment(vclock(), node()) -> vclock(). +increment(VClock, Node) -> + increment(VClock, Node, timestamp()). + +-spec increment(vclock(), node(), pos_integer()) -> vclock(). +increment(VClock, Node, Timestamp) -> + case maps:get(Node, VClock, not_found) of + not_found -> VClock#{Node => {1, Timestamp}}; + {Counter, _OldTs} -> VClock#{Node => {Counter+1, Timestamp}} + end. + +-spec descendant(vclock(), vclock() | none | {node(), {pos_integer(), pos_integer()}, term()}) -> + boolean(). +descendant(VClockA, {Node, {CounterB, TSB}, Iterator}) -> + case maps:get(Node, VClockA, not_found) of + not_found -> false; + {CounterA, _TSA} when CounterA > CounterB -> + descendant(VClockA, maps:next(Iterator)); + {CounterA, TSA} when CounterA =:= CounterB andalso TSA =:= TSB -> + descendant(VClockA, maps:next(Iterator)); + _ -> + false + end; +descendant(_VClockA, none) -> + true; +descendant(VClockA, VClockB) when is_map(VClockB) -> + descendant(VClockA, maps:next(maps:iterator(VClockB))). + +-spec timestamp() -> pos_integer(). +timestamp() -> + erlang:system_time(nanosecond). \ No newline at end of file diff --git a/test/simple_gossip_SUITE.erl b/test/simple_gossip_SUITE.erl index d18b37d..062cebb 100644 --- a/test/simple_gossip_SUITE.erl +++ b/test/simple_gossip_SUITE.erl @@ -120,7 +120,8 @@ all() -> stop_node, stop_leader_node, start_from_persist_data, - serve_persist_from_memory + serve_persist_from_memory, + vclock ]. %%-------------------------------------------------------------------- @@ -381,6 +382,23 @@ serve_persist_from_memory(_Config) -> ?assertMatch({ok, {rumor, _, _, "test_from_not_persisted_data", _, _, _, _}}, simple_gossip_persist:get()). +vclock(_Config) -> + A = simple_gossip_vclock:new(), + B = simple_gossip_vclock:new(), + ?assert(simple_gossip_vclock:descendant(A, B)), + ?assert(simple_gossip_vclock:descendant(B, A)), + A1_1 = simple_gossip_vclock:increment(A, node_a), + A1_2 = simple_gossip_vclock:increment(A, node_b), + ?assert(simple_gossip_vclock:descendant(A1_1, A)), + ?assert(simple_gossip_vclock:descendant(A1_2, A)), + ?assertNot(simple_gossip_vclock:descendant(A1_2, A1_1)), + ?assertNot(simple_gossip_vclock:descendant(A1_1, A1_2)), + A2 = simple_gossip_vclock:increment(A1_1, node_b), + ?assert(simple_gossip_vclock:descendant(A2, A1_1)), + ?assert(simple_gossip_vclock:descendant(A2, A)), + ?assertNot(simple_gossip_vclock:descendant(A2, A1_2)), + ok. + receive_notify(Ref) -> receive {data_changed, {Data, Ref}} ->