diff --git a/src/molderl.erl b/src/molderl.erl index 6518821..9c5a048 100755 --- a/src/molderl.erl +++ b/src/molderl.erl @@ -22,16 +22,16 @@ start_link(SupervisorPID) -> gen_server:start_link({local, ?MODULE}, ?MODULE, SupervisorPID, []). -spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number()) - -> {'ok', pid()} | {'error', atom()}. + -> {'ok', atom()} | {'error', atom()}. create_stream(StreamName,Destination,DestinationPort,RecoveryPort) -> create_stream(StreamName,Destination,DestinationPort,RecoveryPort, []). -spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number(), [{atom(), term()}]) - -> {'ok', pid()} | {'error', atom()}. + -> {'ok', atom()} | {'error', atom()}. create_stream(StreamName, Destination, DestinationPort, RecoveryPort, Options) -> gen_server:call(?MODULE, {create_stream, StreamName, Destination, DestinationPort, RecoveryPort, Options}). --spec send_message(pid(), binary()) -> 'ok'. +-spec send_message(atom(), binary()) -> 'ok'. send_message(Stream, Message) -> gen_server:cast(?MODULE, {send, Stream, Message, os:timestamp()}). @@ -59,8 +59,9 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo Timer = proplists:get_value(timer, Options, 50), TTL = proplists:get_value(multicast_ttl, Options, 1), MaxRecoveryCount = proplists:get_value(max_recovery_count, Options, 2000), - Arguments = [{streamname, StreamName}, {destination, Destination}, - {destinationport, DestinationPort}, {recoveryport, RecoveryPort}, + Arguments = [{streamname, StreamName}, {streamprocessname, molderl_utils:gen_processname(stream, StreamName)}, + {recoveryprocessname, molderl_utils:gen_processname(recovery, StreamName)}, + {destination, Destination}, {destinationport, DestinationPort}, {recoveryport, RecoveryPort}, {ipaddresstosendfrom, IPAddressToSendFrom}, {filename, FileName}, {timer, Timer}, {multicast_ttl, TTL}, {max_recovery_count, MaxRecoveryCount}], Spec = ?CHILD(make_ref(), molderl_stream_sup, [Arguments], transient, supervisor), @@ -70,7 +71,14 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo Stream = #stream{destination_addr={Destination,DestinationPort}, recovery_port=RecoveryPort, filename=FileName}, - {reply, {ok, StreamPid}, State#state{streams=[Stream|State#state.streams]}}; + {registered_name, ProcessName} = process_info(StreamPid, registered_name), + + % start recovery process + RecoveryArguments = [{mold_stream, ProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]], + RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [RecoveryArguments], transient, worker), + {ok, _RecoveryProcess} = supervisor:start_child(Pid, RecoverySpec), + + {reply, {ok, ProcessName}, State#state{streams=[Stream|State#state.streams]}}; {error, Error} -> {reply, {error, Error}, State} end; diff --git a/src/molderl_recovery.erl b/src/molderl_recovery.erl index e438fce..6349044 100644 --- a/src/molderl_recovery.erl +++ b/src/molderl_recovery.erl @@ -30,22 +30,23 @@ start_link(Arguments) -> gen_server:start_link(?MODULE, Arguments, []). --spec store(pid(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok. -store(Pid, Msgs, MsgsSize, NumMsgs) -> - gen_server:cast(Pid, {store, Msgs, MsgsSize, NumMsgs}). +-spec store(atom(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok. +store(ProcessName, Msgs, MsgsSize, NumMsgs) -> + gen_server:cast(ProcessName, {store, Msgs, MsgsSize, NumMsgs}). init(Arguments) -> {streamname, StreamName} = lists:keyfind(streamname, 1, Arguments), + {recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments), {recoveryport, RecoveryPort} = lists:keyfind(recoveryport, 1, Arguments), {packetsize, PacketSize} = lists:keyfind(packetsize, 1, Arguments), {filename, FileName} = lists:keyfind(filename, 1, Arguments), - {mold_stream, MoldStreamPid} = lists:keyfind(mold_stream, 1, Arguments), + {mold_stream, MoldStreamProcessName} = lists:keyfind(mold_stream, 1, Arguments), {max_recovery_count, MaxRecoveryCount} = lists:keyfind(max_recovery_count, 1, Arguments), process_flag(trap_exit, true), % so that terminate/2 gets called when process exits - self() ! {initialize, FileName, MoldStreamPid}, + self() ! {initialize, FileName, MoldStreamProcessName}, {ok, Socket} = gen_udp:open(RecoveryPort, [binary, {active,once}, {reuseaddr, true}]), @@ -56,6 +57,8 @@ init(Arguments) -> statsd_latency_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.latency", statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.received"}, + register(RecoveryProcessName, self()), + lager:info("[molderl] Register molderl_recovery pid[~p] with name[~p]", [self(), RecoveryProcessName]), {ok, State}. handle_cast({store, Msgs, MsgsSize, NumMsgs}, State) -> @@ -104,7 +107,7 @@ handle_info({udp, _Client, IP, Port, IllFormedRequest}, State) -> ok = inet:setopts(State#state.socket, [{active, once}]), {noreply, State}; -handle_info({initialize, FileName, MoldStreamPid}, State) -> +handle_info({initialize, FileName, MoldStreamProcessName}, State) -> case file:open(FileName, [read, append, raw, binary, read_ahead]) of {ok, IoDevice} -> Log = "[molderl] Rebuilding MOLDUDP64 index from disk cache ~p. This may take some time.", @@ -114,7 +117,7 @@ handle_info({initialize, FileName, MoldStreamPid}, State) -> SeqNum = length(Index), Fmt = "[molderl] Successfully restored ~p MOLD packets from file ~p", lager:info(Fmt, [SeqNum, FileName]), - ok = molderl_stream:set_sequence_number(MoldStreamPid, SeqNum+1), + ok = molderl_stream:set_sequence_number(MoldStreamProcessName, SeqNum+1), NewState = State#state{last_seq_num=SeqNum, blocks_store=IoDevice, store_size=FileSize, diff --git a/src/molderl_stream.erl b/src/molderl_stream.erl index cb9008b..1573b91 100755 --- a/src/molderl_stream.erl +++ b/src/molderl_stream.erl @@ -19,7 +19,7 @@ socket :: inet:socket(), % The socket to send the data on destination_port :: inet:port_number(), % Destination port for the data % to be encoded in a MOLD64 packet - recovery_service :: pid() , % Pid of the recovery service message + recovery_service :: atom() , % name of the recovery service message prod_interval :: pos_integer(), % Maximum interval at which either partial packets % or heartbeats should be sent statsd_latency_key_in :: string(), % @@ -38,17 +38,19 @@ start_link(Arguments) -> gen_server:start_link(?MODULE, Arguments, []). --spec send(pid(), binary(), erlang:timestamp()) -> 'ok'. -send(Pid, Message, StartTime) -> - gen_server:cast(Pid, {send, Message, StartTime}). +-spec send(atom(), binary(), erlang:timestamp()) -> 'ok'. +send(ProcessName, Message, StartTime) -> + gen_server:cast(ProcessName, {send, Message, StartTime}). --spec set_sequence_number(pid(), pos_integer()) -> 'ok'. -set_sequence_number(Pid, SeqNum) -> - gen_server:cast(Pid, {sequence_number, SeqNum}). +-spec set_sequence_number(atom(), pos_integer()) -> 'ok'. +set_sequence_number(ProcessName, SeqNum) -> + gen_server:cast(ProcessName, {sequence_number, SeqNum}). init(Arguments) -> {streamname, StreamName} = lists:keyfind(streamname, 1, Arguments), + {streamprocessname, StreamProcessName} = lists:keyfind(streamprocessname, 1, Arguments), + {recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments), {destination, Destination} = lists:keyfind(destination, 1, Arguments), {destinationport, DestinationPort} = lists:keyfind(destinationport, 1, Arguments), {ipaddresstosendfrom, IPAddressToSendFrom} = lists:keyfind(ipaddresstosendfrom, 1, Arguments), @@ -57,10 +59,6 @@ init(Arguments) -> process_flag(trap_exit, true), % so that terminate/2 gets called when process exits - % send yourself a reminder to start recovery process - RecoveryArguments = [{mold_stream, self()}|[{packetsize, ?PACKET_SIZE}|Arguments]], - self() ! {initialize, RecoveryArguments}, - Connection = gen_udp:open(0, [binary, {broadcast, true}, {ip, IPAddressToSendFrom}, @@ -78,8 +76,11 @@ init(Arguments) -> prod_interval = ProdInterval, statsd_latency_key_in = "molderl." ++ atom_to_list(StreamName) ++ ".time_in", statsd_latency_key_out = "molderl." ++ atom_to_list(StreamName) ++ ".time_out", - statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent"}, + statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent", + recovery_service = RecoveryProcessName}, State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)}, + register(StreamProcessName, self()), + lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), StreamProcessName]), {ok, {Info, State}}; {error, Reason} -> lager:error("[molderl] Unable to open UDP socket on ~p because '~p'. Aborting.", @@ -142,12 +143,6 @@ handle_info(prod, {Info, OldState=#state{packets=Pckts, messages=Msgs}}) -> {stop, Reason, {Info, State}} end; -handle_info({initialize, Arguments}, {Info, State}) -> - {supervisorpid, SupervisorPID} = lists:keyfind(supervisorpid, 1, Arguments), - RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [Arguments], transient, worker), - {ok, RecoveryProcess} = supervisor:start_child(SupervisorPID, RecoverySpec), - {noreply, {Info#info{recovery_service=RecoveryProcess}, State}}; - handle_info(Info, State) -> lager:error("[molderl] molderl_stream:handle_info received unexpected message. Info:~p, State:~p.~n", [Info, State]), {noreply, State}. diff --git a/src/molderl_utils.erl b/src/molderl_utils.erl index 5153199..f4b02ef 100755 --- a/src/molderl_utils.erl +++ b/src/molderl_utils.erl @@ -8,7 +8,8 @@ gen_endofsession/2, gen_streamname/1, encode_messages/1, - gen_messagepacket/4]). + gen_messagepacket/4, + gen_processname/2]). -include("molderl.hrl"). @@ -45,6 +46,10 @@ gen_streamname(StreamName) when is_list(StreamName), length(StreamName) > 10 -> gen_streamname(StreamName) when is_list(StreamName) -> binary_padder(list_to_binary(StreamName)). +-spec gen_processname(atom(), atom()) -> atom(). +gen_processname(ProcessType, StreamName) -> + list_to_atom(atom_to_list(mold_) ++ atom_to_list(ProcessType) ++ "_" ++ atom_to_list(StreamName)). + %% -------------------------------------------- %% Takes a binary and pads it out to ten bytes. %% This is needed by the Stream Name. @@ -147,5 +152,11 @@ gen_messagepacket_empty_test() -> Expected = <<<<"foo">>/binary, 23:64, 0:16>>, ?assertEqual(Expected, Observed). +%% ----------------------- +%% Tests for gen_processname +%% ----------------------- +gen_processname_test() -> + ?assert(gen_processname(stream, abcd) == mold_stream_abcd), + ?assert(gen_processname(recovery, abcd) == mold_recovery_abcd). -endif. diff --git a/test/molderl_integration_tests.erl b/test/molderl_integration_tests.erl index 8687cb8..1f30f82 100644 --- a/test/molderl_integration_tests.erl +++ b/test/molderl_integration_tests.erl @@ -11,7 +11,7 @@ -compile([{parse_transform, lager_transform}]). --record(stream, {pid :: pid(), +-record(stream, {process_name:: atom(), name :: string(), ip :: inet:ip4_address(), port :: inet:port_number(), @@ -90,7 +90,7 @@ loop(State=#state{sent=Sent, inflight=Inflight, max_seq_num_rcvd=MaxSeqNumRcvd}, send(State=#state{stream=Stream, sent=Sent}) -> % generate random payload of random size < 10 bytes Msg = crypto:strong_rand_bytes(random:uniform(10)), - case molderl:send_message(Stream#stream.pid, Msg) of + case molderl:send_message(Stream#stream.process_name, Msg) of ok -> Fmt = "[SUCCESS] Sent packet seq num: ~p, msg: ~p", Outcome = io_lib:format(Fmt, [length(Sent)+1, Msg]), @@ -163,8 +163,8 @@ crash(State) -> launch_stream(Stream=#stream{port=P, recovery_port=RP, file=F, ip=IP}) -> ok = application:start(molderl), - {ok, Pid} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]), - Stream#stream{pid=Pid}. + {ok, ProcessName} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]), + Stream#stream{process_name=ProcessName}. clean_up() -> application:stop(molderl).