Skip to content

Commit

Permalink
significantly improved performance of deserializing perf messages
Browse files Browse the repository at this point in the history
  • Loading branch information
djnym committed Feb 27, 2016
1 parent 464b557 commit 77d7fac
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 27 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 5.8.0 (molinaro)
* significantly improved performance when deserializing performance events
* improved coverage of mondemand_perfmsg module

Version 5.7.1 (molinaro)
* minor dialyzer issue

Expand Down
2 changes: 1 addition & 1 deletion src/mondemand.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{ application, mondemand,
[
{ description, "Erlang Mondemand Bindings." },
{ vsn, "5.7.1" },
{ vsn, "5.8.0" },
{ modules, [] },
{ registered, [mondemand,mondemand_sup]},
{ applications, [kernel,stdlib,syntax_tools,lwes]},
Expand Down
10 changes: 8 additions & 2 deletions src/mondemand_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,16 @@ from_udp (Packet = {udp, _, SenderIp, SenderPort, _}) ->
sender_port = SenderPort,
name = ?MD_TRACE_EVENT,
msg = lwes_event:from_udp_packet (Packet, json_eep18) };
?MD_PERF_EVENT ->
Event = lwes_event:from_udp_packet (Packet, list),
{ReceiptTime, Msg} = mondemand_perfmsg:from_lwes (Event),
#md_event { sender_ip = SenderIp,
sender_port = SenderPort,
receipt_time = ReceiptTime,
name = ?MD_PERF_EVENT,
msg = Msg };
Name when Name =:= ?MD_STATS_EVENT;
Name =:= ?MD_LOG_EVENT;
Name =:= ?MD_PERF_EVENT;
Name =:= ?MD_ANNOTATION_EVENT ->
% deserialize the event as a dictionary
Event = #lwes_event { attrs = Data }
Expand All @@ -107,7 +114,6 @@ from_udp (Packet = {udp, _, SenderIp, SenderPort, _}) ->
case Name of
?MD_STATS_EVENT -> mondemand_statsmsg:from_lwes (Event);
?MD_LOG_EVENT -> mondemand_logmsg:from_lwes (Event);
?MD_PERF_EVENT -> mondemand_perfmsg:from_lwes (Event);
?MD_ANNOTATION_EVENT -> mondemand_annotationmsg:from_lwes (Event)
end,
#md_event { sender_ip = SenderIp,
Expand Down
317 changes: 293 additions & 24 deletions src/mondemand_perfmsg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,6 @@ context_find (Key, Context, Default) ->
{_, H} -> H
end.

% Read every timing out of the dict form of a perf event.
%
% The number of timings is looked up under ?MD_NUM (0 when the key is
% absent), then timings 1..Count are fetched one by one.  Returns
% {Count, ListOfTimings}.
timings_from_lwes (Data) ->
  Count = mondemand_util:find_in_dict (?MD_NUM, Data, 0),
  { Count,
    [ timing_from_lwes (Index, Data) || Index <- lists:seq (1, Count) ]
  }.

% Build one #md_perf_timing{} from the dict form of a perf event.
%
% The label, start and end of the timing at position TimingIndex are each
% fetched with dict:fetch/2, so a missing key raises instead of yielding a
% default.
timing_from_lwes (TimingIndex, Data) ->
  Label     = dict:fetch (perf_label_key (TimingIndex), Data),
  StartTime = dict:fetch (perf_start_key (TimingIndex), Data),
  EndTime   = dict:fetch (perf_end_key (TimingIndex), Data),
  #md_perf_timing { label = Label,
                    start_time = StartTime,
                    end_time = EndTime }.

timings_to_lwes (NumTimings, Timings) ->
[ { ?LWES_U_INT_16, ?MD_NUM, NumTimings }
| lists:zipwith (fun timing_to_lwes/2,
Expand Down Expand Up @@ -121,16 +106,168 @@ to_lwes (#md_perf_msg { id = Id,
])
}.


% Deserializing was originally done by using the dict form of lwes
% deserializing and lots of dict:fetch/2 calls.  This proved to be very slow,
% mostly because of the dict calls.  So I switched to deserializing as a list
% and walking the list once.  This requires some sorting and zipping
% afterwards but is about 5x faster.  When walking the list, I originally
% threaded the accum record below through every step, but that proved to be
% slow because of erlang:setelement/3, so I expanded out all the accumulators.
% As a result process has 11 args, but it's faster still.
%
% The record is still what process/11 returns once the whole list has been
% consumed; it is only built a single time, at the end of the walk.
-record(accum, { receipt_time,
% value of the ?MD_RECEIPT_TIME attribute
id,
% value of the ?MD_PERF_ID attribute
caller_label,
% value of the ?MD_PERF_CALLER_LABEL attribute
timing_num = 0,
% value of the ?MD_NUM attribute
labels = [],
% [{Suffix, Label}] for every key of the form <<"label", Suffix/binary>>
starts = [],
% [{Suffix, Start}] for every key of the form <<"start", Suffix/binary>>
ends = [],
% [{Suffix, End}] for every key of the form <<"end", Suffix/binary>>
context_num = 0,
% value of the ?MD_CTXT_NUM attribute
context_keys = [],
% [{Suffix, Key}] for every key of the form <<"ctxt_k", Suffix/binary>>
context_vals = []
% [{Suffix, Val}] for every key of the form <<"ctxt_v", Suffix/binary>>
}).

% Convert an lwes event into a perf message.
%
% NOTE(review): two bodies appear under this one head in this view.  The
% first (dict-based, up to the first `}.`) returns a bare #md_perf_msg{} and
% looks like the implementation being replaced; the second (list-based)
% returns {ReceiptTime, #md_perf_msg{}}.  Confirm which one is live.
from_lwes (#lwes_event { attrs = Data}) ->
% dict-based variant: every field is looked up separately in the dict
{NumTimings, Timings} = timings_from_lwes (Data),
{_, NumContexts, Context} = mondemand_util:context_from_lwes (Data),
#md_perf_msg { id = dict:fetch (?MD_PERF_ID, Data),
caller_label = dict:fetch (?MD_PERF_CALLER_LABEL, Data),
num_context = NumContexts,
context = Context,
num_timings = NumTimings,
timings = Timings
}.
% list-based variant: walk the attribute list once.  process/11 is seeded
% with Id = undefined, ReceiptTime = 0, CallerLabel = undefined, both counts
% at 0 and all five collection lists empty.
#accum { id = Id,
receipt_time = ReceiptTime,
caller_label = CallerLabel,
timing_num = TimingNum,
labels = Labels,
starts = Starts,
ends = Ends,
context_num = ContextNum,
context_keys = CKeys,
context_vals = CVals
} = process (Data, undefined, 0, undefined, 0, [], [], [],
0, [], []),
% Each collected list holds {Suffix, Value} pairs; sorting all three puts
% them in the same suffix order so they can be zipped.  The repeated K in
% the fun head requires the three suffixes to agree, otherwise the call
% fails with function_clause.
TimingsOut =
lists:zipwith3 (fun ({K,Label},{K,Start},{K,End}) ->
#md_perf_timing {label = Label,
start_time = Start,
end_time = End}
end,
lists:sort(Labels),
lists:sort(Starts),
lists:sort(Ends)),
% TimingNum is already bound, so this is an assertion: the count carried in
% the event must equal the number of timings actually found (badmatch if not)
TimingNum = length (TimingsOut),
% same pairing by suffix for context keys and values
ContextsOut =
lists:zipwith (fun ({K, Key},{K, Val}) ->
{Key, Val}
end,
lists:sort(CKeys),
lists:sort(CVals)),
% assertion again, ContextNum is already bound
ContextNum = length (ContextsOut),
% context and timings are returned in term order (lists:sort/1 on the
% zipped results), not in the order of their key suffixes
{ ReceiptTime,
#md_perf_msg { id = Id,
caller_label = CallerLabel,
num_context = ContextNum,
context = lists:sort(ContextsOut),
num_timings = TimingNum,
timings = lists:sort(TimingsOut)
}
}.

% Walk the attribute list of a perf event exactly once.
%
% The ten trailing arguments are the accumulators (kept as plain arguments
% rather than a record that is updated at every step).  For each
% {Key, Value} pair the key decides which accumulator changes:
%   - the scalar keys (?MD_PERF_ID, ?MD_RECEIPT_TIME, ?MD_PERF_CALLER_LABEL,
%     ?MD_NUM, ?MD_CTXT_NUM) overwrite the corresponding value,
%   - keys starting with "label", "start", "end", "ctxt_k" or "ctxt_v" push
%     {Suffix, Value} onto the matching list, Suffix being the rest of the key,
%   - everything else is skipped.
% When the list is exhausted the accumulators are returned as an #accum{}.
process ([], Id, ReceiptTime, CallerLabel,
         TimingNum, Labels, Starts, Ends,
         ContextNum, CKeys, CVals) ->
  #accum { id = Id,
           receipt_time = ReceiptTime,
           caller_label = CallerLabel,
           timing_num = TimingNum,
           labels = Labels,
           starts = Starts,
           ends = Ends,
           context_num = ContextNum,
           context_keys = CKeys,
           context_vals = CVals
         };
process ([{Key, Value} | Tail], Id, ReceiptTime, CallerLabel,
         TimingNum, Labels, Starts, Ends,
         ContextNum, CKeys, CVals) ->
  % the branches are tried top to bottom, in the same order the keys have
  % always been tested
  case Key of
    ?MD_PERF_ID ->
      process (Tail, Value, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               ContextNum, CKeys, CVals);
    ?MD_RECEIPT_TIME ->
      process (Tail, Id, Value, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               ContextNum, CKeys, CVals);
    ?MD_PERF_CALLER_LABEL ->
      process (Tail, Id, ReceiptTime, Value,
               TimingNum, Labels, Starts, Ends,
               ContextNum, CKeys, CVals);
    ?MD_NUM ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               Value, Labels, Starts, Ends,
               ContextNum, CKeys, CVals);
    <<"label", Suffix/binary>> ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, [ {Suffix, Value} | Labels ], Starts, Ends,
               ContextNum, CKeys, CVals);
    <<"start", Suffix/binary>> ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, [ {Suffix, Value} | Starts ], Ends,
               ContextNum, CKeys, CVals);
    <<"end", Suffix/binary>> ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, [ {Suffix, Value} | Ends ],
               ContextNum, CKeys, CVals);
    ?MD_CTXT_NUM ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               Value, CKeys, CVals);
    <<"ctxt_k", Suffix/binary>> ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               ContextNum, [ {Suffix, Value} | CKeys ], CVals);
    <<"ctxt_v", Suffix/binary>> ->
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               ContextNum, CKeys, [ {Suffix, Value} | CVals ]);
    _ ->
      % skip unrecognized
      process (Tail, Id, ReceiptTime, CallerLabel,
               TimingNum, Labels, Starts, Ends,
               ContextNum, CKeys, CVals)
  end;
process ([_ | Tail], Id, ReceiptTime, CallerLabel,
         TimingNum, Labels, Starts, Ends,
         ContextNum, CKeys, CVals) ->
  % skip anything that is not a {Key, Value} pair
  process (Tail, Id, ReceiptTime, CallerLabel,
           TimingNum, Labels, Starts, Ends,
           ContextNum, CKeys, CVals).

% Attribute key under which the label of the timing at position Index is
% stored; the key itself is produced by the ?ELEMENT_OF_TUPLE_LIST macro
% from ?MD_PERF_LABEL.
perf_label_key (Index) ->
  ?ELEMENT_OF_TUPLE_LIST (Index, ?MD_PERF_LABEL).
Expand All @@ -147,5 +284,137 @@ perf_end_key (N) ->
-ifdef (TEST).
-include_lib ("eunit/include/eunit.hrl").

% Constructor, accessor and serialization round-trip tests.
%
% Every case builds a message with one of the new/3,4,5,6 constructors,
% checks the accessors, and then checks that serializing the message and
% reading it back through from_lwes/1 yields {ReceiptTime, Message}.
perfmsg_test_ () ->
  [
    { "basic constructor test",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>, [{<<"baz">>,5,10}]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (5, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([], context (Msg)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "basic constructor multi-test",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>,
                   [{<<"baz">>,5,10},{<<"bob">>,6,8}]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (2, num_timings (Msg)),
        [ First, Second ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (First)),
        ?assertEqual (5, timing_start_time (First)),
        ?assertEqual (10, timing_end_time (First)),
        ?assertEqual (<<"bob">>, timing_label (Second)),
        ?assertEqual (6, timing_start_time (Second)),
        ?assertEqual (8, timing_end_time (Second)),
        ?assertEqual ([], context (Msg)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "basic constructor with constructor timings",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>, [new_timing (<<"baz">>,5,10)]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (5, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([], context (Msg)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "expanded args constructor",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>, <<"baz">>,5,10),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (5, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([], context (Msg)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "constructor with context",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>,
                   [{<<"baz">>,5,10}],
                   [{<<"cat">>,<<"bird">>}]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (5, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([{<<"cat">>,<<"bird">>}], context (Msg)),
        ?assertEqual (<<"bird">>, context_value (Msg, <<"cat">>)),
        ?assertEqual (undefined, context_value (Msg, <<"dog">>)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "constructor with expanded args with context",
      fun () ->
        Msg = new (<<"foo">>, <<"bar">>, <<"baz">>,5, 10,
                   [{<<"cat">>,<<"bird">>}]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (5, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([{<<"cat">>,<<"bird">>}], context (Msg)),
        ?assertEqual (<<"bird">>, context_value (Msg, <<"cat">>)),
        ?assertEqual (undefined, context_value (Msg, <<"dog">>)),
        ?assertEqual ({0, Msg}, perfmsg_roundtrip (to_lwes (Msg)))
      end
    },
    { "add receipttime and others for full coverage",
      fun () ->
        % cheating a bit here for coverage: a captured os:timestamp() result
        % is used as the start time
        Msg = new (<<"foo">>, <<"bar">>,
                   [{<<"baz">>,{1456,534959,536878},10}],
                   [{<<"cat">>,<<"bird">>}]),
        ?assertEqual (<<"foo">>, id (Msg)),
        ?assertEqual (<<"bar">>, caller_label (Msg)),
        ?assertEqual (1, num_timings (Msg)),
        [ Timing ] = timings (Msg),
        ?assertEqual (<<"baz">>, timing_label (Timing)),
        ?assertEqual (1456534959536, timing_start_time (Timing)),
        ?assertEqual (10, timing_end_time (Timing)),
        ?assertEqual ([{<<"cat">>,<<"bird">>}], context (Msg)),
        ?assertEqual (<<"bird">>, context_value (Msg, <<"cat">>)),
        ?assertEqual (undefined, context_value (Msg, <<"dog">>)),
        % cheating here by 'testing' the list version of to_lwes
        [ Event ] = to_lwes ([Msg]),
        % prepend a receipt time (picked up by from_lwes) and a sender port
        % (an attribute from_lwes does not recognize and therefore skips)
        NewEvent =
          Event#lwes_event {
            attrs = [ {?LWES_INT_64, ?MD_RECEIPT_TIME, 5},
                      {?LWES_U_INT_16, ?MD_SENDER_PORT, 10}
                      | Event#lwes_event.attrs ]
          },
        ?assertEqual ({5, Msg}, perfmsg_roundtrip (NewEvent))
      end
    }
  ].

% Serialize an lwes event to its binary form, read it back in list form and
% hand the result to from_lwes/1.
perfmsg_roundtrip (Event) ->
  from_lwes (lwes_event:from_binary (lwes_event:to_binary (Event), list)).

-endif.

0 comments on commit 77d7fac

Please sign in to comment.