From 2bcd4784eb1d0eddb85241084c3d8791bbccdcfb Mon Sep 17 00:00:00 2001 From: yastanotheruser Date: Sat, 16 Nov 2024 13:42:13 -0500 Subject: [PATCH] add get_coordinator function in group subscriber v2 --- src/brod_group_subscriber_v2.erl | 12 ++++++++++++ test/brod_group_subscriber_SUITE.erl | 26 ++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/src/brod_group_subscriber_v2.erl b/src/brod_group_subscriber_v2.erl index 47ece4b0..4eb7c657 100644 --- a/src/brod_group_subscriber_v2.erl +++ b/src/brod_group_subscriber_v2.erl @@ -35,6 +35,7 @@ , start_link/1 , stop/1 , get_workers/1 + , get_coordinator/1 ]). %% brod_group_coordinator callbacks @@ -208,6 +209,15 @@ get_workers(Pid) -> get_workers(Pid, Timeout) -> gen_server:call(Pid, get_workers, Timeout). +%% @doc Returns the group coordinator PID. +-spec get_coordinator(pid()) -> pid() | undefined. +get_coordinator(Pid) -> + get_coordinator(Pid, infinity). + +-spec get_coordinator(pid(), timeout()) -> pid() | undefined. +get_coordinator(Pid, Timeout) -> + gen_server:call(Pid, get_coordinator, Timeout). + %%%=================================================================== %%% group_coordinator callbacks %%%=================================================================== @@ -315,6 +325,8 @@ handle_call({assign_partitions, Members, TopicPartitionList}, _From, State) -> {reply, Reply, State}; handle_call(get_workers, _From, State = #state{workers = Workers}) -> {reply, Workers, State}; +handle_call(get_coordinator, _From, State = #state{coordinator = Coordinator}) -> + {reply, Coordinator, State}; handle_call(Call, _From, State) -> {reply, {error, {unknown_call, Call}}, State}. diff --git a/test/brod_group_subscriber_SUITE.erl b/test/brod_group_subscriber_SUITE.erl index a08cf569..2f9672cc 100644 --- a/test/brod_group_subscriber_SUITE.erl +++ b/test/brod_group_subscriber_SUITE.erl @@ -51,6 +51,7 @@ , t_consumer_crash/1 , t_assign_partitions_handles_updating_state/1 , t_get_workers/1 + , t_get_coordinator/1 , v2_coordinator_crash/1 , v2_consumer_cleanup/1 , v2_subscriber_shutdown/1 @@ -97,6 +98,7 @@ groups() -> , t_async_commit , t_assign_partitions_handles_updating_state , t_get_workers + , t_get_coordinator , v2_coordinator_crash , v2_consumer_cleanup , v2_subscriber_shutdown @@ -308,6 +310,30 @@ t_get_workers(Config) when is_list(Config) -> ok end. +t_get_coordinator(Config) when is_list(Config) -> + case ?config(behavior) of + brod_group_subscriber_v2 -> + %% only present in v2 + InitArgs = #{async_ack => true}, + Topic = ?topic, + ?check_trace( + #{ timeout => 10000 }, + %% Run stage: + begin + {ok, SubscriberPid} = start_subscriber(?group_id, Config, [Topic], InitArgs), + ct:sleep(2000), + brod_group_subscriber_v2:get_coordinator(SubscriberPid) + end, + %% Check stage: + fun(Coordinator, _Trace) -> + ct:pal("result: ~p", [Coordinator]), + ?assert(is_pid(Coordinator)), + ok + end); + _ -> + ok + end. + t_consumer_crash(Config) when is_list(Config) -> %% use consumer managed offset commit behaviour %% so we can control where to start fetching messages from