Skip to content

Commit

Permalink
Merge pull request #52 from LokiYang/develop
Browse files Browse the repository at this point in the history
return process name(atom) rather than pid when creating mold stream.
  • Loading branch information
pmembrey committed Nov 24, 2015
2 parents 5693013 + 86376b1 commit 6c15bff
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 36 deletions.
20 changes: 14 additions & 6 deletions src/molderl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ start_link(SupervisorPID) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, SupervisorPID, []).

-spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number())
-> {'ok', pid()} | {'error', atom()}.
-> {'ok', atom()} | {'error', atom()}.
create_stream(StreamName,Destination,DestinationPort,RecoveryPort) ->
create_stream(StreamName,Destination,DestinationPort,RecoveryPort, []).

-spec create_stream(atom(), inet:ip4_address(), inet:port_number(), inet:port_number(), [{atom(), term()}])
-> {'ok', pid()} | {'error', atom()}.
-> {'ok', atom()} | {'error', atom()}.
create_stream(StreamName, Destination, DestinationPort, RecoveryPort, Options) ->
gen_server:call(?MODULE, {create_stream, StreamName, Destination, DestinationPort, RecoveryPort, Options}).

-spec send_message(pid(), binary()) -> 'ok'.
-spec send_message(atom(), binary()) -> 'ok'.
send_message(Stream, Message) ->
gen_server:cast(?MODULE, {send, Stream, Message, os:timestamp()}).

Expand Down Expand Up @@ -59,8 +59,9 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo
Timer = proplists:get_value(timer, Options, 50),
TTL = proplists:get_value(multicast_ttl, Options, 1),
MaxRecoveryCount = proplists:get_value(max_recovery_count, Options, 2000),
Arguments = [{streamname, StreamName}, {destination, Destination},
{destinationport, DestinationPort}, {recoveryport, RecoveryPort},
Arguments = [{streamname, StreamName}, {streamprocessname, molderl_utils:gen_processname(stream, StreamName)},
{recoveryprocessname, molderl_utils:gen_processname(recovery, StreamName)},
{destination, Destination}, {destinationport, DestinationPort}, {recoveryport, RecoveryPort},
{ipaddresstosendfrom, IPAddressToSendFrom}, {filename, FileName},
{timer, Timer}, {multicast_ttl, TTL}, {max_recovery_count, MaxRecoveryCount}],
Spec = ?CHILD(make_ref(), molderl_stream_sup, [Arguments], transient, supervisor),
Expand All @@ -70,7 +71,14 @@ handle_call({create_stream, StreamName, Destination, DestinationPort, RecoveryPo
Stream = #stream{destination_addr={Destination,DestinationPort},
recovery_port=RecoveryPort,
filename=FileName},
{reply, {ok, StreamPid}, State#state{streams=[Stream|State#state.streams]}};
{registered_name, ProcessName} = process_info(StreamPid, registered_name),

% start recovery process
RecoveryArguments = [{mold_stream, ProcessName}|[{packetsize, ?PACKET_SIZE}|Arguments]],
RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [RecoveryArguments], transient, worker),
{ok, _RecoveryProcess} = supervisor:start_child(Pid, RecoverySpec),

{reply, {ok, ProcessName}, State#state{streams=[Stream|State#state.streams]}};
{error, Error} ->
{reply, {error, Error}, State}
end;
Expand Down
17 changes: 10 additions & 7 deletions src/molderl_recovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,23 @@
start_link(Arguments) ->
gen_server:start_link(?MODULE, Arguments, []).

-spec store(pid(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok.
store(Pid, Msgs, MsgsSize, NumMsgs) ->
gen_server:cast(Pid, {store, Msgs, MsgsSize, NumMsgs}).
-spec store(atom(), [binary()], [non_neg_integer()], non_neg_integer()) -> ok.
store(ProcessName, Msgs, MsgsSize, NumMsgs) ->
gen_server:cast(ProcessName, {store, Msgs, MsgsSize, NumMsgs}).

init(Arguments) ->

{streamname, StreamName} = lists:keyfind(streamname, 1, Arguments),
{recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments),
{recoveryport, RecoveryPort} = lists:keyfind(recoveryport, 1, Arguments),
{packetsize, PacketSize} = lists:keyfind(packetsize, 1, Arguments),
{filename, FileName} = lists:keyfind(filename, 1, Arguments),
{mold_stream, MoldStreamPid} = lists:keyfind(mold_stream, 1, Arguments),
{mold_stream, MoldStreamProcessName} = lists:keyfind(mold_stream, 1, Arguments),
{max_recovery_count, MaxRecoveryCount} = lists:keyfind(max_recovery_count, 1, Arguments),

process_flag(trap_exit, true), % so that terminate/2 gets called when process exits

self() ! {initialize, FileName, MoldStreamPid},
self() ! {initialize, FileName, MoldStreamProcessName},

{ok, Socket} = gen_udp:open(RecoveryPort, [binary, {active,once}, {reuseaddr, true}]),

Expand All @@ -56,6 +57,8 @@ init(Arguments) ->
statsd_latency_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.latency",
statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".recovery_request.received"},

register(RecoveryProcessName, self()),
lager:info("[molderl] Register molderl_recovery pid[~p] with name[~p]", [self(), RecoveryProcessName]),
{ok, State}.

handle_cast({store, Msgs, MsgsSize, NumMsgs}, State) ->
Expand Down Expand Up @@ -104,7 +107,7 @@ handle_info({udp, _Client, IP, Port, IllFormedRequest}, State) ->
ok = inet:setopts(State#state.socket, [{active, once}]),
{noreply, State};

handle_info({initialize, FileName, MoldStreamPid}, State) ->
handle_info({initialize, FileName, MoldStreamProcessName}, State) ->
case file:open(FileName, [read, append, raw, binary, read_ahead]) of
{ok, IoDevice} ->
Log = "[molderl] Rebuilding MOLDUDP64 index from disk cache ~p. This may take some time.",
Expand All @@ -114,7 +117,7 @@ handle_info({initialize, FileName, MoldStreamPid}, State) ->
SeqNum = length(Index),
Fmt = "[molderl] Successfully restored ~p MOLD packets from file ~p",
lager:info(Fmt, [SeqNum, FileName]),
ok = molderl_stream:set_sequence_number(MoldStreamPid, SeqNum+1),
ok = molderl_stream:set_sequence_number(MoldStreamProcessName, SeqNum+1),
NewState = State#state{last_seq_num=SeqNum,
blocks_store=IoDevice,
store_size=FileSize,
Expand Down
31 changes: 13 additions & 18 deletions src/molderl_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
socket :: inet:socket(), % The socket to send the data on
destination_port :: inet:port_number(), % Destination port for the data
% to be encoded in a MOLD64 packet
recovery_service :: pid() , % Pid of the recovery service message
recovery_service :: atom() , % name of the recovery service message
prod_interval :: pos_integer(), % Maximum interval at which either partial packets
% or heartbeats should be sent
statsd_latency_key_in :: string(), %
Expand All @@ -38,17 +38,19 @@
start_link(Arguments) ->
gen_server:start_link(?MODULE, Arguments, []).

-spec send(pid(), binary(), erlang:timestamp()) -> 'ok'.
send(Pid, Message, StartTime) ->
gen_server:cast(Pid, {send, Message, StartTime}).
-spec send(atom(), binary(), erlang:timestamp()) -> 'ok'.
send(ProcessName, Message, StartTime) ->
gen_server:cast(ProcessName, {send, Message, StartTime}).

-spec set_sequence_number(pid(), pos_integer()) -> 'ok'.
set_sequence_number(Pid, SeqNum) ->
gen_server:cast(Pid, {sequence_number, SeqNum}).
-spec set_sequence_number(atom(), pos_integer()) -> 'ok'.
set_sequence_number(ProcessName, SeqNum) ->
gen_server:cast(ProcessName, {sequence_number, SeqNum}).

init(Arguments) ->

{streamname, StreamName} = lists:keyfind(streamname, 1, Arguments),
{streamprocessname, StreamProcessName} = lists:keyfind(streamprocessname, 1, Arguments),
{recoveryprocessname, RecoveryProcessName} = lists:keyfind(recoveryprocessname, 1, Arguments),
{destination, Destination} = lists:keyfind(destination, 1, Arguments),
{destinationport, DestinationPort} = lists:keyfind(destinationport, 1, Arguments),
{ipaddresstosendfrom, IPAddressToSendFrom} = lists:keyfind(ipaddresstosendfrom, 1, Arguments),
Expand All @@ -57,10 +59,6 @@ init(Arguments) ->

process_flag(trap_exit, true), % so that terminate/2 gets called when process exits

% send yourself a reminder to start recovery process
RecoveryArguments = [{mold_stream, self()}|[{packetsize, ?PACKET_SIZE}|Arguments]],
self() ! {initialize, RecoveryArguments},

Connection = gen_udp:open(0, [binary,
{broadcast, true},
{ip, IPAddressToSendFrom},
Expand All @@ -78,8 +76,11 @@ init(Arguments) ->
prod_interval = ProdInterval,
statsd_latency_key_in = "molderl." ++ atom_to_list(StreamName) ++ ".time_in",
statsd_latency_key_out = "molderl." ++ atom_to_list(StreamName) ++ ".time_out",
statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent"},
statsd_count_key = "molderl." ++ atom_to_list(StreamName) ++ ".packets_sent",
recovery_service = RecoveryProcessName},
State = #state{timer_ref = erlang:send_after(ProdInterval, self(), prod)},
register(StreamProcessName, self()),
lager:info("[molderl] Register molderl_stream pid[~p] with name[~p]", [self(), StreamProcessName]),
{ok, {Info, State}};
{error, Reason} ->
lager:error("[molderl] Unable to open UDP socket on ~p because '~p'. Aborting.",
Expand Down Expand Up @@ -142,12 +143,6 @@ handle_info(prod, {Info, OldState=#state{packets=Pckts, messages=Msgs}}) ->
{stop, Reason, {Info, State}}
end;

handle_info({initialize, Arguments}, {Info, State}) ->
{supervisorpid, SupervisorPID} = lists:keyfind(supervisorpid, 1, Arguments),
RecoverySpec = ?CHILD(make_ref(), molderl_recovery, [Arguments], transient, worker),
{ok, RecoveryProcess} = supervisor:start_child(SupervisorPID, RecoverySpec),
{noreply, {Info#info{recovery_service=RecoveryProcess}, State}};

handle_info(Info, State) ->
lager:error("[molderl] molderl_stream:handle_info received unexpected message. Info:~p, State:~p.~n", [Info, State]),
{noreply, State}.
Expand Down
13 changes: 12 additions & 1 deletion src/molderl_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
gen_endofsession/2,
gen_streamname/1,
encode_messages/1,
gen_messagepacket/4]).
gen_messagepacket/4,
gen_processname/2]).

-include("molderl.hrl").

Expand Down Expand Up @@ -45,6 +46,10 @@ gen_streamname(StreamName) when is_list(StreamName), length(StreamName) > 10 ->
gen_streamname(StreamName) when is_list(StreamName) ->
binary_padder(list_to_binary(StreamName)).

-spec gen_processname(atom(), atom()) -> atom().
gen_processname(ProcessType, StreamName) ->
list_to_atom(atom_to_list(mold_) ++ atom_to_list(ProcessType) ++ "_" ++ atom_to_list(StreamName)).

%% --------------------------------------------
%% Takes a binary and pads it out to ten bytes.
%% This is needed by the Stream Name.
Expand Down Expand Up @@ -147,5 +152,11 @@ gen_messagepacket_empty_test() ->
Expected = <<<<"foo">>/binary, 23:64, 0:16>>,
?assertEqual(Expected, Observed).

%% -----------------------
%% Tests for gen_processname
%% -----------------------
gen_processname_test() ->
?assert(gen_processname(stream, abcd) == mold_stream_abcd),
?assert(gen_processname(recovery, abcd) == mold_recovery_abcd).
-endif.

8 changes: 4 additions & 4 deletions test/molderl_integration_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

-compile([{parse_transform, lager_transform}]).

-record(stream, {pid :: pid(),
-record(stream, {process_name:: atom(),
name :: string(),
ip :: inet:ip4_address(),
port :: inet:port_number(),
Expand Down Expand Up @@ -90,7 +90,7 @@ loop(State=#state{sent=Sent, inflight=Inflight, max_seq_num_rcvd=MaxSeqNumRcvd},
send(State=#state{stream=Stream, sent=Sent}) ->
% generate random payload of random size < 10 bytes
Msg = crypto:strong_rand_bytes(random:uniform(10)),
case molderl:send_message(Stream#stream.pid, Msg) of
case molderl:send_message(Stream#stream.process_name, Msg) of
ok ->
Fmt = "[SUCCESS] Sent packet seq num: ~p, msg: ~p",
Outcome = io_lib:format(Fmt, [length(Sent)+1, Msg]),
Expand Down Expand Up @@ -163,8 +163,8 @@ crash(State) ->

launch_stream(Stream=#stream{port=P, recovery_port=RP, file=F, ip=IP}) ->
ok = application:start(molderl),
{ok, Pid} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]),
Stream#stream{pid=Pid}.
{ok, ProcessName} = molderl:create_stream(foo, ?MCAST_GROUP_IP, P, RP, [{ipaddresstosendfrom,IP},{filename,F}]),
Stream#stream{process_name=ProcessName}.

clean_up() ->
application:stop(molderl).
Expand Down

0 comments on commit 6c15bff

Please sign in to comment.