diff --git a/src/cre_master.erl b/src/cre_master.erl index 942501a..6211052 100644 --- a/src/cre_master.erl +++ b/src/cre_master.erl @@ -123,9 +123,9 @@ stop( CreName ) -> %% gen_server callback functions %%==================================================================== -code_change( _OldVsn, CreState, _Extra ) -> {ok, CreState}. +code_change( _OldVsn, CreState, _Extra ) -> {ok, CreState}. handle_call( _Request, _From, CreState ) -> {reply, {error, bad_msg}, CreState}. -terminate( _Reason, _CreState ) -> ok. +terminate( _Reason, _CreState ) -> ok. init( _Arg ) -> process_flag( trap_exit, true ), @@ -134,23 +134,26 @@ init( _Arg ) -> handle_cast( {add_worker, P}, CreState ) -> - error_logger:info_report( - [{info, "new worker"}, - {application, cre}, - {cre_master_pid, self()}, - {worker_pid, P}] ), + #cre_state{ idle_lst = IdleLst, busy_map = BusyMap } = CreState, + % extract pid because link/1 cannot deal with registered names Pid = if is_pid( P ) -> P; true -> whereis( P ) end, - true = link( Pid ), + error_logger:info_report( + [{info, "new worker"}, + {application, cre}, + {cre_master_pid, self()}, + {worker_pid, Pid}, + {nworker, length( IdleLst )+maps:size( BusyMap )+1}] ), - #cre_state{ idle_lst = IdleLst } = CreState, - CreState1 = CreState#cre_state{ idle_lst = [P|IdleLst] }, + true = link( Pid ), + + CreState1 = CreState#cre_state{ idle_lst = [Pid|IdleLst] }, CreState2 = attempt_progress( CreState1 ), @@ -158,6 +161,12 @@ handle_cast( {add_worker, P}, CreState ) -> handle_cast( {worker_result, P, A, Delta}, CreState ) -> + Pid = + if + is_pid( P ) -> P; + true -> whereis( P ) + end, + #cre_state{ subscr_map = SubscrMap, idle_lst = IdleLst, busy_map = BusyMap, @@ -170,10 +179,10 @@ handle_cast( {worker_result, P, A, Delta}, CreState ) -> case maps:get( A, BusyMap, undefined ) of - P -> + Pid -> lists:foreach( F, maps:get( A, SubscrMap ) ), CreState1 = CreState#cre_state{ subscr_map = maps:remove( A, SubscrMap ), - idle_lst = [P|IdleLst], + idle_lst = [Pid|IdleLst], busy_map = maps:remove( A, BusyMap ), cache = Cache#{ A => Delta } }, CreState2 = attempt_progress( CreState1 ), @@ -224,9 +233,14 @@ handle_info( {'EXIT', P, _Reason}, CreState ) -> busy_map = BusyMap, queue = Queue } = CreState, - + Pid = + if + is_pid( P ) -> P; + true -> whereis( P ) + end, + - case lists:member( P, IdleLst ) of + case lists:member( Pid, IdleLst ) of % an idle worker died true -> @@ -235,23 +249,25 @@ handle_info( {'EXIT', P, _Reason}, CreState ) -> [{info, "idle worker down"}, {application, cre}, {cre_master_pid, self()}, - {worker_pid, P}] ), + {worker_pid, Pid}, + {nworker, length( IdleLst )+maps:size( BusyMap )-1}] ), - CreState1 = CreState#cre_state{ idle_lst = IdleLst--[P] }, + CreState1 = CreState#cre_state{ idle_lst = IdleLst--[Pid] }, {noreply, CreState1}; false -> - case lists:keyfind( P, 2, maps:to_list( BusyMap ) ) of + case lists:keyfind( Pid, 2, maps:to_list( BusyMap ) ) of % a busy worker died - {A, P} -> + {A, Pid} -> error_logger:info_report( [{info, "busy worker down"}, {application, cre}, {cre_master_pid, self()}, - {worker_pid, P}] ), + {worker_pid, Pid}, + {nworker, length( IdleLst )+maps:size( BusyMap )-1}] ), CreState1 = CreState#cre_state{ queue = [A|Queue], busy_map = maps:remove( A, BusyMap ) }, @@ -265,7 +281,7 @@ handle_info( {'EXIT', P, _Reason}, CreState ) -> [{info, "exit signal received"}, {application, cre}, {cre_master_pid, self()}, - {from_pid, P}] ), + {from_pid, Pid}] ), {stop, exit, CreState} end @@ -278,6 +294,8 @@ handle_info( _Info, CreState ) -> {noreply, CreState}. %% Internal functions %%==================================================================== +-spec attempt_progress( CreState :: #cre_state{} ) -> #cre_state{}. + attempt_progress( CreState ) -> #cre_state{ idle_lst = IdleLst, @@ -298,11 +316,11 @@ attempt_progress( CreState ) -> [_|_] -> - P = lib_combin:pick_from( IdleLst ), - IdleLst1 = IdleLst--[P], - BusyMap1 = BusyMap#{ A => P }, + Pid = lib_combin:pick_from( IdleLst ), + IdleLst1 = IdleLst--[Pid], + BusyMap1 = BusyMap#{ A => Pid }, - cre_worker:worker_request( P, A ), + cre_worker:worker_request( Pid, A ), CreState#cre_state{ idle_lst = IdleLst1, busy_map = BusyMap1,