Predict rf stat #323

Closed · wants to merge 7 commits
6 changes: 3 additions & 3 deletions RootInteractive/InteractiveDrawing/bokeh/bokehTools.py
@@ -1388,7 +1388,7 @@ def makeBokehSelectWidget(df: pd.DataFrame, params: list, paramDict: dict, defau
         dfCategorical = df[params[0]].astype(pd.CategoricalDtype(ordered=True, categories=params[1:]))
     else:
         dfCategorical = df[params[0]]
-    codes, optionsPlot = pd.factorize(dfCategorical, sort=True, na_sentinel=None)
+    codes, optionsPlot = pd.factorize(dfCategorical, sort=True)
     optionsPlot = optionsPlot.dropna().to_list()
     optionsPlot = [str(i) for i in optionsPlot]
     default_value = 0
@@ -1432,7 +1432,7 @@ def makeBokehMultiSelectWidget(df: pd.DataFrame, params: list, paramDict: dict,
         dfCategorical = df[params[0]].astype(pd.CategoricalDtype(ordered=True, categories=params[1:]))
     else:
         dfCategorical = df[params[0]]
-    codes, optionsPlot = pd.factorize(dfCategorical, sort=True, na_sentinel=None)
+    codes, optionsPlot = pd.factorize(dfCategorical, sort=True)
     optionsPlot = optionsPlot.to_list()
     for i, val in enumerate(optionsPlot):
         optionsPlot[i] = str(val)
@@ -1990,4 +1990,4 @@ def makeDescriptionTable(cdsDict, cdsName, fields, meta_fields):
         new_dict[i] = column
         columns.append(TableColumn(field=i, title=i))
     cds.data = new_dict
-    return DataTable(source=cds, columns=columns)
\ No newline at end of file
+    return DataTable(source=cds, columns=columns)
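Context for the `pd.factorize` change above: the `na_sentinel` argument was deprecated in pandas 1.5 and removed in pandas 2.0 (its replacement is `use_na_sentinel`), so passing `na_sentinel=None` now raises a `TypeError`. Under the current default, missing values receive the sentinel code -1 and are excluded from the returned uniques, which is why the subsequent `dropna()` remains a harmless no-op. A minimal sketch of the post-2.0 behaviour, on toy data rather than RootInteractive code:

```python
import numpy as np
import pandas as pd

s = pd.Series(["b", "a", np.nan, "b"])

# pandas >= 2.0: na_sentinel is gone; NaN is encoded as -1 and omitted from uniques.
codes, uniques = pd.factorize(s, sort=True)
print(codes)    # [ 1  0 -1  1]
print(uniques)  # Index(['a', 'b'], dtype='object')

# uniques no longer carries NaN, so dropna() changes nothing here.
options = [str(v) for v in pd.Index(uniques).dropna()]
print(options)  # ['a', 'b']
```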
38 changes: 16 additions & 22 deletions RootInteractive/MLpipeline/MIForestErrPDF.py
@@ -40,16 +40,7 @@ def _accumulate_predictionNL(predict, X, out,col):
 def simple_predict(predict, X, out, col):
     out[col] = predict(X, check_input=False)
 
-def partitionBlock(allRF, k, begin, end):
-    allRF[begin:end].partition(k)
-
-def blockMean(allRF, out, begin, end):
-    np.mean(allRF[begin:end], -1, out=out[begin:end])
-
-def blockStd(allRF, out, begin, end):
-    np.std(allRF[begin:end], -1, out=out[begin:end])
-
-def predictRFStatChunk(rf, X, statDictionary,n_jobs):
+def predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs):
     """
     inspired by https://github.com/scikit-learn/scikit-learn/blob/37ac6788c/sklearn/ensemble/_forest.py#L1410
     predict statistics from random forest
@@ -62,7 +53,7 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs):
     nEstimators = len(rf.estimators_)
     allRF = np.empty((nEstimators, X.shape[0]))
     statOut={}
-    Parallel(n_jobs=n_jobs, verbose=rf.verbose,require="sharedmem")(
+    parallel(
         delayed(simple_predict)(e.predict, X, allRF, col)
         for col,e in enumerate(rf.estimators_)
     )
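The refactor in this hunk (and those below) threads a single caller-provided `parallel` executor through every statistic instead of constructing a fresh `Parallel(...)` per call, so the worker pool is spawned once and reused. A minimal sketch of that joblib pattern, with toy tasks standing in for the per-tree predictions:

```python
import numpy as np
from joblib import Parallel, delayed

def fill(buf, i):
    # require="sharedmem" forces the threading backend, so writes to buf
    # are visible to the caller without copying results back.
    buf[i] = i * i

out = np.empty(8)
# The context manager keeps one pool alive across several dispatches.
with Parallel(n_jobs=4, require="sharedmem") as parallel:
    parallel(delayed(fill)(out, i) for i in range(4))      # first batch
    parallel(delayed(fill)(out, i) for i in range(4, 8))   # second batch, same workers
print(out)  # [ 0.  1.  4.  9. 16. 25. 36. 49.]
```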
@@ -73,30 +64,30 @@ def predictRFStatChunk(rf, X, statDictionary,n_jobs):
     block_end = block_begin[1:]
     block_end.append(X.shape[0])
     if "median" in statDictionary:
-        Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")(
-            delayed(partitionBlock)(allRFTranspose, nEstimators // 2, first, last)
+        parallel(
+            delayed(allRF[first:last].partition)(nEstimators // 2)
             for first, last in zip(block_begin, block_end)
         )
         statOut["median"]= allRFTranspose[:,nEstimators//2]
     if "mean" in statDictionary:
         mean_out = np.empty(X.shape[0])
-        Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")(
-            delayed(blockMean)(allRFTranspose, mean_out, first, last)
+        parallel(
+            delayed(np.mean)(allRFTranspose[first:last], -1, out=mean_out[first:last])
             for first, last in zip(block_begin, block_end)
         )
         statOut["mean"]=mean_out
     if "std" in statDictionary:
         std_out = np.empty(X.shape[0])
-        Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")(
-            delayed(blockStd)(allRFTranspose, std_out, first, last)
+        parallel(
+            delayed(np.std)(allRFTranspose[first:last], -1, out=std_out[first:last])
             for first, last in zip(block_begin, block_end)
         )
         statOut["std"]=std_out
     if "quantile" in statDictionary:
         statOut["quantile"]={}
         quantiles = np.array(statDictionary["quantile"]) * nEstimators
-        Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem")(
-            delayed(partitionBlock)(allRFTranspose, quantiles, first, last)
+        parallel(
+            delayed(allRF[first:last].partition)(quantiles)
             for first, last in zip(block_begin, block_end)
         )
         for iQuant, quant in enumerate(statDictionary["quantile"]):
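For readers unfamiliar with the partition trick used for "median" and "quantile" above: `ndarray.partition(k)` rearranges each row in place, in average O(n), so that the element at index k lands where a full sort would put it. Partitioning the per-tree prediction matrix at `nEstimators // 2` therefore exposes the median column without sorting. A toy illustration (assumed layout: rows are samples, columns are per-tree predictions):

```python
import numpy as np

rng = np.random.default_rng(0)
nEstimators = 101                             # odd count: the middle element is the exact median
perTree = rng.normal(size=(5, nEstimators))   # 5 samples x 101 tree predictions

perTree.partition(nEstimators // 2, axis=-1)  # in-place selection, cheaper than a full sort
median = perTree[:, nEstimators // 2]

assert np.allclose(median, np.median(perTree, axis=-1))
```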
@@ -115,16 +106,19 @@ def predictRFStat(rf, X, statDictionary,n_jobs, max_rows=1000000):
     :param X: input vector
     :param statDictionary: dictionary of statistics to predict
     :param n_jobs: number of parallel jobs for prediction
+    :param max_rows: maximum number of rows predicted per chunk; a negative value disables chunking
     :return: dictionary with requested output statistics
     """
     if(max_rows < 0):
-        return predictRFStatChunk(rf, X, statDictionary, n_jobs)
+        with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel:
+            return predictRFStatChunk(rf, X, statDictionary, parallel, n_jobs)
     block_begin = list(range(0, X.shape[0], max_rows))
     block_end = block_begin[1:]
     block_end.append(X.shape[0])
     answers = []
-    for (a,b) in zip(block_begin, block_end):
-        answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, n_jobs))
+    with Parallel(n_jobs=n_jobs, verbose=rf.verbose, require="sharedmem") as parallel:
+        for (a,b) in zip(block_begin, block_end):
+            answers.append(predictRFStatChunk(rf, X[a:b], statDictionary, parallel, n_jobs))
     if not answers:
         return {}
     merged = {}
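The merge loop after `merged = {}` is truncated in this view. Purely as a hypothetical sketch of what stitching the per-chunk dictionaries back together could look like, assuming each chunk returns the same keys with 1-D arrays (and a nested dict for "quantile") — the helper name below is illustrative, not from the PR:

```python
import numpy as np

def mergeChunkStats(answers):
    # Hypothetical helper: concatenate per-chunk statistics into full-length arrays.
    merged = {}
    for key in answers[0]:
        if isinstance(answers[0][key], dict):   # nested case, e.g. the "quantile" sub-dictionary
            merged[key] = {q: np.concatenate([a[key][q] for a in answers])
                           for q in answers[0][key]}
        else:                                   # flat case: "mean", "median", "std"
            merged[key] = np.concatenate([a[key] for a in answers])
    return merged
```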