From 44bc637151a5f43e55e7f950ed304ddc10f573f2 Mon Sep 17 00:00:00 2001 From: pl0xz0rz Date: Fri, 14 Apr 2023 10:04:14 +0200 Subject: [PATCH 1/7] Ported categorical mutliselects to new pandas --- RootInteractive/InteractiveDrawing/bokeh/bokehTools.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/RootInteractive/InteractiveDrawing/bokeh/bokehTools.py b/RootInteractive/InteractiveDrawing/bokeh/bokehTools.py index bcd327f3..47feddf6 100644 --- a/RootInteractive/InteractiveDrawing/bokeh/bokehTools.py +++ b/RootInteractive/InteractiveDrawing/bokeh/bokehTools.py @@ -1388,7 +1388,7 @@ def makeBokehSelectWidget(df: pd.DataFrame, params: list, paramDict: dict, defau dfCategorical = df[params[0]].astype(pd.CategoricalDtype(ordered=True, categories=params[1:])) else: dfCategorical = df[params[0]] - codes, optionsPlot = pd.factorize(dfCategorical, sort=True, na_sentinel=None) + codes, optionsPlot = pd.factorize(dfCategorical, sort=True) optionsPlot = optionsPlot.dropna().to_list() optionsPlot = [str(i) for i in optionsPlot] default_value = 0 @@ -1432,7 +1432,7 @@ def makeBokehMultiSelectWidget(df: pd.DataFrame, params: list, paramDict: dict, dfCategorical = df[params[0]].astype(pd.CategoricalDtype(ordered=True, categories=params[1:])) else: dfCategorical = df[params[0]] - codes, optionsPlot = pd.factorize(dfCategorical, sort=True, na_sentinel=None) + codes, optionsPlot = pd.factorize(dfCategorical, sort=True) optionsPlot = optionsPlot.to_list() for i, val in enumerate(optionsPlot): optionsPlot[i] = str(val) @@ -1990,4 +1990,4 @@ def makeDescriptionTable(cdsDict, cdsName, fields, meta_fields): new_dict[i] = column columns.append(TableColumn(field=i, title=i)) cds.data = new_dict - return DataTable(source=cds, columns=columns) \ No newline at end of file + return DataTable(source=cds, columns=columns) From 6e6d22fc66bc649b6eae603101e333b27c5bbbe4 Mon Sep 17 00:00:00 2001 From: pl0xz0rz Date: Mon, 17 Apr 2023 11:06:55 +0200 Subject: [PATCH 2/7] Removed one-liner utility functions, replaced with numpy equivalents --- RootInteractive/MLpipeline/MIForestErrPDF.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index 972e2eb0..0c163b4b 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -40,15 +40,6 @@ def _accumulate_predictionNL(predict, X, out,col): def simple_predict(predict, X, out, col): out[col] = predict(X, check_input=False) -def partitionBlock(allRF, k, begin, end): - allRF[begin:end].partition(k) - -def blockMean(allRF, out, begin, end): - np.mean(allRF[begin:end], -1, out=out[begin:end]) - -def blockStd(allRF, out, begin, end): - np.std(allRF[begin:end], -1, out=out[begin:end]) - def predictRFStatChunk(rf, X, statDictionary,n_jobs): """ inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410 @@ -74,21 +65,21 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): block_end.append(X.shape[0]) if "median" in statDictionary: Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(partitionBlock)(allRFTranspose, nEstimators // 2, first, last) + delayed(allRF[first:last].partition)(nEstimators // 2) for first, last in zip(block_begin, block_end) ) statOut["median"]= allRFTranspose[:,nEstimators//2] if "mean" in statDictionary: mean_out = np.empty(X.shape[0]) Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(blockMean)(allRFTranspose, mean_out, first, last) + delayed(np.mean)(allRFTranspose[first:last], -1, out=mean_out[first:last]) for first, last in zip(block_begin, block_end) ) statOut["mean"]=mean_out if "std" in statDictionary: std_out = np.empty(X.shape[0]) Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(blockStd)(allRFTranspose, std_out, first, last) + delayed(blockStd)(allRFTranspose[first:last], -1, out=std_out[first:last]) for first, last in zip(block_begin, block_end) ) statOut["std"]=std_out @@ -96,7 +87,7 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): statOut["quantile"]={} quantiles = np.array(statDictionary["quantile"]) * nEstimators Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(partitionBlock)(allRFTranspose, quantiles, first, last) + delayed(allRF[first:last].partition)(quantiles) for first, last in zip(block_begin, block_end) ) for iQuant, quant in enumerate(statDictionary["quantile"]): From d10388c7b0ffc530cbe4a9ea6a976d3793bfaed1 Mon Sep 17 00:00:00 2001 From: pl0xz0rz Date: Mon, 17 Apr 2023 13:14:27 +0200 Subject: [PATCH 3/7] Fixed typo in np.std, added optimization for golen path --- RootInteractive/MLpipeline/MIForestErrPDF.py | 34 +++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index 0c163b4b..d0784d7a 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -37,9 +37,35 @@ def _accumulate_predictionNL(predict, X, out,col): prediction = predict(X, check_input=False) out[col] += prediction +def _predict_accumulate_square(predict, X, out, outSq, lock): + prediction = predict(X, check_input=False) + predictionSq = prediction*prediction + with lock: + out += prediction + outSq += predictionSq + def simple_predict(predict, X, out, col): out[col] = predict(X, check_input=False) +def predictRFStatTrivial(rf, X, statDictionary, n_jobs): + nEstimators = len(rf.estimators_) + rfSum = np.zeros(X.shape[0]) + rfSumSq = np.zeros(X.shape[0]) + statOut = {} + lock = threading.Lock() + Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( + delayed(_predict_accumulate_square)(e.predict, X, rfSum, rfSumSq, lock) for e in rf.estimators_ + ) + rfSum /= nEstimators + if "mean" in statDictionary: + statOut["mean"] = rfSum + if "std" in statDictionary: + # Somehow apply stream fusion? + rfSumSq /= nEstimators + rfSumSq -= rfSum*rfSum + statOut["std"] = np.sqrt(rfSumSq) + return statOut + def predictRFStatChunk(rf, X, statDictionary,n_jobs): """ inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410 @@ -79,7 +105,7 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): if "std" in statDictionary: std_out = np.empty(X.shape[0]) Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(blockStd)(allRFTranspose[first:last], -1, out=std_out[first:last]) + delayed(np.std)(allRFTranspose[first:last], -1, out=std_out[first:last]) for first, last in zip(block_begin, block_end) ) statOut["std"]=std_out @@ -110,6 +136,12 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): """ if(max_rows < 0): return predictRFStatChunk(rf, X, statDictionary, n_jobs) + is_trivial = True + for key in statDictionary.keys(): + if key not in ["mean", "std"]: + is_trivial = False + if is_trivial: + return predictRFStatTrivial(rf, X, statDictionary, n_jobs) block_begin = list(range(0, X.shape[0], max_rows)) block_end = block_begin[1:] block_end.append(X.shape[0]) From eedf20e5f302baf6cd36674d952af937c57f12d4 Mon Sep 17 00:00:00 2001 From: pl0xz0rz Date: Mon, 17 Apr 2023 15:36:51 +0200 Subject: [PATCH 4/7] Possibly optimized mean/std? --- RootInteractive/MLpipeline/MIForestErrPDF.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index d0784d7a..c9aadf74 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -66,7 +66,7 @@ def predictRFStatTrivial(rf, X, statDictionary, n_jobs): statOut["std"] = np.sqrt(rfSumSq) return statOut -def predictRFStatChunk(rf, X, statDictionary,n_jobs): +def predictRFStatChunk(rf, X, statDictionary, parallel): """ inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410 predict statistics from random forest @@ -79,7 +79,7 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): nEstimators = len(rf.estimators_) allRF = np.empty((nEstimators, X.shape[0])) statOut={} - Parallel(n_jobs=n_jobs, verbose=rf.verbose,require="sharedmem")( + parallel( delayed(simple_predict)(e.predict, X, allRF, col) for col,e in enumerate(rf.estimators_) ) @@ -90,21 +90,21 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): block_end = block_begin[1:] block_end.append(X.shape[0]) if "median" in statDictionary: - Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( + parallel( delayed(allRF[first:last].partition)(nEstimators // 2) for first, last in zip(block_begin, block_end) ) statOut["median"]= allRFTranspose[:,nEstimators//2] if "mean" in statDictionary: mean_out = np.empty(X.shape[0]) - Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( + parallel( delayed(np.mean)(allRFTranspose[first:last], -1, out=mean_out[first:last]) for first, last in zip(block_begin, block_end) ) statOut["mean"]=mean_out if "std" in statDictionary: std_out = np.empty(X.shape[0]) - Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( + parallel( delayed(np.std)(allRFTranspose[first:last], -1, out=std_out[first:last]) for first, last in zip(block_begin, block_end) ) @@ -112,7 +112,7 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs): if "quantile" in statDictionary: statOut["quantile"]={} quantiles = np.array(statDictionary["quantile"]) * nEstimators - Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( + parallel( delayed(allRF[first:last].partition)(quantiles) for first, last in zip(block_begin, block_end) ) @@ -135,7 +135,8 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): :return: dictionary with requested output statistics """ if(max_rows < 0): - return predictRFStatChunk(rf, X, statDictionary, n_jobs) + with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel: + return predictRFStatChunk(rf, X, statDictionary, parallel) is_trivial = True for key in statDictionary.keys(): if key not in ["mean", "std"]: @@ -146,8 +147,9 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): block_end = block_begin[1:] block_end.append(X.shape[0]) answers = [] - for (a,b) in zip(block_begin, block_end): - answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, n_jobs)) + with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel: + for (a,b) in zip(block_begin, block_end): + answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, parallel)) if not answers: return {} merged = {} From f00a55e23c3d6b341490136458fc64c2f58b3717 Mon Sep 17 00:00:00 2001 From: pl0xz0rz Date: Mon, 17 Apr 2023 15:47:59 +0200 Subject: [PATCH 5/7] bugfix --- RootInteractive/MLpipeline/MIForestErrPDF.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index c9aadf74..efd5fcab 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -66,7 +66,7 @@ def predictRFStatTrivial(rf, X, statDictionary, n_jobs): statOut["std"] = np.sqrt(rfSumSq) return statOut -def predictRFStatChunk(rf, X, statDictionary, parallel): +def predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs): """ inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410 predict statistics from random forest @@ -132,11 +132,12 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): :param X: input vector :param statDictionary: dictionary of statistics to predict :param n_jobs: number of parallel jobs for prediction + :param max_rows: :return: dictionary with requested output statistics """ if(max_rows < 0): with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel: - return predictRFStatChunk(rf, X, statDictionary, parallel) + return predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs) is_trivial = True for key in statDictionary.keys(): if key not in ["mean", "std"]: @@ -149,7 +150,7 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): answers = [] with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel: for (a,b) in zip(block_begin, block_end): - answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, parallel)) + answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, parallel, n_jobs)) if not answers: return {} merged = {} From 5cfd884e05f125408495120033f3e4f202a5a8a6 Mon Sep 17 00:00:00 2001 From: Marian Ivanov Date: Mon, 17 Apr 2023 22:10:42 +0200 Subject: [PATCH 6/7] Removed code taht didn't work --- RootInteractive/MLpipeline/MIForestErrPDF.py | 25 -------------------- 1 file changed, 25 deletions(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index efd5fcab..71f7da63 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -47,25 +47,6 @@ def _predict_accumulate_square(predict, X, out, outSq, lock): def simple_predict(predict, X, out, col): out[col] = predict(X, check_input=False) -def predictRFStatTrivial(rf, X, statDictionary, n_jobs): - nEstimators = len(rf.estimators_) - rfSum = np.zeros(X.shape[0]) - rfSumSq = np.zeros(X.shape[0]) - statOut = {} - lock = threading.Lock() - Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")( - delayed(_predict_accumulate_square)(e.predict, X, rfSum, rfSumSq, lock) for e in rf.estimators_ - ) - rfSum /= nEstimators - if "mean" in statDictionary: - statOut["mean"] = rfSum - if "std" in statDictionary: - # Somehow apply stream fusion? - rfSumSq /= nEstimators - rfSumSq -= rfSum*rfSum - statOut["std"] = np.sqrt(rfSumSq) - return statOut - def predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs): """ inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410 @@ -138,12 +119,6 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000): if(max_rows < 0): with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel: return predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs) - is_trivial = True - for key in statDictionary.keys(): - if key not in ["mean", "std"]: - is_trivial = False - if is_trivial: - return predictRFStatTrivial(rf, X, statDictionary, n_jobs) block_begin = list(range(0, X.shape[0], max_rows)) block_end = block_begin[1:] block_end.append(X.shape[0]) From 5c3bd803e511f13ef4e8a69cd6f4416c34a3b249 Mon Sep 17 00:00:00 2001 From: Marian Ivanov Date: Mon, 17 Apr 2023 22:11:35 +0200 Subject: [PATCH 7/7] Removed code taht didn't work --- RootInteractive/MLpipeline/MIForestErrPDF.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/RootInteractive/MLpipeline/MIForestErrPDF.py b/RootInteractive/MLpipeline/MIForestErrPDF.py index 71f7da63..868213bc 100644 --- a/RootInteractive/MLpipeline/MIForestErrPDF.py +++ b/RootInteractive/MLpipeline/MIForestErrPDF.py @@ -37,13 +37,6 @@ def _accumulate_predictionNL(predict, X, out,col): prediction = predict(X, check_input=False) out[col] += prediction -def _predict_accumulate_square(predict, X, out, outSq, lock): - prediction = predict(X, check_input=False) - predictionSq = prediction*prediction - with lock: - out += prediction - outSq += predictionSq - def simple_predict(predict, X, out, col): out[col] = predict(X, check_input=False)