Skip to content

Commit

Permalink
Merge pull request #690 from markotoplak/dask-spectra-interruptable
Browse files (browse the repository at this point in the history)
Spectra: interruptable Dask processes
  • Loading branch information
markotoplak authored Oct 10, 2023
2 parents 9a3e2fd + d8bd532 commit e47a8b2
Showing 1 changed file with 52 additions and 24 deletions.
76 changes: 52 additions & 24 deletions orangecontrib/spectroscopy/widgets/owspectra.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,12 +2,16 @@
import sys
from collections import defaultdict
import random
import time
import warnings
from xml.sax.saxutils import escape

try:
import dask
import dask.array as da
import dask.distributed
dask_client = dask.distributed.Client(processes=False, n_workers=2,
threads_per_worker=4, dashboard_address=None)
except ImportError:
dask = None

Expand Down Expand Up @@ -296,14 +300,16 @@ def show(self):
else:
color_var = master.feature_color
self.start(self.compute_averages, master.data, color_var, master.subset_indices,
master.selection_group, master.data_xsind, master.selection_type)
master.selection_group, master.selection_type)

@staticmethod
def compute_averages(data: Orange.data.Table, color_var, subset_indices,
selection_group, data_xsind, selection_type, state: TaskState):
selection_group, selection_type, state: TaskState):

def progress_interrupt(i: float):
if state.is_interruption_requested():
if future:
future.cancel()
raise InterruptException

def _split_by_color_value(data, color_var):
Expand All @@ -324,7 +330,12 @@ def _split_by_color_value(data, color_var):

results = []

future = None

is_dask = dask and isinstance(data.X, dask.array.Array)

dsplit = _split_by_color_value(data, color_var)
compute_dask = []
for colorv, indices in dsplit.items():
for part in [None, "subset", "selection"]:
progress_interrupt(0)
Expand All @@ -335,11 +346,11 @@ def _split_by_color_value(data, color_var):
elif part == "subset":
part_selection = indices & subset_indices
if np.any(part_selection):
if dask and isinstance(data.X, da.Array):
if is_dask:
subset = data.X[part_selection]
std = da.nanstd(subset, axis=0)
mean = da.nanmean(subset, axis=0)
std, mean = dask.compute(std, mean)
compute_dask.extend([da.nanstd(subset, axis=0),
da.nanmean(subset, axis=0)])
std, mean = None, None
else:
std = apply_columns_numpy(data.X,
lambda x: bottleneck.nanstd(x, axis=0),
Expand All @@ -349,9 +360,20 @@ def _split_by_color_value(data, color_var):
lambda x: bottleneck.nanmean(x, axis=0),
part_selection,
callback=progress_interrupt)
std = std[data_xsind]
mean = mean[data_xsind]
results.append((colorv, part, mean, std, part_selection))
results.append([colorv, part, mean, std, part_selection])

if is_dask:
future = dask_client.compute(dask.array.vstack(compute_dask))
while not future.done():
progress_interrupt(0)
time.sleep(0.1)
if future.cancelled():
return
computed = future.result()
for i, lr in enumerate(results):
lr[2] = computed[i*2]
lr[3] = computed[i*2+1]

progress_interrupt(0)
return results

Expand All @@ -364,6 +386,8 @@ def on_done(self, res):
x = master.data_x

for colorv, part, mean, std, part_selection in res:
std = std[master.data_xsind]
mean = mean[master.data_xsind]
if part is None:
pen = master.pen_normal if np.any(master.subset_indices) else master.pen_subset
elif part == "selection" and master.selection_type:
Expand All @@ -387,12 +411,6 @@ def on_done(self, res):
def on_partial_result(self, result):
pass

def on_exception(self, ex: Exception):
if isinstance(ex, InterruptException):
return

raise ex


class ShowIndividual(QObject, ConcurrentMixin):

Expand All @@ -413,24 +431,40 @@ def show(self):
return
sampled_indices = master._compute_sample(master.data.X)
self.start(self.compute_curves, master.data_x, master.data.X,
master.data_xsind, sampled_indices)
sampled_indices)

@staticmethod
def compute_curves(x, ys, data_xsind, sampled_indices, state: TaskState):
def compute_curves(x, ys, sampled_indices, state: TaskState):
is_dask = dask and isinstance(ys, dask.array.Array)

def progress_interrupt(i: float):
if state.is_interruption_requested():
if future:
future.cancel()
raise InterruptException

future = None

progress_interrupt(0)
ys = np.asarray(ys[sampled_indices][:, data_xsind])
ys = ys[sampled_indices]
if is_dask:
future = dask_client.compute(ys)
while not future.done():
progress_interrupt(0)
time.sleep(0.1)
if future.cancelled():
return
ys = future.result()
ys[np.isinf(ys)] = np.nan # remove infs that could ruin display

progress_interrupt(0)
return x, ys, sampled_indices

def on_done(self, res):
x, ys, sampled_indices = res

master = self.master
ys = ys[:, master.data_xsind]

if master.waterfall:
waterfall_constant = 0.1
Expand Down Expand Up @@ -473,12 +507,6 @@ def on_done(self, res):
def on_partial_result(self, result):
pass

def on_exception(self, ex: Exception):
if isinstance(ex, InterruptException):
return

raise ex


class InteractiveViewBox(ViewBox):
def __init__(self, graph):
Expand Down

0 comments on commit e47a8b2

Please sign in to comment.