Skip to content

Commit

Permalink
Merge pull request #343 from pl0xz0rz/cdsstack
Browse files Browse the repository at this point in the history
RDataFrame rolling quantile
  • Loading branch information
miranov25 authored Nov 8, 2023
2 parents b97af56 + 7ca323f commit 84ed466
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 29 deletions.
44 changes: 17 additions & 27 deletions RootInteractive/InteractiveDrawing/bokeh/HistoNdProfile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,30 @@ export class HistoNdProfile extends ColumnarDataSource {
entries_total += entries
if(entries > 0){
let mean = 0
let std = 0
let entries_ = 0
if(unbinned){
let old_mean = 0
for (let y = 0; y < stride_high; y+= stride_low) {
const index_low = x+y+z ? global_cumulative_histogram![x+y+z-1] : 0
const index_high = global_cumulative_histogram![x+y+z]
if(sorted_weights == null){
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
mean += sorted_entries![i]
if (isFinite(sorted_entries![i])){
entries_ ++
old_mean = mean
mean += (sorted_entries![i] - mean) / entries_
std += (sorted_entries![i] - mean) * (sorted_entries![i] - old_mean)
}
}
} else {
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
mean += sorted_entries![i] * sorted_weights[i]
if (isFinite(sorted_entries![i])){
entries_ += sorted_weights[i]
old_mean = mean
mean += (sorted_entries![i] - mean) * sorted_weights[i] / entries_
std += (sorted_entries![i] - mean) * (sorted_entries![i] - old_mean) * sorted_weights[i]
}
}
}
}
Expand All @@ -188,29 +199,8 @@ export class HistoNdProfile extends ColumnarDataSource {
for (let y = 0; y < stride_high; y+= stride_low) {
mean += bin_count[x+y+z] * bin_centers[x+y+z]
}
}
let std = 0
mean /= entries
if(unbinned){
for (let y = 0; y < stride_high; y+= stride_low) {
const index_low = x+y+z ? global_cumulative_histogram![x+y+z-1] : 0
const index_high = global_cumulative_histogram![x+y+z]
if(sorted_weights == null){
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
std += sorted_entries![i] * sorted_entries![i]
}
} else {
for(let i=index_low; i < index_high; ++i){
if (isFinite(sorted_entries![i]))
std += sorted_entries![i] * sorted_entries![i] * sorted_weights[i]
}
}
}
std -= mean*mean*entries
} else
{
for (let y = 0; y < stride_high; y+= stride_low) {
mean /= entries
for (let y = 0; y < stride_high; y+= stride_low) {
std += (bin_centers[x+y+z] - mean) * (bin_centers[x+y+z] - mean) * bin_count[x+y+z]
}
}
Expand Down
7 changes: 5 additions & 2 deletions RootInteractive/Tools/RDataFrame/RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def visit_Rolling(self, node: ast.Call):
"rollingSum": "rolling_sum",
"rollingMean": "rolling_mean",
"rollingStd": "rolling_std",
"rollingMedian": "rolling_median"
"rollingMedian": "rolling_median",
"rollingQuantile": "rolling_quantile"
}
time_arr_name=""
rolling_statistic_name = rollingStatistics[node.func.id]
Expand All @@ -271,6 +272,8 @@ def visit_Rolling(self, node: ast.Call):
if scalar_type(unpackScalarType(scalar_type_str(time_arr["type"]), 1))[0] == 'o':
raise TypeError("Weights array for rolling sum must be of numeric data type")
time_arr_name = f", {time_arr['implementation']}.begin()"
if rolling_statistic_name == "rolling_quantile":
init = f', {self.visit(keywords["quantile"])["value"]}'
new_helper_id = self.helpervar_idx
self.helpervar_idx += 1
self.helpervar_stmt.append((0, f"""
Expand Down Expand Up @@ -302,7 +305,7 @@ def visit_Call(self, node: ast.Call):
return {"type":('u',64), "implementation":f"({bsearch_names[node.func.id]}({searched_arr['implementation']}.begin(), {searched_arr['implementation']}.end(), {query['implementation']}, {cmp['implementation']})-{searched_arr['implementation']}.begin())"}
else:
raise TypeError(f"Expected 2 or 3 arguments, got {len(node.args)}")
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd", "rollingMedian"]:
elif isinstance(node.func, ast.Name) and node.func.id in ["rollingSum", "rollingMean", "rollingStd", "rollingMedian", "rollingQuantile"]:
return self.visit_Rolling(node)
args = [self.visit(iArg) for iArg in node.args]
left = self.visit_func(node.func, args)
Expand Down
108 changes: 108 additions & 0 deletions RootInteractive/Tools/RDataFrame/RollingQuantile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,114 @@ OutputIt rolling_median_weighted(InputIt first, InputIt last, TimeIt t_first, Ou
return d_first;
}

// For small windows, insertion sort has less overhead than the heap or Las Vegas algorithms
template <class InputIt, class OutputIt>
OutputIt rolling_quantile(InputIt first, InputIt last, OutputIt d_first, size_t window, double quantile, bool center){
std::vector<InputIt> sorted;
size_t rolling_pos = 0;
unsigned long count = 0;
unsigned long idx_insert;
size_t n_skip = window >> 1;
// Sanity checks on quantile to prevent buffer overflow
if(quantile >= 1.){
quantile = 1.;
}
if(quantile <= 0. || quantile != quantile){
quantile = 0.;
}
InputIt window_end = first;
for(;window_end < last && window_end < first+window; ++window_end){
++count;
sorted.push_back(window_end);
for(auto insert_pos = sorted.end()-1; insert_pos > sorted.begin(); --insert_pos){
if(*(insert_pos-1) <= *insert_pos){
break;
}
InputIt tmp = *insert_pos;
*insert_pos = insert_pos[-1];
insert_pos[-1] = tmp;
}
if(count > n_skip || !center){
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
++rolling_pos;
}
while(window_end < last){
size_t found;
for(found=0; found<window; ++found){
if(sorted[found] == first){
sorted[found] = window_end;
break;
}
}
while(found+1<window && *sorted[found+1] < *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found+1];
sorted[found+1] = tmp;
++found;
}
while(found>0 && *sorted[found-1] > *sorted[found]){
InputIt tmp = sorted[found];
sorted[found] = sorted[found-1];
sorted[found-1] = tmp;
--found;
}
++first;
++window_end;
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
if(center){
for(--count;count>n_skip;count--){
sorted.erase(std::remove(sorted.begin(), sorted.end(), first));
++first;
*d_first = *sorted[(size_t)(count*quantile)];
++d_first;
}
}
return d_first;

}

// TODO: If performance is bad, add a better algorithm than brute force
template <class InputIt, class TimeIt, class OutputIt, class DistT>
OutputIt rolling_quantile_weighted(InputIt first, InputIt last, TimeIt t_first, OutputIt d_first, DistT window, double quantile, bool center){
std::vector<InputIt> window_contents;
InputIt low = first;
InputIt high = first;
TimeIt t_low = t_first;
TimeIt t_high = t_first;
DistT off_low = center ? -window/2 : 0;
DistT off_high = center ? window/2 : window;
// Sanity checks on quantile to prevent buffer overflow
if(quantile >= 1.){
quantile = 1.;
}
if(quantile <= 0. || quantile != quantile){
quantile = 0.;
}
for(;first<last;++first){
while(high < last && *t_first + off_high > *t_high){
++high;
++t_high;
}
while(*t_first + off_low > *t_low){
++low;
++t_low;
}
window_contents.clear();
for(InputIt j = low; j<high; ++j){
window_contents.push_back(j);
}
std::nth_element(window_contents.begin(), window_contents.begin() + (size_t)((high - low)*quantile), window_contents.end(), [](InputIt a, InputIt b){return *a < *b;});
*d_first = **(window_contents.begin() + (size_t)((high - low)*quantile));
++d_first;
++t_first;
}
return d_first;
}

}

#endif
4 changes: 4 additions & 0 deletions RootInteractive/Tools/RDataFrame/test_RDataFrame_Array.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ def test_define2(rdf):
print(rdf.Sum("arrayRollingMedian1D0").GetValue())
rdf = makeDefine("arrayRollingMedian1D1", "abs(rollingMedian(array1D0, 7, center=True)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D1").GetValue())
rdf = makeDefine("arrayRollingMedian1D2", "abs(rollingQuantile(array1D0, 7, center=True, quantile=.5)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D2").GetValue())
rdf = makeDefine("arrayRollingMedian1D3", "abs(rollingQuantile(array1D0, 7, center=True, quantile=.5, time=array1D0)[3:-3] - array1D0[3:-3])", rdf, None, 3)
print(rdf.Sum("arrayRollingMedian1D3").GetValue())
return rdf

def test_exception(rdf):
Expand Down

0 comments on commit 84ed466

Please sign in to comment.