From cbbcecb580ceff72d280c98ac053a478860b8a42 Mon Sep 17 00:00:00 2001 From: Anthony Molinaro Date: Tue, 21 Mar 2017 21:58:39 +0000 Subject: [PATCH] add contexts, and map to mapfoldl --- ChangeLog | 7 +++++ src/mondemand.app.src | 2 +- src/mondemand.erl | 30 ++++++++++++--------- src/mondemand_annotationmsg.erl | 41 ++++++++++++++++++++++++++++ src/mondemand_config.erl | 2 +- src/mondemand_event.erl | 26 ++++++++++++++++++ src/mondemand_logmsg.erl | 28 ++++++++++++++++++- src/mondemand_perfmsg.erl | 14 ++++++++++ src/mondemand_statdb.erl | 48 ++++++++++++++++++++------------- 9 files changed, 164 insertions(+), 34 deletions(-) diff --git a/ChangeLog b/ChangeLog index 06fa0ca..120c8ad 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +Version 6.8.0 (molinaro) + * add user state to the mondemand_statdb:map function so that you can do + the equivalent of mapfold there. + * alter flushing to use an initial state function + * added a convience function for adding contexts to events which support + them to mondemand_event. + Version 6.7.0 (molinaro) * generalize flush functionality so that the server can plug in a slightly modified version. diff --git a/src/mondemand.app.src b/src/mondemand.app.src index 69b5f0e..1a2a27c 100644 --- a/src/mondemand.app.src +++ b/src/mondemand.app.src @@ -1,7 +1,7 @@ { application, mondemand, [ { description, "Erlang Mondemand Bindings." }, - { vsn, "6.7.0" }, + { vsn, "6.8.0" }, { modules, [] }, { registered, [mondemand,mondemand_sup]}, { applications, [kernel,stdlib,syntax_tools,lwes,inets]}, diff --git a/src/mondemand.erl b/src/mondemand.erl index fcdf2cb..5e92484 100644 --- a/src/mondemand.erl +++ b/src/mondemand.erl @@ -66,6 +66,7 @@ % other functions send_stats/3, + flush_state_init/0, flush_one_stat/2, reset_stats/0, stats/0, @@ -313,17 +314,16 @@ flush ({FlushModule, FlushStatePrepFunction, FlushFunction}) -> send_stats (ProgId, Context, VmStats) end, % allow some initialization of state to be sent to each invocation - UserState = case FlushStatePrepFunction of - undefined -> undefined; - _ -> FlushModule:FlushStatePrepFunction() - end, - mondemand_statdb:flush (1, - fun (StatsMsg) -> - erlang:apply (FlushModule, - FlushFunction, - [UserState, StatsMsg]) - end - ). + Start = os:timestamp (), + Ret = mondemand_statdb:flush (1, + fun FlushModule:FlushFunction/2, + FlushModule:FlushStatePrepFunction() + ), + Finish = os:timestamp (), + error_logger:info_msg ("flushing took ~p millis and returned ~p", + [webmachine_util:now_diff_milliseconds (Finish,Start), + Ret]), + Ret. reset_stats () -> mondemand_statdb:reset_stats(). @@ -480,8 +480,12 @@ send_event (Events) when is_list (Events) -> send_event (Event = #lwes_event { name = Name }) -> gen_server:cast (?MODULE, {send, Name, lwes_event:to_binary (Event)}). -flush_one_stat (_, StatsMsg) -> - send_event (mondemand_statsmsg:to_lwes (StatsMsg)). +flush_state_init () -> + ok. + +flush_one_stat (StatsMsg, State) -> + send_event (mondemand_statsmsg:to_lwes (StatsMsg)), + State. open_all (Config) -> lists:foldl (fun ({T,C},{ok, D}) -> diff --git a/src/mondemand_annotationmsg.erl b/src/mondemand_annotationmsg.erl index 20b3aa8..e03124f 100644 --- a/src/mondemand_annotationmsg.erl +++ b/src/mondemand_annotationmsg.erl @@ -11,6 +11,8 @@ tags/1, context/1, context_value/2, + add_contexts/2, + add_context/3, to_lwes/1, from_lwes/1 ]). @@ -47,6 +49,18 @@ context_find (Key, Context, Default) -> {_, H} -> H end. +add_contexts (A = #md_annotation_msg { num_context = ContextNum, + context = Context}, + L) when is_list (L) -> + A#md_annotation_msg { num_context = ContextNum + length (L), + context = L ++ Context }. + +add_context (A = #md_annotation_msg { num_context = ContextNum, + context = Context}, + ContextKey, ContextValue) -> + A#md_annotation_msg { num_context = ContextNum + 1, + context = [ {ContextKey, ContextValue} | Context ] }. + tags_from_lwes (Data) -> Num = mondemand_util:find_in_dict (?MD_ANNOTATION_TAG_NUM, Data, 0), { Num, @@ -111,3 +125,30 @@ from_lwes (#lwes_event { attrs = Data}) -> % Precompute tag keys annotation_tag_key (N) -> ?ELEMENT_OF_TUPLE_LIST (N, ?MD_ANNOTATION_TAG). + +%-=====================================================================- +%- Test Functions - +%-=====================================================================- +-ifdef (TEST). +-include_lib ("eunit/include/eunit.hrl"). + +annotation_test_ () -> + [ + { "basic constructor test", + fun() -> + C = [{<<"foo">>,<<"bar">>}], + T = [<<"tag1">>,<<"tag2">>], + A = new (<<"baz">>, 123456, <<"description">>, <<"text">>, T, C), + ?assertEqual (<<"baz">>, id(A)), + ?assertEqual (123456, timestamp(A)), + ?assertEqual (<<"description">>, description(A)), + ?assertEqual (<<"text">>, text(A)), + ?assertEqual (T, tags(A)), + ?assertEqual (C, context(A)), + ?assertEqual (<<"bar">>, context_value (A, <<"foo">>)), + ?assertEqual (undefined, context_value (A, <<"bar">>)) + end + } + ]. + +-endif. diff --git a/src/mondemand_config.erl b/src/mondemand_config.erl index c71e2c9..5e64965 100644 --- a/src/mondemand_config.erl +++ b/src/mondemand_config.erl @@ -329,7 +329,7 @@ flush_config () -> {ok, {Module, FlushStatePrepFunction, FlushFunction}} -> {Module, FlushStatePrepFunction, FlushFunction}; undefined -> - {mondemand, undefined, flush_one_stat} + {mondemand, flush_state_init, flush_one_stat} end. %%-------------------------------------------------------------------- diff --git a/src/mondemand_event.erl b/src/mondemand_event.erl index 1e5530d..e266a3b 100644 --- a/src/mondemand_event.erl +++ b/src/mondemand_event.erl @@ -11,6 +11,8 @@ name/1, msg/1, set_msg/2, + add_contexts/2, + add_context/3, peek_name_from_udp/1, peek_type_from_udp/1, to_lwes/1, @@ -37,6 +39,30 @@ name (#md_event { name = Name }) -> Name. msg (#md_event { msg = Msg }) -> Msg. set_msg (E = #md_event {}, Msg) -> E#md_event {msg = Msg}. +add_contexts (E = #md_event {msg = Msg}, L) when is_list (L) -> + NewMsg = + case Msg of + #md_stats_msg {} -> mondemand_statsmsg:add_contexts (Msg, L); + #md_log_msg {} -> mondemand_logmsg:add_contexts (Msg, L); + #md_perf_msg {} -> mondemand_perfmsg:add_contexts (Msg, L); + #md_annotation_msg {} -> + mondemand_annotationmsg:add_contexts (Msg, L); + #md_trace_msg {} -> Msg % trace doesn't have contexts + end, + E#md_event { msg = NewMsg }. + +add_context (E = #md_event {msg = Msg}, CK, CV) -> + NewMsg = + case Msg of + #md_stats_msg {} -> mondemand_statsmsg:add_context (Msg, CK, CV); + #md_log_msg {} -> mondemand_logmsg:add_context (Msg, CK, CV); + #md_perf_msg {} -> mondemand_perfmsg:add_context (Msg, CK, CV); + #md_annotation_msg {} -> + mondemand_annotationmsg:add_context (Msg, CK, CV); + #md_trace_msg {} -> Msg % trace doesn't have contexts + end, + E#md_event { msg = NewMsg }. + peek_name_from_udp (Event = {udp, _, _, _, _}) -> lwes_event:peek_name_from_udp (Event); peek_name_from_udp (#md_event { name = Name }) -> diff --git a/src/mondemand_logmsg.erl b/src/mondemand_logmsg.erl index 1916b8d..0b2d0d5 100644 --- a/src/mondemand_logmsg.erl +++ b/src/mondemand_logmsg.erl @@ -5,7 +5,11 @@ -export ([ new/4, % (ProgId, Host, Context, Lines) new/5, % (ProgId, Host, Context, Lines, SendTime) - new_line/5 % (File, Line, Priority, Message, RepeatCount) + new_line/5, % (File, Line, Priority, Message, RepeatCount) + context/1, + context_value/2, + add_contexts/2, + add_context/3 ]). -export ([ to_lwes/1, @@ -30,6 +34,28 @@ new_line (File, Line, Priority, Message, RepeatCount) -> message = Message, repeat_count = RepeatCount }. +context (#md_log_msg { context = Context }) -> Context. +context_value (#md_log_msg { context = Context }, ContextKey) -> + context_find (ContextKey, Context, undefined). + +context_find (Key, Context, Default) -> + case lists:keyfind (Key, 1, Context) of + false -> Default; + {_, H} -> H + end. + +add_contexts (LM = #md_log_msg { num_context = ContextNum, + context = Context}, + L) when is_list (L) -> + LM#md_log_msg { num_context = ContextNum + length (L), + context = L ++ Context }. + +add_context (LM = #md_log_msg { num_context = ContextNum, + context = Context}, + ContextKey, ContextValue) -> + LM#md_log_msg { num_context = ContextNum + 1, + context = [ {ContextKey, ContextValue} | Context ] }. + to_lwes (L) when is_list (L) -> lists:map (fun to_lwes/1, L); diff --git a/src/mondemand_perfmsg.erl b/src/mondemand_perfmsg.erl index b9cc0dd..be8d727 100644 --- a/src/mondemand_perfmsg.erl +++ b/src/mondemand_perfmsg.erl @@ -17,6 +17,8 @@ timing_end_time/1, context/1, context_value/2, + add_contexts/2, + add_context/3, to_lwes/1, from_lwes/1 ]). @@ -74,6 +76,18 @@ context_find (Key, Context, Default) -> {_, H} -> H end. +add_contexts (P = #md_perf_msg { num_context = ContextNum, + context = Context}, + L) when is_list (L) -> + P#md_perf_msg { num_context = ContextNum + length (L), + context = L ++ Context }. + +add_context (P = #md_perf_msg { num_context = ContextNum, + context = Context}, + ContextKey, ContextValue) -> + P#md_perf_msg { num_context = ContextNum + 1, + context = [ {ContextKey, ContextValue} | Context ] }. + timings_to_lwes (NumTimings, Timings) -> [ { ?LWES_U_INT_16, ?MD_NUM, NumTimings } | lists:zipwith (fun timing_to_lwes/2, diff --git a/src/mondemand_statdb.erl b/src/mondemand_statdb.erl index 1630223..229efd5 100644 --- a/src/mondemand_statdb.erl +++ b/src/mondemand_statdb.erl @@ -87,9 +87,9 @@ map_now/1, map_then/2, - map/3, + map/4, - flush/2, + flush/3, config/0, all/0, reset_stats/0, @@ -113,6 +113,7 @@ max_sample_size, statistics }). +-record (map_state, {host, collect_time, stats_set_table, user_state}). -define (STATS_TABLE, md_stats). -define (CONFIG_TABLE, md_config). @@ -479,14 +480,14 @@ config () -> map_now (Function) -> CurrentMinuteMillis = mondemand_util:current(), StatsSetTable = minute_tab (mondemand_util:current_minute()), - map (Function, CurrentMinuteMillis, StatsSetTable). + map (Function, ok, CurrentMinuteMillis, StatsSetTable). map_then (Function, Ago) -> CurrentMinuteMillis = mondemand_util:current(), PreviousMinuteMillis = CurrentMinuteMillis - 60000 * Ago, PreviousMinute = minutes_ago (mondemand_util:current_minute(), Ago), StatsSetTable = minute_tab (PreviousMinute), - map (Function, PreviousMinuteMillis, StatsSetTable). + map (Function, ok, PreviousMinuteMillis, StatsSetTable). % I want to iterate over the config table, collapsing all metrics for a % particular program id and context into a group so they can all be processed @@ -495,7 +496,7 @@ map_then (Function, Ago) -> % I want to use ets:first/1 and ets:next/2 so I can eventually set some rules % about how they are processed in terms of time spent overall % -map (Function, CollectTime, StatsSetTable) -> +map (Function, InitialState, CollectTime, StatsSetTable) -> % there a couple of things we'd like to not recalculate but are probably % used over and over, so get them here and pass them through Host = mondemand_config:host (), @@ -504,7 +505,12 @@ map (Function, CollectTime, StatsSetTable) -> '$end_of_table' -> []; FirstKey -> % put the first into the current list to collapse - map1 (Function, {Host, CollectTime, StatsSetTable}, [FirstKey]) + map1 (Function, + #map_state { host = Host, + collect_time = CollectTime, + stats_set_table = StatsSetTable, + user_state = InitialState }, + [FirstKey]) end. % need to skip the config as that's not what we want to map over @@ -514,7 +520,7 @@ map1 (Function, State, [Key = '$default_config']) -> NextKey -> map1 (Function, State, [NextKey]) end; -map1 (Function, State, +map1 (Function, State = #map_state {user_state = UserState}, % match out the ProgId and Context from the current collapsed list AllKeys = [LastKey = #mdkey {prog_id = ProgId, context = Context}|_]) -> @@ -522,20 +528,24 @@ map1 (Function, State, '$end_of_table' -> % we hit the end of the table, so just call the function with the % current set of matched keys - Function (construct_stats_msg (AllKeys, State)); + Function (construct_stats_msg (AllKeys, State), UserState); Key = #mdkey {prog_id = ProgId, context = Context} -> % this particular entry has the same ProgId and Context, so add it % to the list of keys which are grouped together map1 (Function, State, [Key|AllKeys]); NonMatchingKey -> % the key didn't match, so call the function with the current set - Function (construct_stats_msg (AllKeys, State)), + NewUserState = + Function (construct_stats_msg (AllKeys, State), UserState), % then use this key for the next iteration - map1 (Function, State, [NonMatchingKey]) + map1 (Function, State#map_state { user_state = NewUserState}, + [NonMatchingKey]) end. construct_stats_msg (AllKeys = [#mdkey {prog_id = ProgId, context = Context}|_], - {Host, CollectTime, Table}) -> + #map_state {host = Host, + collect_time = CollectTime, + stats_set_table = Table}) -> Metrics = [ lookup_metric (I, Table) || I <- AllKeys ], {FinalHost, FinalContext} = mondemand_util:context_from_context (Host, Context), @@ -613,7 +623,7 @@ all () -> "--------------------"]), map_now (fun (#md_stats_msg {prog_id = ProgId, context = Context, - metrics = Metrics}) -> + metrics = Metrics}, State) -> [ case T of IT when IT =:= gauge; IT =:= counter -> @@ -633,9 +643,9 @@ all () -> ] end || #md_metric { type = T, key = K, value = V } <- Metrics - ] - end), - ok. + ], + State + end). %-=====================================================================- %- gen_server callbacks - @@ -732,17 +742,19 @@ minutes_ago (MinuteNow, Ago) -> N -> N end. -flush (MinutesAgo, Function) -> +flush (MinutesAgo, Function, InitialState) -> CurrentMinute = mondemand_util:current_minute(), CurrentMinuteMillis = mondemand_util:current(), PreviousMinuteMillis = CurrentMinuteMillis - 60000 * MinutesAgo, PreviousMinute = minutes_ago (CurrentMinute, MinutesAgo), StatsSetTable = minute_tab (PreviousMinute), - map (Function, PreviousMinuteMillis, StatsSetTable), + FinalState = + map (Function, InitialState, PreviousMinuteMillis, StatsSetTable), MinuteToDelete = minute_tab (minutes_ago (CurrentMinute, mondemand_config:minutes_to_keep())), - ets:delete_all_objects (MinuteToDelete). + ets:delete_all_objects (MinuteToDelete), + FinalState. ets_to_statset (Data, Stats) -> % this needs to match the create side