Skip to content

Commit

Permalink
Merge pull request #342 from pl0xz0rz/cdsstack
Browse files Browse the repository at this point in the history
Rolling median builtin for domain specific language with RDataFrame
  • Loading branch information
miranov25 authored Nov 4, 2023
2 parents 32e0f07 + 903bf42 commit b97af56
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 36 deletions.
30 changes: 14 additions & 16 deletions RootInteractive/Tools/RDataFrame/RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,15 +231,20 @@ def visit(self, node):
def visit_Rolling(self, node: ast.Call):
if len(node.args) < 2:
raise TypeError(f"Got {len(node.args)} arguments, expected 2")
self.headers["RollingSum"] = "#include \"RollingSum.cpp\""
if node.func.id in ["rollingSum", "rollingMean", "rollingStd"]:
self.headers["RollingSum"] = "#include \"RollingSum.cpp\""
else:
self.headers["RollingQuantile"] = "#include \"RollingQuantile.cpp\""
arr = self.visit(node.args[0])
arr_name = arr["implementation"]
dtype = unpackScalarType(scalar_type_str(arr["type"]), 1)
width = self.visit(node.args[1])["implementation"]
if len(node.args) == 2:
init = "0"
else:
init = self.visit(node.args[2])["implementation"]
init = ""
if node.func.id != "rollingMedian":
if len(node.args) == 2:
init = ", 0"
else:
init = f', {self.visit(node.args[2])["implementation"]}'
# Args: array, kernel width, init (default value 0), optional pair of right add/left sub functions - required to be associative - TODO: Maybe specifying whether they are commutative is needed too
# as making them commutative allows vectorization - default sum, mean and std are obviously commutative
if len(node.args) > 3:
Expand All @@ -251,15 +256,15 @@ def visit_Rolling(self, node: ast.Call):
rollingStatistics = {
"rollingSum": "rolling_sum",
"rollingMean": "rolling_mean",
"rollingStd": "rolling_std"
"rollingStd": "rolling_std",
"rollingMedian": "rolling_median"
}
time_arr_name=""
rolling_statistic_name = rollingStatistics[node.func.id]
center = "false"
if "center" in keywords:
center = "true"
if "time" in keywords:
rolling_statistic_name = "rolling_sum"
new_arr_size = f"{arr_name}.size()"
qualifiers.append("_weighted")
time_arr=self.visit(keywords["time"])
Expand All @@ -270,15 +275,8 @@ def visit_Rolling(self, node: ast.Call):
self.helpervar_idx += 1
self.helpervar_stmt.append((0, f"""
ROOT::VecOps::RVec<{dtype}> arr_{new_helper_id}({new_arr_size});
RootInteractive::{rolling_statistic_name}{''.join(qualifiers)}({arr_name}.begin(), {arr_name}.end(){time_arr_name}, arr_{new_helper_id}.begin(), {width}, {init}, {center});
RootInteractive::{rolling_statistic_name}{''.join(qualifiers)}({arr_name}.begin(), {arr_name}.end(){time_arr_name}, arr_{new_helper_id}.begin(), {width}{init}, {center});
"""))
if node.func.id == "rollingMean":
if "time" in keywords:
self.helpervar_stmt.append((0, f"""
for(size_t i=0; i<{arr_name}.size(); ++i){{
arr_{new_helper_id}[i] /= 2*{width};
}}
"""))
return {
"implementation":f"arr_{new_helper_id}",
"type":('o',f"ROOT::VecOps::RVec<{dtype}>")
Expand All @@ -304,7 +302,7 @@ def visit_Call(self, node: ast.Call):
return {"type":('u',64), "implementation":f"({bsearch_names[node.func.id]}({searched_arr['implementation']}.begin(), {searched_arr['implementation']}.end(), {query['implementation']}, {cmp['implementation']})-{searched_arr['implementation']}.begin())"}
else:
raise TypeError(f"Expected 2 or 3 arguments, got {len(node.args)}")
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd"]:
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd", "rollingMedian"]:
return self.visit_Rolling(node)
args = [self.visit(iArg) for iArg in node.args]
left = self.visit_func(node.func, args)
Expand Down
107 changes: 107 additions & 0 deletions RootInteractive/Tools/RDataFrame/RollingQuantile.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#ifndef ROOTINTERACTIVE_ROLLING_QUANTILE
#define ROOTINTERACTIVE_ROLLING_QUANTILE

#include <algorithm>
#include <vector>

namespace RootInteractive{

using size_t = decltype(sizeof 1);

// For small windows, insertion sort has less overhead than the heap or Las Vegas algorithms
template <class InputIt, class OutputIt>
OutputIt rolling_median(InputIt first, InputIt last, OutputIt d_first, size_t window, bool center){
std::vector<InputIt> sorted;
size_t rolling_pos = 0;
unsigned long count = 0;
unsigned long idx_insert;
size_t n_skip = window >> 1;
InputIt window_end = first;
for(;window_end < last && window_end < first+window; ++window_end){
++count;
sorted.push_back(window_end);
for(auto insert_pos = sorted.end()-1; insert_pos > sorted.begin(); --insert_pos){
if(*(insert_pos-1) <= *insert_pos){
break;
}
InputIt tmp = *insert_pos;
*insert_pos = insert_pos[-1];
insert_pos[-1] = tmp;
}
if(count > n_skip || !center){
*d_first = *sorted[count/2];
++d_first;
}
++rolling_pos;
}
while(window_end < last){
size_t found;
for(found=0; found<window; ++found){
if(sorted[found] == first){
sorted[found] = window_end;
break;
}
}
while(found+1<window && *sorted[found+1] < *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found+1];
sorted[found+1] = tmp;
++found;
}
while(found>0 && *sorted[found-1] > *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found-1];
sorted[found-1] = tmp;
--found;
}
++first;
++window_end;
*d_first = *sorted[count/2];
++d_first;
}
if(center){
for(--count;count>n_skip;count--){
sorted.erase(std::remove(sorted.begin(), sorted.end(), first));
++first;
*d_first = *sorted[count/2];
++d_first;
}
}
return d_first;

}

// TODO: If performance is bad, add a better algorithm than brute force
template <class InputIt, class TimeIt, class OutputIt, class DistT>
OutputIt rolling_median_weighted(InputIt first, InputIt last, TimeIt t_first, OutputIt d_first, DistT window, bool center){
std::vector<InputIt> window_contents;
InputIt low = first;
InputIt high = first;
TimeIt t_low = t_first;
TimeIt t_high = t_first;
DistT off_low = center ? -window/2 : 0;
DistT off_high = center ? window/2 : window;
for(;first<last;++first){
while(high < last && *t_first + off_high > *t_high){
++high;
++t_high;
}
while(*t_first + off_low > *t_low){
++low;
++t_low;
}
window_contents.clear();
for(InputIt j = low; j<high; ++j){
window_contents.push_back(j);
}
std::nth_element(window_contents.begin(), window_contents.begin() + (high - low)/2, window_contents.end(), [](InputIt a, InputIt b){return *a < *b;});
*d_first = **(window_contents.begin() + (high - low)/2);
++d_first;
++t_first;
}
return d_first;
}

}

#endif
106 changes: 86 additions & 20 deletions RootInteractive/Tools/RDataFrame/RollingSum.cpp
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
#include <vector>

#ifndef ROOTINTERACTIVE_ROLLING_SUM
#define ROOTINTERACTIVE_ROLLING_SUM

using size_t = decltype(sizeof 1);
#include <vector>
#include <cmath>

namespace RootInteractive {

using size_t = decltype(sizeof 1);

template <class InputIt, class T>
std::vector<T> rolling_sum(InputIt first, InputIt last, size_t radius, T init){
std::vector<T> rolling_sum(InputIt first, InputIt last, size_t width, T init){
std::vector<T> result(last-first);
InputIt window_end = first;

return result;
}

//size_t arr_size = radius < last - first ? last-first+radius : 2*(last-first)-1;
//size_t arr_size = width < last - first ? last-first+radius : 2*(last-first)-1;
//std::vector<T> result(arr_size);
//
//TODO: Find out if it's automatically parallelized - should be, as it should find the "scan" operation
Expand Down Expand Up @@ -45,11 +46,11 @@ OutputIt rolling_sum(InputIt first, InputIt last, OutputIt d_first, size_t kerne
}

template <class InputIt, class OutputIt, class T>
OutputIt rolling_sum(InputIt first, InputIt last, OutputIt d_first, size_t radius, T init, bool center){
OutputIt rolling_sum(InputIt first, InputIt last, OutputIt d_first, size_t window, T init, bool center){
InputIt window_end = first;
unsigned long count = 0;
unsigned long n_skip = radius >> 1;
for(;window_end < last && window_end < first + radius; ++window_end){
unsigned long n_skip = window >> 1;
for(;window_end < last && window_end < first + window; ++window_end){
init = std::move(init) + *window_end;
if(count > n_skip || !center){
*d_first = init;
Expand Down Expand Up @@ -78,11 +79,11 @@ OutputIt rolling_sum(InputIt first, InputIt last, OutputIt d_first, size_t radiu
}

template <class InputIt, class OutputIt, class T>
OutputIt rolling_mean(InputIt first, InputIt last, OutputIt d_first, size_t radius, T init, bool center){
OutputIt rolling_mean(InputIt first, InputIt last, OutputIt d_first, size_t window, T init, bool center){
InputIt window_end = first;
unsigned long count = 0;
unsigned long n_skip = radius >> 1;
for(;window_end < last && window_end < first + radius; ++window_end){
unsigned long n_skip = window >> 1;
for(;window_end < last && window_end < first + window; ++window_end){
++count;
init = std::move(init) + *window_end;
if(count > n_skip || !center){
Expand Down Expand Up @@ -111,19 +112,19 @@ OutputIt rolling_mean(InputIt first, InputIt last, OutputIt d_first, size_t radi
}

template <class InputIt, class OutputIt, class T>
OutputIt rolling_std(InputIt first, InputIt last, OutputIt d_first, size_t radius, T init, bool center){
OutputIt rolling_std(InputIt first, InputIt last, OutputIt d_first, size_t window, T init, bool center){
InputIt window_end = first;
unsigned long count = 0;
unsigned long n_skip = radius >> 1;
unsigned long n_skip = window >> 1;
T sum_sq = init;
T cur;
for(;window_end < last && window_end < first + radius; ++window_end){
for(;window_end < last && window_end < first + window; ++window_end){
++count;
cur = *window_end;
init = std::move(init) + cur;
sum_sq = std::move(sum_sq) + cur*cur;
if(count > n_skip || !center){
*d_first = (sum_sq-init*init/count)/count;
*d_first = std::sqrt((sum_sq-init*init/count)/count);
++d_first;
}
}
Expand All @@ -136,7 +137,7 @@ OutputIt rolling_std(InputIt first, InputIt last, OutputIt d_first, size_t radiu
sum_sq = std::move(sum_sq) - cur*cur;
++first;
++window_end;
*d_first = (sum_sq-init*init/count)/count;
*d_first = std::sqrt((sum_sq-init*init/count)/count);
++d_first;
}
if(center){
Expand All @@ -146,7 +147,7 @@ OutputIt rolling_std(InputIt first, InputIt last, OutputIt d_first, size_t radiu
init = std::move(init) - cur;
sum_sq = std::move(sum_sq) - cur*cur;
++first;
*d_first = (sum_sq-init*init/count)/count;
*d_first = std::sqrt((sum_sq-init*init/count)/count);
++d_first;
}
}
Expand All @@ -171,15 +172,16 @@ OutputIt rolling_sum_symmetric(InputIt first, InputIt last, OutputIt d_first, si
}

template <class InputIt, class WeightsIt, class OutputIt, class T, class DistT>
OutputIt rolling_sum_weighted(InputIt first, InputIt last, WeightsIt w_first, OutputIt d_first, DistT radius, T init, bool center){
OutputIt rolling_sum_weighted(InputIt first, InputIt last, WeightsIt w_first, OutputIt d_first, DistT width, T init, bool center){
// Uses rolling window and nearest neighbor interpolation
InputIt low = first;
InputIt high = first;
WeightsIt w_low = w_first;
WeightsIt w_high = w_first;
DistT off_low = center ? -radius : 0;
DistT off_low = center ? -width/2 : 0;
DistT off_high = center ? width/2 : width;
for(;first<last;++first){
while(high < last && *w_first + radius > *w_high){
while(high < last && *w_first + off_high > *w_high){
init = std::move(init) + *high;
++high;
++w_high;
Expand All @@ -196,6 +198,70 @@ OutputIt rolling_sum_weighted(InputIt first, InputIt last, WeightsIt w_first, Ou
return d_first;
}

template <class InputIt, class WeightsIt, class OutputIt, class T, class DistT>
OutputIt rolling_mean_weighted(InputIt first, InputIt last, WeightsIt w_first, OutputIt d_first, DistT width, T init, bool center){
// Uses rolling window and nearest neighbor interpolation
InputIt low = first;
InputIt high = first;
WeightsIt w_low = w_first;
WeightsIt w_high = w_first;
DistT off_low = center ? -width/2 : 0;
DistT off_high = center ? width/2 : width;
long count = 0;
for(;first<last;++first){
while(high < last && *w_first + off_high > *w_high){
++count;
init = std::move(init) + *high;
++high;
++w_high;
}
while(*w_first + off_low > *w_low){
--count;
init = std::move(init) - *low;
++low;
++w_low;
}
*d_first = init / count;
++d_first;
++w_first;
}
return d_first;
}

template <class InputIt, class WeightsIt, class OutputIt, class T, class DistT>
OutputIt rolling_std_weighted(InputIt first, InputIt last, WeightsIt w_first, OutputIt d_first, DistT width, T init, bool center){
// Uses rolling window and nearest neighbor interpolation
InputIt low = first;
InputIt high = first;
WeightsIt w_low = w_first;
WeightsIt w_high = w_first;
DistT off_low = center ? -width/2 : 0;
DistT off_high = center ? width/2 : width;
long count = 0;
T sum_sq = init;
T cur;
for(;first<last;++first){
while(high < last && *w_first + off_high > *w_high){
cur = *high;
init = std::move(init) + cur;
sum_sq += cur*cur;
++high;
++w_high;
}
while(*w_first + off_low > *w_low){
cur = *low;
init = std::move(init) - *low;
sum_sq -= cur*cur;
++low;
++w_low;
}
*d_first = std::sqrt((sum_sq-init*init/count)/count);
++d_first;
++w_first;
}
return d_first;
}

}

#endif
4 changes: 4 additions & 0 deletions RootInteractive/Tools/RDataFrame/test_RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ def test_define2(rdf):
print(rdf.Sum("arrayRollingMean1D2").GetValue())
rdf = makeDefine("arrayRollingStd1D0", "abs(rollingStd(array1D0, 5)[2:-2]-2)", rdf, None, 3)
print(rdf.Sum("arrayRollingStd1D0").GetValue())
rdf = makeDefine("arrayRollingMedian1D0", "abs(rollingMedian(array1D0, 7, center=True, time=array1D0)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D0").GetValue())
rdf = makeDefine("arrayRollingMedian1D1", "abs(rollingMedian(array1D0, 7, center=True)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D1").GetValue())
return rdf

def test_exception(rdf):
Expand Down

0 comments on commit b97af56

Please sign in to comment.