Skip to content

Commit

Permalink
Make the CRE master deal only with worker PIDs, instead of registered…
Browse files Browse the repository at this point in the history
… names internally and also increase its verbosity.
  • Loading branch information
joergen7 committed Mar 20, 2018
1 parent 1d976c5 commit 006e31a
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions src/cre_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ stop( CreName ) ->
%% gen_server callback functions
%%====================================================================

code_change( _OldVsn, CreState, _Extra ) -> {ok, CreState}.
code_change( _OldVsn, CreState, _Extra ) -> {ok, CreState}.
handle_call( _Request, _From, CreState ) -> {reply, {error, bad_msg}, CreState}.
terminate( _Reason, _CreState ) -> ok.
terminate( _Reason, _CreState ) -> ok.

init( _Arg ) ->
process_flag( trap_exit, true ),
Expand All @@ -134,30 +134,39 @@ init( _Arg ) ->

handle_cast( {add_worker, P}, CreState ) ->

error_logger:info_report(
[{info, "new worker"},
{application, cre},
{cre_master_pid, self()},
{worker_pid, P}] ),
#cre_state{ idle_lst = IdleLst, busy_map = BusyMap } = CreState,

% extract pid because link/1 cannot deal with registered names
Pid =
if
is_pid( P ) -> P;
true -> whereis( P )
end,

true = link( Pid ),
error_logger:info_report(
[{info, "new worker"},
{application, cre},
{cre_master_pid, self()},
{worker_pid, Pid},
{nworker, length( IdleLst )+maps:size( BusyMap )+1}] ),

#cre_state{ idle_lst = IdleLst } = CreState,

CreState1 = CreState#cre_state{ idle_lst = [P|IdleLst] },
true = link( Pid ),

CreState1 = CreState#cre_state{ idle_lst = [Pid|IdleLst] },

CreState2 = attempt_progress( CreState1 ),

{noreply, CreState2};

handle_cast( {worker_result, P, A, Delta}, CreState ) ->

Pid =
if
is_pid( P ) -> P;
true -> whereis( P )
end,

#cre_state{ subscr_map = SubscrMap,
idle_lst = IdleLst,
busy_map = BusyMap,
Expand All @@ -170,10 +179,10 @@ handle_cast( {worker_result, P, A, Delta}, CreState ) ->

case maps:get( A, BusyMap, undefined ) of

P ->
Pid ->
lists:foreach( F, maps:get( A, SubscrMap ) ),
CreState1 = CreState#cre_state{ subscr_map = maps:remove( A, SubscrMap ),
idle_lst = [P|IdleLst],
idle_lst = [Pid|IdleLst],
busy_map = maps:remove( A, BusyMap ),
cache = Cache#{ A => Delta } },
CreState2 = attempt_progress( CreState1 ),
Expand Down Expand Up @@ -224,9 +233,14 @@ handle_info( {'EXIT', P, _Reason}, CreState ) ->
busy_map = BusyMap,
queue = Queue } = CreState,


Pid =
if
is_pid( P ) -> P;
true -> whereis( P )
end,


case lists:member( P, IdleLst ) of
case lists:member( Pid, IdleLst ) of

% an idle worker died
true ->
Expand All @@ -235,23 +249,25 @@ handle_info( {'EXIT', P, _Reason}, CreState ) ->
[{info, "idle worker down"},
{application, cre},
{cre_master_pid, self()},
{worker_pid, P}] ),
{worker_pid, Pid},
{nworker, length( IdleLst )+maps:size( BusyMap )-1}] ),

CreState1 = CreState#cre_state{ idle_lst = IdleLst--[P] },
CreState1 = CreState#cre_state{ idle_lst = IdleLst--[Pid] },

{noreply, CreState1};

false ->
case lists:keyfind( P, 2, maps:to_list( BusyMap ) ) of
case lists:keyfind( Pid, 2, maps:to_list( BusyMap ) ) of

% a busy worker died
{A, P} ->
{A, Pid} ->

error_logger:info_report(
[{info, "busy worker down"},
{application, cre},
{cre_master_pid, self()},
{worker_pid, P}] ),
{worker_pid, Pid},
{nworker, length( IdleLst )+maps:size( BusyMap )-1}] ),

CreState1 = CreState#cre_state{ queue = [A|Queue],
busy_map = maps:remove( A, BusyMap ) },
Expand All @@ -265,7 +281,7 @@ handle_info( {'EXIT', P, _Reason}, CreState ) ->
[{info, "exit signal received"},
{application, cre},
{cre_master_pid, self()},
{from_pid, P}] ),
{from_pid, Pid}] ),

{stop, exit, CreState}
end
Expand All @@ -278,6 +294,8 @@ handle_info( _Info, CreState ) -> {noreply, CreState}.
%% Internal functions
%%====================================================================

-spec attempt_progress( CreState :: #cre_state{} ) -> #cre_state{}.

attempt_progress( CreState ) ->

#cre_state{ idle_lst = IdleLst,
Expand All @@ -298,11 +316,11 @@ attempt_progress( CreState ) ->

[_|_] ->

P = lib_combin:pick_from( IdleLst ),
IdleLst1 = IdleLst--[P],
BusyMap1 = BusyMap#{ A => P },
Pid = lib_combin:pick_from( IdleLst ),
IdleLst1 = IdleLst--[Pid],
BusyMap1 = BusyMap#{ A => Pid },

cre_worker:worker_request( P, A ),
cre_worker:worker_request( Pid, A ),

CreState#cre_state{ idle_lst = IdleLst1,
busy_map = BusyMap1,
Expand Down

0 comments on commit 006e31a

Please sign in to comment.