How to use exometer to monitor stock tick stream? #104

Open
jacktang opened this issue Jan 3, 2018 · 5 comments

jacktang commented Jan 3, 2018

Hello,

I am going to use exometer to monitor a stock tick stream, with InfluxDB as the backend database. The table schema is designed as follows:

time    stock_name       exchange    acc_count
----------------------------------------------
t1         appl             x         1029
t2         amzn             y          20
t3         appl             x         1043
t4         amzn             y          80 

I create one metric for every stock:

exometer:update_or_create([tick, StockName, Exchange], 1, counter, []).

My question is: how should I call exometer_report:subscribe/6 so that the data is sent to InfluxDB in the schema described above?

Member

uwiger commented Jan 3, 2018

The way exometer works, you typically define a reporter which subscribes to metrics and at regular (configurable) intervals pushes data to some data store, log or analytics backend.

What this gives you is a set of periodic snapshots, which is often just what you want.

Is your question how you can ensure that all ticks are reported to the database?

Author

jacktang commented Jan 4, 2018

@uwiger, yes. I want to collect a snapshot of all tick counts and report it to the database.

Update counter in client [updated]

exometer:update_or_create([tick, StockName, Exchange], 1, counter, []).

I have tried to wrap the snapshot in a function metric, as below (stock_count.erl):

init() ->
    ok = exometer:new([tick],
                  {function, ?MODULE, snapshot_count, [], proplist,
                   [stock_name, exchange, acc_count]}).

subscribe(Reporter, Interval) ->
    ok = exometer_report:subscribe(Reporter,
                                  [tick],
                                  [stock_name, exchange, acc_count],
                                   Interval, [], true).
snapshot_count() ->
    Metrics = exometer:find_entries([tick, '_', '_']), % [updated]
    lists:foldl(
        fun({Metric, _, _}, Acc) ->
            [tick, StockName, Exchange] = Metric,
            Count = 
                case exometer:get_value(Metric, value) of
                    {ok, V} ->
                        proplists:get_value(value, V, 0);
                    {error, not_found} ->
                        0
                end,
            Acc ++ [{Metric,  [{stock_name, StockName},
                               {exchange, Exchange},  % [updated]
                               {acc_count, Count}]}]
        end, [], Metrics).

snapshot_count returns a list of proplists, and reporting it to the backend fails. Any suggestions?

Member

uwiger commented Jan 4, 2018

Ok, just to check: your snapshot_count() function searches for metrics with the pattern [tick, '_', '_', '_'], but you previously said that your updates look like exometer:update_or_create([tick, StockName], 1, counter, []).

Also, your foldl() function doesn't actually produce a proplist of the form [{stock_name, V1},{exchange,V2},{acc_count,V3}]

Was this just a typo? What happens when you manually sample the [tick] metric with exometer:get_value([tick])?

My understanding of what you describe leads me to suggest this:

For demonstration, I create a reporter which simply stores values in an ets table. Relevant callbacks:

exometer_init(_Opts) ->
    ets:new(demo_ticks, [ordered_set, named_table, public]),
    {ok, []}.

exometer_report(Metric, value, _Extra, Value, St) ->
    Time = erlang:monotonic_time(millisecond),
    ets:insert(demo_ticks, {{Time,Metric}, Value}),
    {ok, St};
exometer_report(_Metric, _DataPoint, _Extra, _Value, St) ->
   {ok, St}.

exometer_report_bulk(Found, _Extra, St) ->
    Time = erlang:monotonic_time(millisecond),
    [ets:insert(demo_ticks, {{Time,Metric}, proplists:get_value(value, DPs)})
     || {Metric, DPs} <- Found],
    {ok, St}.

Then, testing it in the shell:

Eshell V9.1  (abort with ^G)
1> application:ensure_all_started(exometer_core).
...
{ok,[hut,setup,bear,folsom,exometer_core]}
2> [exometer:update_or_create([tick,StockName,Exchange],1,counter,[]) || {StockName,Exchange} <- [{appl,x},{amzn,x},{appl,y},{amzn,y}]].
[ok,ok,ok,ok]
3> exometer_report:add_reporter(tick_reporter, [{module,tick_reporter},{intervals,[{main,5000}]},{report_bulk,true}]).
ok
4> exometer_report:subscribe(tick_reporter,{find,[tick,'_','_']},value,main).
ok
5> ets:tab2list(demo_ticks).
[{{-576460744789,[tick,amzn,x]},1},
 {{-576460744789,[tick,amzn,y]},1},
 {{-576460744789,[tick,appl,x]},1},
 {{-576460744789,[tick,appl,y]},1},
 {{-576460744788,[tick,amzn,x]},1},
 {{-576460744788,[tick,amzn,y]},1},
 {{-576460744788,[tick,appl,x]},1},
 {{-576460744788,[tick,appl,y]},1},
 {{-576460739787,[tick,amzn,x]},1},
 {{-576460739787,[tick,amzn,y]},1},
 {{-576460739787,[tick,appl,x]},1},
 {{-576460739787,[tick,appl,y]},1},
 {{-576460739786,[tick,amzn,x]},1},
 {{-576460739786,[tick,amzn,y]},1},
 {{-576460739786,[tick,appl,x]},1},
 {{-576460739786,[tick,appl,y]},1}]
6> [exometer:update_or_create([tick,StockName,Exchange],1,counter,[]) || {StockName,Exchange} <- [{appl,x},{amzn,x},{appl,y},{amzn,y}]].
[ok,ok,ok,ok]
7> ets:tab2list(demo_ticks).
 [...,
 {{-576460739786,[tick,appl,y]},1},
 {{-576460734785,[tick,amzn,x]},1}, 
 {{-576460734785,[tick,amzn,y]},1},
 {{-576460734785,[tick,appl,x]},1},
 {{-576460734785,[tick,appl,y]},1},
 {{-576460734783,[tick,amzn,x]},1},
 {{-576460734783,[tick,amzn,y]},1},
 {{-576460734783,[tick,appl,x]},1},
 {{-576460734783,[tick,appl|...]},1},
 {{-576460729782,[tick|...]},2},
 {{-576460729782,[...]},2},
 {{-576460729782,...},2},
 {{...},...},
 {...}|...]

Author

jacktang commented Jan 4, 2018

Ok, just to check: your snapshot_count() function searches for metrics with the pattern [tick, '_', '_', '_'], but you previously said that your updates look like exometer:update_or_create([tick, StockName], 1, counter, []).
Also, your foldl() function doesn't actually produce a proplist of the form [{stock_name, V1},{exchange,V2},{acc_count,V3}]

@uwiger, thanks for pointing out the errors. I've updated the code in the original post.
Creating a new reporter is enlightening for me, and I will try to handle the data points in the reporter.

As mentioned before, I want to collect data from all stock metrics and send it to a single table in the backend. With the demonstration above, exometer_report_influxdb creates a separate table per metric, such as tick_appl_x and tick_amzn_y. Do you have any suggestions for merging the metrics into one table?

Member

uwiger commented Jan 4, 2018

Well, my example simply wrote the metric and values to an ets table, using an increasing timestamp to separate consecutive updates. But the tick_reporter used the same callback (exometer_report_bulk) as the exometer_report_influxdb code you mentioned; the illustration was meant to show how you can set up a subscription on all tick counters and receive them periodically as a list of values in the reporter callback. Thus, you don't need a function metric that does the folding and aggregation.
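
As an illustration of that last point, here is a minimal sketch of how a custom reporter could reshape the bulk list into rows for a single table. It assumes that Found has the shape used in the demo reporter above, i.e. a list of {[tick, StockName, Exchange], DataPoints} pairs where DataPoints is a proplist containing value, and that stock names and exchanges are atoms. The module name tick_rows and both functions are hypothetical and not part of exometer. The to_line/1 rendering follows the general InfluxDB line protocol shape (measurement, tags, fields) with an assumed measurement name of tick; whether exometer_report_influxdb can be configured to produce this is not covered in this thread.

```erlang
-module(tick_rows).
-export([to_rows/1, to_line/1]).

%% Hypothetical helper: turn the list a bulk reporter callback receives
%% into {StockName, Exchange, Count} rows destined for one table.
%% Entries whose name does not match [tick, _, _] are skipped by the
%% list comprehension's pattern filter.
to_rows(Found) ->
    [{StockName, Exchange, proplists:get_value(value, DPs, 0)}
     || {[tick, StockName, Exchange], DPs} <- Found].

%% Hypothetical helper: render one row as an InfluxDB line protocol
%% string, with stock_name and exchange as tags and acc_count as an
%% integer field (the trailing "i" marks an integer).
%% Assumes StockName and Exchange are atoms and Count is an integer.
to_line({StockName, Exchange, Count}) ->
    lists:flatten(
      io_lib:format("tick,stock_name=~s,exchange=~s acc_count=~Bi",
                    [atom_to_list(StockName),
                     atom_to_list(Exchange),
                     Count])).
```

Inside exometer_report_bulk/3, a reporter could then call tick_rows:to_rows(Found) and write every row to the same table, which keeps the per-stock counters as ordinary exometer metrics and moves the merging into the reporter.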
