Skip to content

Commit

Permalink
add contexts, and map to mapfoldl
Browse files Browse the repository at this point in the history
  • Loading branch information
djnym committed Mar 21, 2017
1 parent 0f635fd commit cbbcecb
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 34 deletions.
7 changes: 7 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Version 6.8.0 (molinaro)
* add user state to the mondemand_statdb:map function so that you can do
the equivalent of mapfold there.
* alter flushing to use an initial state function
* added a convience function for adding contexts to events which support
them to mondemand_event.

Version 6.7.0 (molinaro)
* generalize flush functionality so that the server can plug in a slightly
modified version.
Expand Down
2 changes: 1 addition & 1 deletion src/mondemand.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{ application, mondemand,
[
{ description, "Erlang Mondemand Bindings." },
{ vsn, "6.7.0" },
{ vsn, "6.8.0" },
{ modules, [] },
{ registered, [mondemand,mondemand_sup]},
{ applications, [kernel,stdlib,syntax_tools,lwes,inets]},
Expand Down
30 changes: 17 additions & 13 deletions src/mondemand.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

% other functions
send_stats/3,
flush_state_init/0,
flush_one_stat/2,
reset_stats/0,
stats/0,
Expand Down Expand Up @@ -313,17 +314,16 @@ flush ({FlushModule, FlushStatePrepFunction, FlushFunction}) ->
send_stats (ProgId, Context, VmStats)
end,
% allow some initialization of state to be sent to each invocation
UserState = case FlushStatePrepFunction of
undefined -> undefined;
_ -> FlushModule:FlushStatePrepFunction()
end,
mondemand_statdb:flush (1,
fun (StatsMsg) ->
erlang:apply (FlushModule,
FlushFunction,
[UserState, StatsMsg])
end
).
Start = os:timestamp (),
Ret = mondemand_statdb:flush (1,
fun FlushModule:FlushFunction/2,
FlushModule:FlushStatePrepFunction()
),
Finish = os:timestamp (),
error_logger:info_msg ("flushing took ~p millis and returned ~p",
[webmachine_util:now_diff_milliseconds (Finish,Start),
Ret]),
Ret.

reset_stats () ->
mondemand_statdb:reset_stats().
Expand Down Expand Up @@ -480,8 +480,12 @@ send_event (Events) when is_list (Events) ->
send_event (Event = #lwes_event { name = Name }) ->
gen_server:cast (?MODULE, {send, Name, lwes_event:to_binary (Event)}).

flush_one_stat (_, StatsMsg) ->
send_event (mondemand_statsmsg:to_lwes (StatsMsg)).
flush_state_init () ->
ok.

flush_one_stat (StatsMsg, State) ->
send_event (mondemand_statsmsg:to_lwes (StatsMsg)),
State.

open_all (Config) ->
lists:foldl (fun ({T,C},{ok, D}) ->
Expand Down
41 changes: 41 additions & 0 deletions src/mondemand_annotationmsg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
tags/1,
context/1,
context_value/2,
add_contexts/2,
add_context/3,
to_lwes/1,
from_lwes/1
]).
Expand Down Expand Up @@ -47,6 +49,18 @@ context_find (Key, Context, Default) ->
{_, H} -> H
end.

add_contexts (A = #md_annotation_msg { num_context = ContextNum,
context = Context},
L) when is_list (L) ->
A#md_annotation_msg { num_context = ContextNum + length (L),
context = L ++ Context }.

add_context (A = #md_annotation_msg { num_context = ContextNum,
context = Context},
ContextKey, ContextValue) ->
A#md_annotation_msg { num_context = ContextNum + 1,
context = [ {ContextKey, ContextValue} | Context ] }.

tags_from_lwes (Data) ->
Num = mondemand_util:find_in_dict (?MD_ANNOTATION_TAG_NUM, Data, 0),
{ Num,
Expand Down Expand Up @@ -111,3 +125,30 @@ from_lwes (#lwes_event { attrs = Data}) ->
% Precompute tag keys
annotation_tag_key (N) ->
?ELEMENT_OF_TUPLE_LIST (N, ?MD_ANNOTATION_TAG).

%-=====================================================================-
%- Test Functions -
%-=====================================================================-
-ifdef (TEST).
-include_lib ("eunit/include/eunit.hrl").

annotation_test_ () ->
[
{ "basic constructor test",
fun() ->
C = [{<<"foo">>,<<"bar">>}],
T = [<<"tag1">>,<<"tag2">>],
A = new (<<"baz">>, 123456, <<"description">>, <<"text">>, T, C),
?assertEqual (<<"baz">>, id(A)),
?assertEqual (123456, timestamp(A)),
?assertEqual (<<"description">>, description(A)),
?assertEqual (<<"text">>, text(A)),
?assertEqual (T, tags(A)),
?assertEqual (C, context(A)),
?assertEqual (<<"bar">>, context_value (A, <<"foo">>)),
?assertEqual (undefined, context_value (A, <<"bar">>))
end
}
].

-endif.
2 changes: 1 addition & 1 deletion src/mondemand_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ flush_config () ->
{ok, {Module, FlushStatePrepFunction, FlushFunction}} ->
{Module, FlushStatePrepFunction, FlushFunction};
undefined ->
{mondemand, undefined, flush_one_stat}
{mondemand, flush_state_init, flush_one_stat}
end.

%%--------------------------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions src/mondemand_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
name/1,
msg/1,
set_msg/2,
add_contexts/2,
add_context/3,
peek_name_from_udp/1,
peek_type_from_udp/1,
to_lwes/1,
Expand All @@ -37,6 +39,30 @@ name (#md_event { name = Name }) -> Name.
msg (#md_event { msg = Msg }) -> Msg.
set_msg (E = #md_event {}, Msg) -> E#md_event {msg = Msg}.

add_contexts (E = #md_event {msg = Msg}, L) when is_list (L) ->
NewMsg =
case Msg of
#md_stats_msg {} -> mondemand_statsmsg:add_contexts (Msg, L);
#md_log_msg {} -> mondemand_logmsg:add_contexts (Msg, L);
#md_perf_msg {} -> mondemand_perfmsg:add_contexts (Msg, L);
#md_annotation_msg {} ->
mondemand_annotationmsg:add_contexts (Msg, L);
#md_trace_msg {} -> Msg % trace doesn't have contexts
end,
E#md_event { msg = NewMsg }.

add_context (E = #md_event {msg = Msg}, CK, CV) ->
NewMsg =
case Msg of
#md_stats_msg {} -> mondemand_statsmsg:add_context (Msg, CK, CV);
#md_log_msg {} -> mondemand_logmsg:add_context (Msg, CK, CV);
#md_perf_msg {} -> mondemand_perfmsg:add_context (Msg, CK, CV);
#md_annotation_msg {} ->
mondemand_annotationmsg:add_context (Msg, CK, CV);
#md_trace_msg {} -> Msg % trace doesn't have contexts
end,
E#md_event { msg = NewMsg }.

peek_name_from_udp (Event = {udp, _, _, _, _}) ->
lwes_event:peek_name_from_udp (Event);
peek_name_from_udp (#md_event { name = Name }) ->
Expand Down
28 changes: 27 additions & 1 deletion src/mondemand_logmsg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

-export ([ new/4, % (ProgId, Host, Context, Lines)
new/5, % (ProgId, Host, Context, Lines, SendTime)
new_line/5 % (File, Line, Priority, Message, RepeatCount)
new_line/5, % (File, Line, Priority, Message, RepeatCount)
context/1,
context_value/2,
add_contexts/2,
add_context/3
]).

-export ([ to_lwes/1,
Expand All @@ -30,6 +34,28 @@ new_line (File, Line, Priority, Message, RepeatCount) ->
message = Message,
repeat_count = RepeatCount }.

context (#md_log_msg { context = Context }) -> Context.
context_value (#md_log_msg { context = Context }, ContextKey) ->
context_find (ContextKey, Context, undefined).

context_find (Key, Context, Default) ->
case lists:keyfind (Key, 1, Context) of
false -> Default;
{_, H} -> H
end.

add_contexts (LM = #md_log_msg { num_context = ContextNum,
context = Context},
L) when is_list (L) ->
LM#md_log_msg { num_context = ContextNum + length (L),
context = L ++ Context }.

add_context (LM = #md_log_msg { num_context = ContextNum,
context = Context},
ContextKey, ContextValue) ->
LM#md_log_msg { num_context = ContextNum + 1,
context = [ {ContextKey, ContextValue} | Context ] }.

to_lwes (L) when is_list (L) ->
lists:map (fun to_lwes/1, L);

Expand Down
14 changes: 14 additions & 0 deletions src/mondemand_perfmsg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
timing_end_time/1,
context/1,
context_value/2,
add_contexts/2,
add_context/3,
to_lwes/1,
from_lwes/1
]).
Expand Down Expand Up @@ -74,6 +76,18 @@ context_find (Key, Context, Default) ->
{_, H} -> H
end.

add_contexts (P = #md_perf_msg { num_context = ContextNum,
context = Context},
L) when is_list (L) ->
P#md_perf_msg { num_context = ContextNum + length (L),
context = L ++ Context }.

add_context (P = #md_perf_msg { num_context = ContextNum,
context = Context},
ContextKey, ContextValue) ->
P#md_perf_msg { num_context = ContextNum + 1,
context = [ {ContextKey, ContextValue} | Context ] }.

timings_to_lwes (NumTimings, Timings) ->
[ { ?LWES_U_INT_16, ?MD_NUM, NumTimings }
| lists:zipwith (fun timing_to_lwes/2,
Expand Down
48 changes: 30 additions & 18 deletions src/mondemand_statdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@

map_now/1,
map_then/2,
map/3,
map/4,

flush/2,
flush/3,
config/0,
all/0,
reset_stats/0,
Expand All @@ -113,6 +113,7 @@
max_sample_size,
statistics
}).
-record (map_state, {host, collect_time, stats_set_table, user_state}).

-define (STATS_TABLE, md_stats).
-define (CONFIG_TABLE, md_config).
Expand Down Expand Up @@ -479,14 +480,14 @@ config () ->
map_now (Function) ->
CurrentMinuteMillis = mondemand_util:current(),
StatsSetTable = minute_tab (mondemand_util:current_minute()),
map (Function, CurrentMinuteMillis, StatsSetTable).
map (Function, ok, CurrentMinuteMillis, StatsSetTable).

map_then (Function, Ago) ->
CurrentMinuteMillis = mondemand_util:current(),
PreviousMinuteMillis = CurrentMinuteMillis - 60000 * Ago,
PreviousMinute = minutes_ago (mondemand_util:current_minute(), Ago),
StatsSetTable = minute_tab (PreviousMinute),
map (Function, PreviousMinuteMillis, StatsSetTable).
map (Function, ok, PreviousMinuteMillis, StatsSetTable).

% I want to iterate over the config table, collapsing all metrics for a
% particular program id and context into a group so they can all be processed
Expand All @@ -495,7 +496,7 @@ map_then (Function, Ago) ->
% I want to use ets:first/1 and ets:next/2 so I can eventually set some rules
% about how they are processed in terms of time spent overall
%
map (Function, CollectTime, StatsSetTable) ->
map (Function, InitialState, CollectTime, StatsSetTable) ->
% there a couple of things we'd like to not recalculate but are probably
% used over and over, so get them here and pass them through
Host = mondemand_config:host (),
Expand All @@ -504,7 +505,12 @@ map (Function, CollectTime, StatsSetTable) ->
'$end_of_table' -> [];
FirstKey ->
% put the first into the current list to collapse
map1 (Function, {Host, CollectTime, StatsSetTable}, [FirstKey])
map1 (Function,
#map_state { host = Host,
collect_time = CollectTime,
stats_set_table = StatsSetTable,
user_state = InitialState },
[FirstKey])
end.

% need to skip the config as that's not what we want to map over
Expand All @@ -514,28 +520,32 @@ map1 (Function, State, [Key = '$default_config']) ->
NextKey ->
map1 (Function, State, [NextKey])
end;
map1 (Function, State,
map1 (Function, State = #map_state {user_state = UserState},
% match out the ProgId and Context from the current collapsed list
AllKeys = [LastKey = #mdkey {prog_id = ProgId, context = Context}|_]) ->

case ets:next (?CONFIG_TABLE,LastKey) of
'$end_of_table' ->
% we hit the end of the table, so just call the function with the
% current set of matched keys
Function (construct_stats_msg (AllKeys, State));
Function (construct_stats_msg (AllKeys, State), UserState);
Key = #mdkey {prog_id = ProgId, context = Context} ->
% this particular entry has the same ProgId and Context, so add it
% to the list of keys which are grouped together
map1 (Function, State, [Key|AllKeys]);
NonMatchingKey ->
% the key didn't match, so call the function with the current set
Function (construct_stats_msg (AllKeys, State)),
NewUserState =
Function (construct_stats_msg (AllKeys, State), UserState),
% then use this key for the next iteration
map1 (Function, State, [NonMatchingKey])
map1 (Function, State#map_state { user_state = NewUserState},
[NonMatchingKey])
end.

construct_stats_msg (AllKeys = [#mdkey {prog_id = ProgId, context = Context}|_],
{Host, CollectTime, Table}) ->
#map_state {host = Host,
collect_time = CollectTime,
stats_set_table = Table}) ->
Metrics = [ lookup_metric (I, Table) || I <- AllKeys ],
{FinalHost, FinalContext} =
mondemand_util:context_from_context (Host, Context),
Expand Down Expand Up @@ -613,7 +623,7 @@ all () ->
"--------------------"]),
map_now (fun (#md_stats_msg {prog_id = ProgId,
context = Context,
metrics = Metrics}) ->
metrics = Metrics}, State) ->
[
case T of
IT when IT =:= gauge; IT =:= counter ->
Expand All @@ -633,9 +643,9 @@ all () ->
]
end
|| #md_metric { type = T, key = K, value = V } <- Metrics
]
end),
ok.
],
State
end).

%-=====================================================================-
%- gen_server callbacks -
Expand Down Expand Up @@ -732,17 +742,19 @@ minutes_ago (MinuteNow, Ago) ->
N -> N
end.

flush (MinutesAgo, Function) ->
flush (MinutesAgo, Function, InitialState) ->
CurrentMinute = mondemand_util:current_minute(),
CurrentMinuteMillis = mondemand_util:current(),
PreviousMinuteMillis = CurrentMinuteMillis - 60000 * MinutesAgo,
PreviousMinute = minutes_ago (CurrentMinute, MinutesAgo),
StatsSetTable = minute_tab (PreviousMinute),
map (Function, PreviousMinuteMillis, StatsSetTable),
FinalState =
map (Function, InitialState, PreviousMinuteMillis, StatsSetTable),
MinuteToDelete =
minute_tab (minutes_ago (CurrentMinute,
mondemand_config:minutes_to_keep())),
ets:delete_all_objects (MinuteToDelete).
ets:delete_all_objects (MinuteToDelete),
FinalState.

ets_to_statset (Data, Stats) ->
% this needs to match the create side
Expand Down

0 comments on commit cbbcecb

Please sign in to comment.