Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
markotoplak committed Sep 27, 2023
1 parent f578867 commit 6e7ec1f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 21 deletions.
1 change: 1 addition & 0 deletions orangecontrib/spectroscopy/tests/test_owhyper.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_simple(self):

def test_unknown(self):
self.send_signal("Data", self.whitelight[:10])
print(self.whitelight[:10])
wait_for_image(self.widget)
levels = self.widget.imageplot.img.levels
self.send_signal("Data", self.whitelight_unknown[:10])
Expand Down
78 changes: 57 additions & 21 deletions orangecontrib/spectroscopy/widgets/owspectra.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
import sys
from collections import defaultdict
import random
import time
import warnings
from xml.sax.saxutils import escape

try:
import dask
import dask.array as da
import dask.distributed
dask_client = dask.distributed.Client(processes=False, n_workers=2,
threads_per_worker=4, dashboard_address=None)
except ImportError:
dask = None

Expand Down Expand Up @@ -276,7 +280,22 @@ class InterruptException(Exception):
pass


class ShowAverage(QObject, ConcurrentMixin):
class ClientConcurrentMixin(ConcurrentMixin):

def start1(self, task, *args, **kwargs):
dask_client.cancel()
super().start(task, *args, **kwargs)

def cancel1(self):
dask_client.cancel()
super().cancel()

def shutdown1(self):
dask_client.cancel()
super().shutdown()


class ShowAverage(QObject, ClientConcurrentMixin):

shown = pyqtSignal()

Expand Down Expand Up @@ -304,6 +323,8 @@ def compute_averages(data: Orange.data.Table, color_var, subset_indices,

def progress_interrupt(i: float):
if state.is_interruption_requested():
#if future:
# future.cancel()
raise InterruptException

def _split_by_color_value(data, color_var):
Expand All @@ -324,7 +345,10 @@ def _split_by_color_value(data, color_var):

results = []

is_dask = dask and isinstance(data.X, dask.array.Array)

dsplit = _split_by_color_value(data, color_var)
compute_dask = []
for colorv, indices in dsplit.items():
for part in [None, "subset", "selection"]:
progress_interrupt(0)
Expand All @@ -335,11 +359,11 @@ def _split_by_color_value(data, color_var):
elif part == "subset":
part_selection = indices & subset_indices
if np.any(part_selection):
if dask and isinstance(data.X, da.Array):
if is_dask:
subset = data.X[part_selection]
std = da.nanstd(subset, axis=0)
mean = da.nanmean(subset, axis=0)
std, mean = dask.compute(std, mean)
compute_dask.extend([da.nanstd(subset, axis=0),
da.nanmean(subset, axis=0)])
std, mean = None, None
else:
std = apply_columns_numpy(data.X,
lambda x: bottleneck.nanstd(x, axis=0),
Expand All @@ -349,7 +373,19 @@ def _split_by_color_value(data, color_var):
lambda x: bottleneck.nanmean(x, axis=0),
part_selection,
callback=progress_interrupt)
results.append((colorv, part, mean, std, part_selection))
results.append([colorv, part, mean, std, part_selection])

if is_dask:
future = dask_client.compute(dask.array.vstack(compute_dask))
while not future.done():
progress_interrupt(0)
time.sleep(0.5)
if future.cancelled():
return
computed = future.result()
for i, lr in enumerate(results):
lr[2] = computed[i*2]
lr[3] = computed[i*2+1]

progress_interrupt(0)
return results
Expand Down Expand Up @@ -388,14 +424,8 @@ def on_done(self, res):
def on_partial_result(self, result):
pass

def on_exception(self, ex: Exception):
if isinstance(ex, InterruptException):
return

raise ex


class ShowIndividual(QObject, ConcurrentMixin):
class ShowIndividual(QObject, ClientConcurrentMixin):

shown = pyqtSignal()

Expand All @@ -418,13 +448,24 @@ def show(self):

@staticmethod
def compute_curves(x, ys, sampled_indices, state: TaskState):
is_dask = dask and isinstance(ys, dask.array.Array)

def progress_interrupt(i: float):
if state.is_interruption_requested():
raise InterruptException

progress_interrupt(0)
ys = np.asarray(ys[sampled_indices])
ys = ys[sampled_indices]
if is_dask:
future = dask_client.compute(ys)
while not future.done():
progress_interrupt(0)
time.sleep(0.5)
if future.cancelled():
return
ys = future.result()
ys[np.isinf(ys)] = np.nan # remove infs that could ruin display

progress_interrupt(0)
return x, ys, sampled_indices

Expand Down Expand Up @@ -475,12 +516,6 @@ def on_done(self, res):
def on_partial_result(self, result):
pass

def on_exception(self, ex: Exception):
if isinstance(ex, InterruptException):
return

raise ex


class InteractiveViewBox(ViewBox):
def __init__(self, graph):
Expand Down Expand Up @@ -1848,5 +1883,6 @@ def migrate_context(cls, context, version):
if __name__ == "__main__": # pragma: no cover
from Orange.widgets.utils.widgetpreview import WidgetPreview
collagen = Orange.data.Table("collagen.csv")
collagen = Orange.data.Table("/home/marko/dask-ssd/brc961-br1001-orange.hdf5")
WidgetPreview(OWSpectra).run(set_data=collagen,
set_subset=collagen[:40])
set_subset=collagen[::2])

0 comments on commit 6e7ec1f

Please sign in to comment.