From 032365d11105ae2ee1efcaae451a991ec992f0a7 Mon Sep 17 00:00:00 2001 From: Marko Toplak Date: Wed, 20 Sep 2023 15:42:04 +0200 Subject: [PATCH] . --- .../spectroscopy/tests/test_owhyper.py | 1 + .../spectroscopy/widgets/owspectra.py | 80 ++++++++++++++----- 2 files changed, 60 insertions(+), 21 deletions(-) diff --git a/orangecontrib/spectroscopy/tests/test_owhyper.py b/orangecontrib/spectroscopy/tests/test_owhyper.py index 84f0254f5..247c918cd 100644 --- a/orangecontrib/spectroscopy/tests/test_owhyper.py +++ b/orangecontrib/spectroscopy/tests/test_owhyper.py @@ -202,6 +202,7 @@ def test_simple(self): def test_unknown(self): self.send_signal("Data", self.whitelight[:10]) + print(self.whitelight[:10]) wait_for_image(self.widget) levels = self.widget.imageplot.img.levels self.send_signal("Data", self.whitelight_unknown[:10]) diff --git a/orangecontrib/spectroscopy/widgets/owspectra.py b/orangecontrib/spectroscopy/widgets/owspectra.py index ea2e96e17..0bc4e36ab 100644 --- a/orangecontrib/spectroscopy/widgets/owspectra.py +++ b/orangecontrib/spectroscopy/widgets/owspectra.py @@ -2,12 +2,15 @@ import sys from collections import defaultdict import random +import time import warnings from xml.sax.saxutils import escape try: import dask import dask.array as da + import dask.distributed + dask_client = dask.distributed.Client(processes=False, n_workers=2, threads_per_worker=4) except ImportError: dask = None @@ -276,7 +279,22 @@ class InterruptException(Exception): pass -class ShowAverage(QObject, ConcurrentMixin): +class ClientConcurrentMixin(ConcurrentMixin): + + def start1(self, task, *args, **kwargs): + dask_client.cancel() + super().start(task, *args, **kwargs) + + def cancel1(self): + dask_client.cancel() + super().cancel() + + def shutdown1(self): + dask_client.canvas() + super().shutdown() + + +class ShowAverage(QObject, ClientConcurrentMixin): shown = pyqtSignal() @@ -304,6 +322,9 @@ def compute_averages(data: Orange.data.Table, color_var, subset_indices, def progress_interrupt(i: float): if state.is_interruption_requested(): + if future: + future.cancel() + print("raising") raise InterruptException def _split_by_color_value(data, color_var): @@ -324,7 +345,12 @@ def _split_by_color_value(data, color_var): results = [] + future = None + + is_dask = dask and isinstance(data.X, dask.array.Array) + dsplit = _split_by_color_value(data, color_var) + compute_dask = [] for colorv, indices in dsplit.items(): for part in [None, "subset", "selection"]: progress_interrupt(0) @@ -335,11 +361,11 @@ def _split_by_color_value(data, color_var): elif part == "subset": part_selection = indices & subset_indices if np.any(part_selection): - if dask and isinstance(data.X, da.Array): + if is_dask: subset = data.X[part_selection] - std = da.nanstd(subset, axis=0) - mean = da.nanmean(subset, axis=0) - std, mean = dask.compute(std, mean) + compute_dask.extend([da.nanstd(subset, axis=0), + da.nanmean(subset, axis=0)]) + std, mean = None, None else: std = apply_columns_numpy(data.X, lambda x: bottleneck.nanstd(x, axis=0), @@ -349,7 +375,17 @@ def _split_by_color_value(data, color_var): lambda x: bottleneck.nanmean(x, axis=0), part_selection, callback=progress_interrupt) - results.append((colorv, part, mean, std, part_selection)) + results.append([colorv, part, mean, std, part_selection]) + + if is_dask: + future = dask_client.compute(dask.array.vstack(compute_dask)) + while not future.done(): + progress_interrupt(0) + time.sleep(0.5) + computed = future.result() + for i, lr in enumerate(results): + lr[2] = computed[i*2] + lr[3] = computed[i*2+1] progress_interrupt(0) return results @@ -388,14 +424,8 @@ def on_done(self, res): def on_partial_result(self, result): pass - def on_exception(self, ex: Exception): - if isinstance(ex, InterruptException): - return - raise ex - - -class ShowIndividual(QObject, ConcurrentMixin): +class ShowIndividual(QObject, ClientConcurrentMixin): shown = pyqtSignal() @@ -418,13 +448,26 @@ def show(self): @staticmethod def compute_curves(x, ys, sampled_indices, state: TaskState): + is_dask = dask and isinstance(ys, dask.array.Array) + + future = None + def progress_interrupt(i: float): if state.is_interruption_requested(): + if future: + future.cancel() raise InterruptException progress_interrupt(0) - ys = np.asarray(ys[sampled_indices]) + ys = ys[sampled_indices] + if is_dask: + future = dask_client.compute(ys) + while not future.done(): + progress_interrupt(0) + time.sleep(0.5) + ys = future.result() ys[np.isinf(ys)] = np.nan # remove infs that could ruin display + progress_interrupt(0) return x, ys, sampled_indices @@ -475,12 +518,6 @@ def on_done(self, res): def on_partial_result(self, result): pass - def on_exception(self, ex: Exception): - if isinstance(ex, InterruptException): - return - - raise ex - class InteractiveViewBox(ViewBox): def __init__(self, graph): @@ -1848,5 +1885,6 @@ def migrate_context(cls, context, version): if __name__ == "__main__": # pragma: no cover from Orange.widgets.utils.widgetpreview import WidgetPreview collagen = Orange.data.Table("collagen.csv") + collagen = Orange.data.Table("/home/marko/dask-ssd/brc961-br1001-orange.hdf5") WidgetPreview(OWSpectra).run(set_data=collagen, - set_subset=collagen[:40]) + set_subset=collagen[::2])