From 8fcb175a3255d308c79a944d0135d3ae743c238a Mon Sep 17 00:00:00 2001 From: Ethan Yang Date: Tue, 24 Nov 2015 14:01:33 +0800 Subject: [PATCH 1/4] return process name(atom) rather than pid when creating mold stream. --- src/molderl.erl | 15 +++++++--- src/molderl_recovery.erl | 17 ++++++----- src/molderl_stream.erl | 46 +++++++++++++----------------- src/molderl_utils.erl | 22 +++++++++++++- test/molderl_integration_tests.erl | 8 +++--- 5 files changed, 66 insertions(+), 42 deletions(-) diff --git a/src/molderl.erl b/src/molderl.erl index 6518821..58b4644 100755 --- a/src/molderl.erl +++ b/src/molderl.erl @@ -22,16 +22,16 @@ start_link(SupervisorPID) -> gen_server:start_link({local, ?MODULE}, ?MODULE, SupervisorPID, []). -spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number()) - -> {'ok', pid()} | {'error', atom()}. + -> {'ok', atom()} | {'error', atom()}. create_stream(StreamName,Destination,DestinationPort,RecoveryPort) -> create_stream(StreamName,Destination,DestinationPort,RecoveryPort, []). -spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number(), [{atom(), term()}]) - -> {'ok', pid()} | {'error', atom()}. + -> {'ok', atom()} | {'error', atom()}. create_stream(StreamName, Destination, DestinationPort, RecoveryPort, Options) -> gen_server:call(?MODULE, {create_stream, StreamName, Destination, DestinationPort, RecoveryPort, Options}). --spec send_message(pid(), binary()) -> 'ok'. +-spec send_message(atom(), binary()) -> 'ok'. send_message(Stream, Message) -> gen_server:cast(?MODULE, {send, Stream, Message, os:timestamp()}). @@ -70,7 +70,14 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo Stream = #stream{destination_addr={Destination,DestinationPort}, recovery_port=RecoveryPort, filename=FileName}, - {reply, {ok, StreamPid}, State#state{streams=[Stream|State#state.streams]}}; + {registered_name, ProcessName} = process_info(StreamPid, registered_name), + + % start recovery process + RecoveryArguments = [{mold_stream, ProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]], + RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [RecoveryArguments], transient, worker), + {ok, _RecoveryProcess} = supervisor:start_child(Pid, RecoverySpec), + + {reply, {ok, ProcessName}, State#state{streams=[Stream|State#state.streams]}}; {error, Error} -> {reply, {error, Error}, State} end; diff --git a/src/molderl_recovery.erl b/src/molderl_recovery.erl index e438fce..017a133 100644 --- a/src/molderl_recovery.erl +++ b/src/molderl_recovery.erl @@ -30,9 +30,9 @@ start_link(Arguments) -> gen_server:start_link(?MODULE, Arguments, []). --spec store(pid(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok. -store(Pid, Msgs, MsgsSize, NumMsgs) -> - gen_server:cast(Pid, {store, Msgs, MsgsSize, NumMsgs}). +-spec store(atom(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok. +store(ProcessName, Msgs, MsgsSize, NumMsgs) -> + gen_server:cast(ProcessName, {store, Msgs, MsgsSize, NumMsgs}). init(Arguments) -> @@ -40,12 +40,12 @@ init(Arguments) -> {recoveryport, RecoveryPort} = lists:keyfind(recoveryport, 1, Arguments), {packetsize, PacketSize} = lists:keyfind(packetsize, 1, Arguments), {filename, FileName} = lists:keyfind(filename, 1, Arguments), - {mold_stream, MoldStreamPid} = lists:keyfind(mold_stream, 1, Arguments), + {mold_stream, MoldStreamProcessName} = lists:keyfind(mold_stream, 1, Arguments), {max_recovery_count, MaxRecoveryCount} = lists:keyfind(max_recovery_count, 1, Arguments), process_flag(trap_exit, true), % so that terminate/2 gets called when process exits - self() ! {initialize, FileName, MoldStreamPid}, + self() ! {initialize, FileName, MoldStreamProcessName}, {ok, Socket} = gen_udp:open(RecoveryPort, [binary, {active,once}, {reuseaddr, true}]), @@ -56,6 +56,9 @@ init(Arguments) -> statsd_latency_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.latency", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.received"}, + RecoveryProcessName = molderl_utils:gen_recoveryprocessname(StreamName), + register(RecoveryProcessName, self()), + lager:info("[molderl] Register molderl_recovery pid[~p] with name[~p]", [self(), RecoveryProcessName]), {ok, State}. handle_cast({store, Msgs, MsgsSize, NumMsgs}, State) -> @@ -104,7 +107,7 @@ handle_info({udp, _Client, IP, Port, IllFormedRequest}, State) -> ok = inet:setopts(State#state.socket, [{active, once}]), {noreply, State}; -handle_info({initialize, FileName, MoldStreamPid}, State) -> +handle_info({initialize, FileName, MoldStreamProcessName}, State) -> case file:open(FileName, [read, append, raw, binary, read_ahead]) of {ok, IoDevice} -> Log = "[molderl] Rebuilding MOLDUDP64 index from disk cache ~p. This may take some time.", @@ -114,7 +117,7 @@ handle_info({initialize, FileName, MoldStreamPid}, State) -> SeqNum = length(Index), Fmt = "[molderl] Successfully restored ~p MOLD packets from file ~p", lager:info(Fmt, [SeqNum, FileName]), - ok = molderl_stream:set_sequence_number(MoldStreamPid, SeqNum+1), + ok = molderl_stream:set_sequence_number(MoldStreamProcessName, SeqNum+1), NewState = State#state{last_seq_num=SeqNum, blocks_store=IoDevice, store_size=FileSize, diff --git a/src/molderl_stream.erl b/src/molderl_stream.erl index 2e3dbe1..b9b0f80 100755 --- a/src/molderl_stream.erl +++ b/src/molderl_stream.erl @@ -19,7 +19,7 @@ socket :: inet:socket(), % The socket to send the data on destination_port :: inet:port_number(), % Destination port for the data % to be encoded in a MOLD64 packet - recovery_service :: pid() , % Pid of the recovery service message + recovery_service :: atom() , % name of the recovery service message prod_interval :: pos_integer(), % Maximum interval at which either partial packets % or heartbeats should be sent statsd_latency_key_in :: string(), % @@ -38,13 +38,13 @@ start_link(Arguments) -> gen_server:start_link(?MODULE, Arguments, []). --spec send(pid(), binary(), erlang:timestamp()) -> 'ok'. -send(Pid, Message, StartTime) -> - gen_server:cast(Pid, {send, Message, StartTime}). +-spec send(atom(), binary(), erlang:timestamp()) -> 'ok'. +send(ProcessName, Message, StartTime) -> + gen_server:cast(ProcessName, {send, Message, StartTime}). --spec set_sequence_number(pid(), pos_integer()) -> 'ok'. -set_sequence_number(Pid, SeqNum) -> - gen_server:cast(Pid, {sequence_number, SeqNum}). +-spec set_sequence_number(atom(), pos_integer()) -> 'ok'. +set_sequence_number(ProcessName, SeqNum) -> + gen_server:cast(ProcessName, {sequence_number, SeqNum}). init(Arguments) -> @@ -57,10 +57,6 @@ init(Arguments) -> process_flag(trap_exit, true), % so that terminate/2 gets called when process exits - % send yourself a reminder to start recovery process - RecoveryArguments = [{mold_stream, self()}|[{packetsize, ?PACKET_SIZE}|Arguments]], - self() ! {initialize, RecoveryArguments}, - Connection = gen_udp:open(0, [binary, {broadcast, true}, {ip, IPAddressToSendFrom}, @@ -78,8 +74,12 @@ init(Arguments) -> prod_interval = ProdInterval, statsd_latency_key_in = "molderl." ++ atom_to_list(StreamName) ++ ".time_in", statsd_latency_key_out = "molderl." ++ atom_to_list(StreamName) ++ ".time_out", - statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent"}, + statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent", + recovery_service = molderl_utils:gen_recoveryprocessname(StreamName)}, State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)}, + ProcessName = molderl_utils:gen_streamprocessname(StreamName), + register(ProcessName, self()), + lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), ProcessName]), {ok, {Info, State}}; {error, Reason} -> lager:error("[molderl] Unable to open UDP socket on ~p because '~p'. Aborting.", @@ -105,7 +105,7 @@ handle_cast({send, Msg, StartTime}, {Info, OldState=#state{messages={_,[]}}}) -> % third handle if the msg is big enough to promotes the current msgs buffer to a packet handle_cast({send, Msg, Start}, {Info, OldState=#state{packets=Pckts, messages=Msgs, buffer_size=Size}}) when Size+byte_size(Msg)+2 > ?PACKET_SIZE -> - State = OldState#state{packets=[Msgs|Pckts], messages={Start, [Msg]}, buffer_size=0}, + State = OldState#state{packets=[Msgs|Pckts], messages={Start, [Msg]}, buffer_size=byte_size(Msg)+22}, case flush(Info, State) of {ok, NewState} -> {noreply, {Info, NewState}}; @@ -128,15 +128,13 @@ handle_info(prod, {Info, State=#state{sequence_number=undefined}}) -> handle_info(prod, {Info, State=#state{packets=[], messages={_,[]}}}) -> % Timer triggered a send, but packets/msgs queue empty - lager:debug("[molderl] Sending heartbeat... Stream:~p, SeqNum:~p.~n", [Info#info.stream_name, State#state.sequence_number]), send_heartbeat(Info, State#state.sequence_number), TRef = erlang:send_after(Info#info.prod_interval, self(), prod), {noreply, {Info, State#state{timer_ref=TRef}}}; handle_info(prod, {Info, OldState=#state{packets=Pckts, messages=Msgs}}) -> % Timer triggered a send, flush packets/msgs buffer - State = OldState#state{packets=[Msgs|Pckts], messages={{0,0,0}, []}}, - lager:debug("[molderl] Flushing... Stream:~p, NumPackets:~p.~n", [Info#info.stream_name, length(State#state.packets)]), + State = OldState#state{packets=[Msgs|Pckts], messages={{0,0,0}, []}, buffer_size=0}, case flush(Info, State) of {ok, NewState} -> {noreply, {Info, NewState}}; @@ -144,12 +142,6 @@ handle_info(prod, {Info, OldState=#state{packets=Pckts, messages=Msgs}}) -> {stop, Reason, {Info, State}} end; -handle_info({initialize, Arguments}, {Info, State}) -> - {supervisorpid, SupervisorPID} = lists:keyfind(supervisorpid, 1, Arguments), - RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [Arguments], transient, worker), - {ok, RecoveryProcess} = supervisor:start_child(SupervisorPID, RecoverySpec), - {noreply, {Info#info{recovery_service=RecoveryProcess}, State}}; - handle_info(Info, State) -> lager:error("[molderl] molderl_stream:handle_info received unexpected message. Info:~p, State:~p.~n", [Info, State]), {noreply, State}. @@ -165,16 +157,20 @@ terminate(Reason, {Info, _State}) -> -spec flush(#info{}, #state{}) -> {'ok', #state{}} | {'error', inet:posix()}. flush(Info, State=#state{sequence_number=undefined}) -> - erlang:cancel_timer(State#state.timer_ref), % can't send messages out because we don't know our sequence number yet + % Asynchronous erlang:cancel_timer/2 is only supported in ERTS 7... +% erlang:cancel_timer(State#state.timer_ref, [{async, true}]), + erlang:cancel_timer(State#state.timer_ref), TRef = erlang:send_after(Info#info.prod_interval, self(), prod), {ok, State#state{timer_ref=TRef}}; flush(Info=#info{prod_interval=Interval}, State=#state{packets=Pckts}) -> + % Asynchronous erlang:cancel_timer/2 is only supported in ERTS 7... +% erlang:cancel_timer(State#state.timer_ref, [{async, true}]), erlang:cancel_timer(State#state.timer_ref), + TRef = erlang:send_after(Interval, self(), prod), case flush(Info, State#state.sequence_number, lists:reverse(Pckts)) of {ok, SeqNum, UnsentPckts} -> - TRef = erlang:send_after(Interval, self(), prod), {ok, State#state{sequence_number=SeqNum, packets=UnsentPckts, timer_ref=TRef}}; {error, Error} -> {error, Error} @@ -189,7 +185,6 @@ flush(Info, SeqNum, [{_Start, []}|Pckts]) -> % empty packet, ignore and go on flush(Info, SeqNum, Pckts); flush(Info=#info{stream_name=Name, socket=Socket}, SeqNum, [{Start, Msgs}|Pckts]) -> - lager:debug("[molderl] Encoding and sending mold packet. Stream:~p, SeqNum:~p, Remaining Packets:~p.~n", [Name, SeqNum, length(Pckts)]), {EncodedMsgs, EncodedMsgsSize, NumMsgs} = molderl_utils:encode_messages(Msgs), Payload = molderl_utils:gen_messagepacket(Name, SeqNum, NumMsgs, EncodedMsgs), case gen_udp:send(Socket, Info#info.destination, Info#info.destination_port, Payload) of @@ -197,7 +192,6 @@ flush(Info=#info{stream_name=Name, socket=Socket}, SeqNum, [{Start, Msgs}|Pckts] molderl_recovery:store(Info#info.recovery_service, EncodedMsgs, EncodedMsgsSize, NumMsgs), statsderl:timing_now(Info#info.statsd_latency_key_out, Start, 0.1), statsderl:increment(Info#info.statsd_count_key, 1, 0.1), - lager:debug("[molderl] Sent mold packet. Stream:~p, SeqNum:~p, NumMsgs:~p.~n", [Name, SeqNum, NumMsgs]), flush(Info, SeqNum+NumMsgs, Pckts); {error, eagain} -> % retry next cycle lager:error("[molderl] Error sending UDP packets: (eagain) resource temporarily unavailable'. Stream:~p. Retrying...~n", [Name]), diff --git a/src/molderl_utils.erl b/src/molderl_utils.erl index 5153199..1e4e8d5 100755 --- a/src/molderl_utils.erl +++ b/src/molderl_utils.erl @@ -8,7 +8,9 @@ gen_endofsession/2, gen_streamname/1, encode_messages/1, - gen_messagepacket/4]). + gen_messagepacket/4, + gen_streamprocessname/1, + gen_recoveryprocessname/1]). -include("molderl.hrl"). @@ -45,6 +47,13 @@ gen_streamname(StreamName) when is_list(StreamName), length(StreamName) > 10 -> gen_streamname(StreamName) when is_list(StreamName) -> binary_padder(list_to_binary(StreamName)). +-spec gen_streamprocessname(atom()) -> atom(). +gen_streamprocessname(StreamName) -> + list_to_atom(atom_to_list(mold_stream_) ++ atom_to_list(StreamName)). + +-spec gen_recoveryprocessname(atom()) -> atom(). +gen_recoveryprocessname(StreamName) -> + list_to_atom(atom_to_list(mold_recovery_) ++ atom_to_list(StreamName)). %% -------------------------------------------- %% Takes a binary and pads it out to ten bytes. %% This is needed by the Stream Name. @@ -147,5 +156,16 @@ gen_messagepacket_empty_test() -> Expected = <<<<"foo">>/binary, 23:64, 0:16>>, ?assertEqual(Expected, Observed). +%% ----------------------- +%% Tests for gen_streamprocessname +%% ----------------------- +gen_streamprocessname_test() -> + ?assert(gen_streamprocessname(abcd) == mold_stream_abcd). + +%% ----------------------- +%% Tests for gen_recoveryprocessname +%% ----------------------- +gen_recoveryprocessname_test() -> + ?assert(gen_recoveryprocessname(abcd) == mold_recovery_abcd). -endif. diff --git a/test/molderl_integration_tests.erl b/test/molderl_integration_tests.erl index 7480cae..2e63c85 100644 --- a/test/molderl_integration_tests.erl +++ b/test/molderl_integration_tests.erl @@ -11,7 +11,7 @@ -compile([{parse_transform, lager_transform}]). --record(stream, {pid :: pid(), +-record(stream, {process_name:: atom(), name :: string(), ip :: inet:ip4_address(), port :: inet:port_number(), @@ -90,7 +90,7 @@ loop(State=#state{sent=Sent, inflight=Inflight, max_seq_num_rcvd=MaxSeqNumRcvd}, send(State=#state{stream=Stream, sent=Sent}) -> % generate random payload of random size < 10 bytes Msg = crypto:strong_rand_bytes(random:uniform(10)), - case molderl:send_message(Stream#stream.pid, Msg) of + case molderl:send_message(Stream#stream.process_name, Msg) of ok -> Fmt = "[SUCCESS] Sent packet seq num: ~p, msg: ~p", Outcome = io_lib:format(Fmt, [length(Sent)+1, Msg]), @@ -163,8 +163,8 @@ crash(State) -> launch_stream(Stream=#stream{port=P, recovery_port=RP, file=F, ip=IP}) -> ok = application:start(molderl), - {ok, Pid} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]), - Stream#stream{pid=Pid}. + {ok, ProcessName} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]), + Stream#stream{process_name=ProcessName}. clean_up() -> application:stop(molderl). From e46c639c481b2da9ede1af50ee348b4f03474620 Mon Sep 17 00:00:00 2001 From: Ethan Yang Date: Tue, 24 Nov 2015 15:04:48 +0800 Subject: [PATCH 2/4] minor fix accroding to the review comments. --- src/molderl_recovery.erl | 2 +- src/molderl_stream.erl | 4 ++-- src/molderl_utils.erl | 25 ++++++++----------------- test/molderl_integration_tests.erl | 2 +- 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/molderl_recovery.erl b/src/molderl_recovery.erl index 017a133..624d328 100644 --- a/src/molderl_recovery.erl +++ b/src/molderl_recovery.erl @@ -56,7 +56,7 @@ init(Arguments) -> statsd_latency_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.latency", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.received"}, - RecoveryProcessName = molderl_utils:gen_recoveryprocessname(StreamName), + RecoveryProcessName = molderl_utils:gen_processname(recovery, StreamName), register(RecoveryProcessName, self()), lager:info("[molderl] Register molderl_recovery pid[~p] with name[~p]", [self(), RecoveryProcessName]), {ok, State}. diff --git a/src/molderl_stream.erl b/src/molderl_stream.erl index b9b0f80..95b432a 100755 --- a/src/molderl_stream.erl +++ b/src/molderl_stream.erl @@ -75,9 +75,9 @@ init(Arguments) -> statsd_latency_key_in = "molderl." ++ atom_to_list(StreamName) ++ ".time_in", statsd_latency_key_out = "molderl." ++ atom_to_list(StreamName) ++ ".time_out", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent", - recovery_service = molderl_utils:gen_recoveryprocessname(StreamName)}, + recovery_service = molderl_utils:gen_processname(recovery, StreamName)}, State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)}, - ProcessName = molderl_utils:gen_streamprocessname(StreamName), + ProcessName = molderl_utils:gen_processname(stream, StreamName), register(ProcessName, self()), lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), ProcessName]), {ok, {Info, State}}; diff --git a/src/molderl_utils.erl b/src/molderl_utils.erl index 1e4e8d5..f4b02ef 100755 --- a/src/molderl_utils.erl +++ b/src/molderl_utils.erl @@ -9,8 +9,7 @@ gen_streamname/1, encode_messages/1, gen_messagepacket/4, - gen_streamprocessname/1, - gen_recoveryprocessname/1]). + gen_processname/2]). -include("molderl.hrl"). @@ -47,13 +46,10 @@ gen_streamname(StreamName) when is_list(StreamName), length(StreamName) > 10 -> gen_streamname(StreamName) when is_list(StreamName) -> binary_padder(list_to_binary(StreamName)). --spec gen_streamprocessname(atom()) -> atom(). -gen_streamprocessname(StreamName) -> - list_to_atom(atom_to_list(mold_stream_) ++ atom_to_list(StreamName)). +-spec gen_processname(atom(), atom()) -> atom(). +gen_processname(ProcessType, StreamName) -> + list_to_atom(atom_to_list(mold_) ++ atom_to_list(ProcessType) ++ "_" ++ atom_to_list(StreamName)). --spec gen_recoveryprocessname(atom()) -> atom(). -gen_recoveryprocessname(StreamName) -> - list_to_atom(atom_to_list(mold_recovery_) ++ atom_to_list(StreamName)). %% -------------------------------------------- %% Takes a binary and pads it out to ten bytes. %% This is needed by the Stream Name. @@ -157,15 +153,10 @@ gen_messagepacket_empty_test() -> ?assertEqual(Expected, Observed). %% ----------------------- -%% Tests for gen_streamprocessname +%% Tests for gen_processname %% ----------------------- -gen_streamprocessname_test() -> - ?assert(gen_streamprocessname(abcd) == mold_stream_abcd). - -%% ----------------------- -%% Tests for gen_recoveryprocessname -%% ----------------------- -gen_recoveryprocessname_test() -> - ?assert(gen_recoveryprocessname(abcd) == mold_recovery_abcd). +gen_processname_test() -> + ?assert(gen_processname(stream, abcd) == mold_stream_abcd), + ?assert(gen_processname(recovery, abcd) == mold_recovery_abcd). -endif. diff --git a/test/molderl_integration_tests.erl b/test/molderl_integration_tests.erl index 2e63c85..1f30f82 100644 --- a/test/molderl_integration_tests.erl +++ b/test/molderl_integration_tests.erl @@ -89,7 +89,7 @@ loop(State=#state{sent=Sent, inflight=Inflight, max_seq_num_rcvd=MaxSeqNumRcvd}, send(State=#state{stream=Stream, sent=Sent}) -> % generate random payload of random size < 10 bytes - Msg = crypto:strong_rand_bytes(random:uniform(10)), + Msg = crypto:strong_rand_bytes(random:uniform(10)), case molderl:send_message(Stream#stream.process_name, Msg) of ok -> Fmt = "[SUCCESS] Sent packet seq num: ~p, msg: ~p", From 8a25d456b3026839cf5d0d6cc41100a3d676d0f0 Mon Sep 17 00:00:00 2001 From: Ethan Yang Date: Tue, 24 Nov 2015 15:57:39 +0800 Subject: [PATCH 3/4] put gen_processname call into molderl:handle_call(create_stream...}) --- src/molderl.erl | 7 ++++--- src/molderl_recovery.erl | 2 +- src/molderl_stream.erl | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/molderl.erl b/src/molderl.erl index 58b4644..bda1120 100755 --- a/src/molderl.erl +++ b/src/molderl.erl @@ -59,8 +59,8 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo Timer = proplists:get_value(timer, Options, 50), TTL = proplists:get_value(multicast_ttl, Options, 1), MaxRecoveryCount = proplists:get_value(max_recovery_count, Options, 2000), - Arguments = [{streamname, StreamName}, {destination, Destination}, - {destinationport, DestinationPort}, {recoveryport, RecoveryPort}, + Arguments = [{streamname, StreamName}, {streamprocessname, molderl_utils:gen_processname(stream, StreamName)}, + {destination, Destination}, {destinationport, DestinationPort}, {recoveryport, RecoveryPort}, {ipaddresstosendfrom, IPAddressToSendFrom}, {filename, FileName}, {timer, Timer}, {multicast_ttl, TTL}, {max_recovery_count, MaxRecoveryCount}], Spec = ?CHILD(make_ref(), molderl_stream_sup, [Arguments], transient, supervisor), @@ -73,7 +73,8 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo {registered_name, ProcessName} = process_info(StreamPid, registered_name), % start recovery process - RecoveryArguments = [{mold_stream, ProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]], + RecoveryProcessName = molderl_utils:gen_processname(recovery, StreamName), + RecoveryArguments = [{mold_stream, ProcessName}|[{recoveryprocessname, RecoveryProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]]], RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [RecoveryArguments], transient, worker), {ok, _RecoveryProcess} = supervisor:start_child(Pid, RecoverySpec), diff --git a/src/molderl_recovery.erl b/src/molderl_recovery.erl index 624d328..6349044 100644 --- a/src/molderl_recovery.erl +++ b/src/molderl_recovery.erl @@ -37,6 +37,7 @@ store(ProcessName, Msgs, MsgsSize, NumMsgs) -> init(Arguments) -> {streamname, StreamName} = lists:keyfind(streamname, 1, Arguments), + {recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments), {recoveryport, RecoveryPort} = lists:keyfind(recoveryport, 1, Arguments), {packetsize, PacketSize} = lists:keyfind(packetsize, 1, Arguments), {filename, FileName} = lists:keyfind(filename, 1, Arguments), @@ -56,7 +57,6 @@ init(Arguments) -> statsd_latency_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.latency", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.received"}, - RecoveryProcessName = molderl_utils:gen_processname(recovery, StreamName), register(RecoveryProcessName, self()), lager:info("[molderl] Register molderl_recovery pid[~p] with name[~p]", [self(), RecoveryProcessName]), {ok, State}. diff --git a/src/molderl_stream.erl b/src/molderl_stream.erl index 95b432a..f057ce1 100755 --- a/src/molderl_stream.erl +++ b/src/molderl_stream.erl @@ -49,6 +49,7 @@ set_sequence_number(ProcessName, SeqNum) -> init(Arguments) -> {streamname, StreamName} = lists:keyfind(streamname, 1, Arguments), + {streamprocessname, StreamProcessName} = lists:keyfind(streamprocessname, 1, Arguments), {destination, Destination} = lists:keyfind(destination, 1, Arguments), {destinationport, DestinationPort} = lists:keyfind(destinationport, 1, Arguments), {ipaddresstosendfrom, IPAddressToSendFrom} = lists:keyfind(ipaddresstosendfrom, 1, Arguments), @@ -77,9 +78,8 @@ init(Arguments) -> statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent", recovery_service = molderl_utils:gen_processname(recovery, StreamName)}, State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)}, - ProcessName = molderl_utils:gen_processname(stream, StreamName), - register(ProcessName, self()), - lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), ProcessName]), + register(StreamProcessName, self()), + lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), StreamProcessName]), {ok, {Info, State}}; {error, Reason} -> lager:error("[molderl] Unable to open UDP socket on ~p because '~p'. Aborting.", From 86376b14b6b41998ed2340cd455bbb2769a03a23 Mon Sep 17 00:00:00 2001 From: Ethan Yang Date: Tue, 24 Nov 2015 16:42:14 +0800 Subject: [PATCH 4/4] pass recovery_process_name as argument to molderl_stream:init --- src/molderl.erl | 4 ++-- src/molderl_stream.erl | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/molderl.erl b/src/molderl.erl index bda1120..9c5a048 100755 --- a/src/molderl.erl +++ b/src/molderl.erl @@ -60,6 +60,7 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo TTL = proplists:get_value(multicast_ttl, Options, 1), MaxRecoveryCount = proplists:get_value(max_recovery_count, Options, 2000), Arguments = [{streamname, StreamName}, {streamprocessname, molderl_utils:gen_processname(stream, StreamName)}, + {recoveryprocessname, molderl_utils:gen_processname(recovery, StreamName)}, {destination, Destination}, {destinationport, DestinationPort}, {recoveryport, RecoveryPort}, {ipaddresstosendfrom, IPAddressToSendFrom}, {filename, FileName}, {timer, Timer}, {multicast_ttl, TTL}, {max_recovery_count, MaxRecoveryCount}], @@ -73,8 +74,7 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo {registered_name, ProcessName} = process_info(StreamPid, registered_name), % start recovery process - RecoveryProcessName = molderl_utils:gen_processname(recovery, StreamName), - RecoveryArguments = [{mold_stream, ProcessName}|[{recoveryprocessname, RecoveryProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]]], + RecoveryArguments = [{mold_stream, ProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]], RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [RecoveryArguments], transient, worker), {ok, _RecoveryProcess} = supervisor:start_child(Pid, RecoverySpec), diff --git a/src/molderl_stream.erl b/src/molderl_stream.erl index f057ce1..1573b91 100755 --- a/src/molderl_stream.erl +++ b/src/molderl_stream.erl @@ -50,6 +50,7 @@ init(Arguments) -> {streamname, StreamName} = lists:keyfind(streamname, 1, Arguments), {streamprocessname, StreamProcessName} = lists:keyfind(streamprocessname, 1, Arguments), + {recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments), {destination, Destination} = lists:keyfind(destination, 1, Arguments), {destinationport, DestinationPort} = lists:keyfind(destinationport, 1, Arguments), {ipaddresstosendfrom, IPAddressToSendFrom} = lists:keyfind(ipaddresstosendfrom, 1, Arguments), @@ -76,7 +77,7 @@ init(Arguments) -> statsd_latency_key_in = "molderl." ++ atom_to_list(StreamName) ++ ".time_in", statsd_latency_key_out = "molderl." ++ atom_to_list(StreamName) ++ ".time_out", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent", - recovery_service = molderl_utils:gen_processname(recovery, StreamName)}, + recovery_service = RecoveryProcessName}, State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)}, register(StreamProcessName, self()), lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), StreamProcessName]),