Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDataFrame rolling quantile #343

Merged
merged 5 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions RootInteractive/InteractiveDrawing/bokeh/HistoNdProfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,30 @@ export class HistoNdProfile extends ColumnarDataSource {
entries_total += entries
if(entries > 0){
let mean = 0
let std = 0
let entries_ = 0
if(unbinned){
let old_mean = 0
for (let y = 0; y < stride_high; y+= stride_low) {
const index_low = x+y+z ? global_cumulative_histogram![x+y+z-1] : 0
const index_high = global_cumulative_histogram![x+y+z]
if(sorted_weights == null){
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
mean += sorted_entries![i]
if (isFinite(sorted_entries![i])){
entries_ ++
old_mean = mean
mean += (sorted_entries![i] - mean) / entries_
std += (sorted_entries![i] - mean) * (sorted_entries![i] - old_mean)
}
}
} else {
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
mean += sorted_entries![i] * sorted_weights[i]
if (isFinite(sorted_entries![i])){
entries_ += sorted_weights[i]
old_mean = mean
mean += (sorted_entries![i] - mean) * sorted_weights[i] / entries_
std += (sorted_entries![i] - mean) * (sorted_entries![i] - old_mean) * sorted_weights[i]
}
}
}
}
Expand All @@ -188,29 +199,8 @@ export class HistoNdProfile extends ColumnarDataSource {
for (let y = 0; y < stride_high; y+= stride_low) {
mean += bin_count[x+y+z] * bin_centers[x+y+z]
}
}
let std = 0
mean /= entries
if(unbinned){
for (let y = 0; y < stride_high; y+= stride_low) {
const index_low = x+y+z ? global_cumulative_histogram![x+y+z-1] : 0
const index_high = global_cumulative_histogram![x+y+z]
if(sorted_weights == null){
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
std += sorted_entries![i] * sorted_entries![i]
}
} else {
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
std += sorted_entries![i] * sorted_entries![i] * sorted_weights[i]
}
}
}
std -= mean*mean*entries
} else
{
for (let y = 0; y < stride_high; y+= stride_low) {
mean /= entries
for (let y = 0; y < stride_high; y+= stride_low) {
std += (bin_centers[x+y+z] - mean) * (bin_centers[x+y+z] - mean) * bin_count[x+y+z]
}
}
Expand Down
7 changes: 5 additions & 2 deletions RootInteractive/Tools/RDataFrame/RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def visit_Rolling(self, node: ast.Call):
"rollingSum": "rolling_sum",
"rollingMean": "rolling_mean",
"rollingStd": "rolling_std",
"rollingMedian": "rolling_median"
"rollingMedian": "rolling_median",
"rollingQuantile": "rolling_quantile"
}
time_arr_name=""
rolling_statistic_name = rollingStatistics[node.func.id]
Expand All @@ -271,6 +272,8 @@ def visit_Rolling(self, node: ast.Call):
if scalar_type(unpackScalarType(scalar_type_str(time_arr["type"]), 1))[0] == 'o':
raise TypeError("Weights array for rolling sum must be of numeric data type")
time_arr_name = f", {time_arr['implementation']}.begin()"
if rolling_statistic_name == "rolling_quantile":
init = f', {self.visit(keywords["quantile"])["value"]}'
new_helper_id = self.helpervar_idx
self.helpervar_idx += 1
self.helpervar_stmt.append((0, f"""
Expand Down Expand Up @@ -302,7 +305,7 @@ def visit_Call(self, node: ast.Call):
return {"type":('u',64), "implementation":f"({bsearch_names[node.func.id]}({searched_arr['implementation']}.begin(), {searched_arr['implementation']}.end(), {query['implementation']}, {cmp['implementation']})-{searched_arr['implementation']}.begin())"}
else:
raise TypeError(f"Expected 2 or 3 arguments, got {len(node.args)}")
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd", "rollingMedian"]:
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd", "rollingMedian", "rollingQuantile"]:
return self.visit_Rolling(node)
args = [self.visit(iArg) for iArg in node.args]
left = self.visit_func(node.func, args)
Expand Down
108 changes: 108 additions & 0 deletions RootInteractive/Tools/RDataFrame/RollingQuantile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,114 @@ OutputIt rolling_median_weighted(InputIt first, InputIt last, TimeIt t_first, Ou
return d_first;
}

// For small windows, insertion sort has less overhead than the heap or Las Vegas algorithms
template <class InputIt, class OutputIt>
OutputIt rolling_quantile(InputIt first, InputIt last, OutputIt d_first, size_t window, double quantile, bool center){
std::vector<InputIt> sorted;
size_t rolling_pos = 0;
unsigned long count = 0;
unsigned long idx_insert;
size_t n_skip = window >> 1;
// Sanity checks on quantile to prevent buffer overflow
if(quantile >= 1.){
quantile = 1.;
}
if(quantile <= 0. || quantile != quantile){
quantile = 0.;
}
InputIt window_end = first;
for(;window_end < last && window_end < first+window; ++window_end){
++count;
sorted.push_back(window_end);
for(auto insert_pos = sorted.end()-1; insert_pos > sorted.begin(); --insert_pos){
if(*(insert_pos-1) <= *insert_pos){
break;
}
InputIt tmp = *insert_pos;
*insert_pos = insert_pos[-1];
insert_pos[-1] = tmp;
}
if(count > n_skip || !center){
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
++rolling_pos;
}
while(window_end < last){
size_t found;
for(found=0; found<window; ++found){
if(sorted[found] == first){
sorted[found] = window_end;
break;
}
}
while(found+1<window && *sorted[found+1] < *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found+1];
sorted[found+1] = tmp;
++found;
}
while(found>0 && *sorted[found-1] > *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found-1];
sorted[found-1] = tmp;
--found;
}
++first;
++window_end;
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
if(center){
for(--count;count>n_skip;count--){
sorted.erase(std::remove(sorted.begin(), sorted.end(), first));
++first;
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
}
return d_first;

}

// TODO: If performance is bad, add a better algorithm than brute force
template <class InputIt, class TimeIt, class OutputIt, class DistT>
OutputIt rolling_quantile_weighted(InputIt first, InputIt last, TimeIt t_first, OutputIt d_first, DistT window, double quantile, bool center){
std::vector<InputIt> window_contents;
InputIt low = first;
InputIt high = first;
TimeIt t_low = t_first;
TimeIt t_high = t_first;
DistT off_low = center ? -window/2 : 0;
DistT off_high = center ? window/2 : window;
// Sanity checks on quantile to prevent buffer overflow
if(quantile >= 1.){
quantile = 1.;
}
if(quantile <= 0. || quantile != quantile){
quantile = 0.;
}
for(;first<last;++first){
while(high < last && *t_first + off_high > *t_high){
++high;
++t_high;
}
while(*t_first + off_low > *t_low){
++low;
++t_low;
}
window_contents.clear();
for(InputIt j = low; j<high; ++j){
window_contents.push_back(j);
}
std::nth_element(window_contents.begin(), window_contents.begin() + (size_t)((high - low)*quantile), window_contents.end(), [](InputIt a, InputIt b){return *a < *b;});
*d_first = **(window_contents.begin() + (size_t)((high - low)*quantile));
++d_first;
++t_first;
}
return d_first;
}

}

#endif
4 changes: 4 additions & 0 deletions RootInteractive/Tools/RDataFrame/test_RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ def test_define2(rdf):
print(rdf.Sum("arrayRollingMedian1D0").GetValue())
rdf = makeDefine("arrayRollingMedian1D1", "abs(rollingMedian(array1D0, 7, center=True)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D1").GetValue())
rdf = makeDefine("arrayRollingMedian1D2", "abs(rollingQuantile(array1D0, 7, center=True, quantile=.5)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D2").GetValue())
rdf = makeDefine("arrayRollingMedian1D3", "abs(rollingQuantile(array1D0, 7, center=True, quantile=.5, time=array1D0)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D3").GetValue())
return rdf

def test_exception(rdf):
Expand Down