diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a37fdfc..64be65d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,6 +34,9 @@ jobs: - name: Run Proper Tests run: rebar3 proper -c + - name: Run EUnit Tests + run: rebar3 eunit -c + - name: Coverage run: rebar3 cover --verbose --min_coverage 80 # zip/3 can only be fully tested on OTP-26+ diff --git a/README.md b/README.md index 058dd3f..3da0c8c 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,29 @@ OTP `lists` module. Functions `iterator_pmap:pmap/2` and `iterator_pmap:pmap/3` provide parallel version of `iterator:map/2`: it takes iterator as input and returns a new iterator where map function is executed for each input element in parallel on a pool of worker processes. -While elements of input are processed in parallel, the ordering of elements is preserved. +The `ordered` parameter controls if the parallel map should preserve the order of the original +iterator or it is allowed to reshuffle the elements (so it outputs elements which are processed +faster - earlier, increasing the throughput). + +Another non-standard function is `pv/3` (from `man pv` - "pipe view"). A pass-through iterator +that can be added somewhere in the pipeline to periodically (either every `for_each_n` elements +or every `every_s` seconds) report the current progress of a long-running iterator: + +```erlang +I0 = ..., +I1 = iterator:pv( + fun(SampleElement, TimePassed, ItemsPassed, TotalItems) -> + TimeS = erlang:convert_time_unit(TimePassed, native, second), + ?LOG_INFO("Processed ~p items. Pace is ~p per-second. Current item: ~p", + [TotalItems, ItemsPassed / TimeS, SampleElement]) + end, + #{for_each_n => 1000, + every_s => 30}, + I0), +... +``` +This example will log current progress either every 30 seconds or after processing every 1000 +elements (whichever triggers first). ## Setup diff --git a/src/iterator.erl b/src/iterator.erl index 335c022..0f6b119 100644 --- a/src/iterator.erl +++ b/src/iterator.erl @@ -30,6 +30,7 @@ map/2, mapfoldl/3, nthtail/2, + pv/3, sublist/2, takewhile/2, zip/3 @@ -485,6 +486,97 @@ maybe_next(done) -> maybe_next(#iter{} = Iter) -> next(Iter). +-record(pv, { + f :: fun((any(), integer(), integer(), integer()) -> any()), + for_each_n :: pos_integer(), + every_s :: pos_integer(), + last_report_time :: integer(), + last_report_n :: non_neg_integer(), + total_n :: non_neg_integer(), + inner_i :: iterator:iterator(any()) +}). + +%% @doc Passthrough iterator one can use to periodically report the progress of the inner iterator. +%% Name comes from `pv' (pipe view) Unix utility. See `man pv'. +%% @param F function to call when one of the conditions triggers. Function arguments: +%% - `Data' - current element of the inner iterator (sample) +%% - `TimePassed' - time passed since the last report in native units +%% - `ItemsPassed' - number of items passed since the last report +%% - `TotalItems' - total number of items passed since the start +%% `TimePassed' + `ItemsPassed' are convenient to calculate the speed of the stream. +%% @param Opts trigger condition options: +%% - `for_each_n' - trigger every N-th element +%% - `every_s' - trigger every S seconds +%% @param InnerIter inner iterator to wrap +%% +%% Keep in mind that whichever trigger condition is met first, the `F' function will be called and +%% counters/timers will reset. So if you set `for_each_n' to 1000 and `every_s' to 30, then the +%% function will be called either as counter reaches 1000 or 30 seconds pass since the last call. +%% +%% If it takes more than `every_s' seconds to process a single element, the function will be called +%% with additional delay. +-spec pv( + fun( + (Type, TimePassed :: integer(), ItemsPassed :: integer(), TotalItems :: integer()) -> any() + ), + #{ + for_each_n => pos_integer(), + every_s => pos_integer() + }, + iterator:iterator(Type) +) -> iterator:iterator(Type) when + Type :: any(). +pv(F, Opts, InnerIter) when is_function(F, 4) -> + Start = erlang:monotonic_time(), + State = #pv{ + f = F, + every_s = maps:get(every_s, Opts, 30), + for_each_n = maps:get(for_each_n, Opts, 1000), + last_report_time = Start, + last_report_n = 0, + total_n = 0, + inner_i = InnerIter + }, + iterator:new(fun yield_pv/1, State). + +yield_pv( + #pv{ + f = F, + every_s = TimeTrigger, + for_each_n = CountTrigger, + last_report_time = LastReportT, + last_report_n = LastReportN, + total_n = N, + inner_i = InnerIter + } = St +) -> + case iterator:next(InnerIter) of + {ok, Data, NewInnerIter} -> + NextN = N + 1, + ItemsProcessed = NextN - LastReportN, + CountCondition = ItemsProcessed >= CountTrigger, + Now = erlang:monotonic_time(), + TimePassed = Now - LastReportT, + TimeCondition = erlang:convert_time_unit(TimePassed, native, second) >= TimeTrigger, + if + CountCondition orelse TimeCondition -> + F(Data, TimePassed, ItemsProcessed, NextN), + {Data, St#pv{ + last_report_time = Now, + last_report_n = NextN, + total_n = NextN, + inner_i = NewInnerIter + }}; + true -> + {Data, St#pv{ + total_n = NextN, + inner_i = NewInnerIter + }} + end; + done -> + done + end. + %% @doc Iterator over .eterm file (file containing dot-terminated Erlang terms) %% XXX: never abandon this iterator from long-running processes! It would leak file descriptor! %% Either consume it to the end or close with `iterator:close/1' explicitly. diff --git a/test/iterator_tests.erl b/test/iterator_tests.erl new file mode 100644 index 0000000..5e4ff8d --- /dev/null +++ b/test/iterator_tests.erl @@ -0,0 +1,55 @@ +%% @doc Unit-tests for `iterator'. + +-module(iterator_tests). + +-include_lib("eunit/include/eunit.hrl"). + +pv_each_n_test() -> + ForEachN = 5, + I0 = iterator:from_list(lists:seq(1, 50)), + Counter = counters:new(1, []), + I1 = iterator:pv( + fun(_, _Time, NItems, TotalItems) -> + ok = counters:add(Counter, 1, ForEachN), + ?assertEqual(TotalItems, counters:get(Counter, 1)), + ?assertEqual(ForEachN, NItems) + end, + #{ + for_each_n => ForEachN, + % large so it never triggers + every_s => 120 + }, + I0 + ), + iterator:to_list(I1). + +%% XXX: ths test can be flaky because it relies on the sleep time +pv_every_s_test() -> + EveryS = 1, + Size = 70, + Sleep = 50, + ApproxPerBatch = (EveryS * 1000) div Sleep, + I0 = iterator:from_list(lists:seq(1, Size)), + I1 = iterator:map( + fun(X) -> + timer:sleep(Sleep), + X + end, + I0 + ), + I2 = iterator:pv( + fun(_, Time, NItems, _TotalItems) -> + TimeMs = erlang:convert_time_unit(Time, native, millisecond), + %% We can't assert exact values here because of the sleep + ?assert(abs(TimeMs - (EveryS * 1000)) < 30, [{time, TimeMs}]), + ?assert(abs(NItems - ApproxPerBatch) < 4, [{n_items, NItems}]), + io:format("ok!~n", []) + end, + #{ + % large so it never triggers + for_each_n => 100, + every_s => EveryS + }, + I1 + ), + iterator:to_list(I2). diff --git a/test/prop_pmap.erl b/test/prop_pmap.erl index 26e9ed5..c3186e8 100644 --- a/test/prop_pmap.erl +++ b/test/prop_pmap.erl @@ -245,14 +245,26 @@ links() -> lists:sort(L). assert_links(Links0) -> + %% Wait up to 10 * 200 = 2s + assert_links(Links0, 10, 200). + +assert_links(Links0, N, Sleep) when N > 0 -> Links = links(), - ?assertEqual( - Links0, - Links, - [ - {extra, [ - {P, erlang:process_info(P)} - || P <- ordsets:subtract(Links, Links0) - ]} - ] - ). + if + length(Links0) =:= length(Links) -> + ?assertEqual( + Links0, + Links, + [ + {extra, [ + {P, erlang:process_info(P)} + || P <- ordsets:subtract(Links, Links0) + ]} + ] + ); + true -> + timer:sleep(Sleep), + assert_links(Links0, N - 1, Sleep) + end; +assert_links(Links, _, _) -> + ?assertEqual(Links, links()).