From 68e62e72d2c67029a555a00994b0fa83b3f66a20 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 10:47:18 +0200 Subject: [PATCH 1/9] Add module for running a workflow in streaming mode --- src/ess/reduce/streaming.py | 205 ++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 src/ess/reduce/streaming.py diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py new file mode 100644 index 00000000..fc3fe7e7 --- /dev/null +++ b/src/ess/reduce/streaming.py @@ -0,0 +1,205 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2023 Scipp contributors (https://github.com/scipp) +"""This module provides tools for running workflows in a streaming fashion.""" + +from abc import ABC, abstractmethod +from collections.abc import Callable +from typing import Any, Generic, TypeVar + +import networkx as nx +import sciline +import scipp as sc + +T = TypeVar('T') + + +def maybe_hist(value: T) -> T: + """ + Convert value to a histogram if it is not already a histogram. + + This is the default pre-processing used by accumulators. + + Parameters + ---------- + value: + Value to be converted to a histogram. + + Returns + ------- + : + Histogram. + """ + return value if value.bins is None else value.hist() + + +class Accumulator(ABC, Generic[T]): + """ + Abstract base class for accumulators. + + Accumulators are used to accumulate values over multiple chunks. + """ + + def __init__(self, preprocess: Callable[[T], T] | None = maybe_hist) -> None: + """ + Parameters + ---------- + preprocess: + Preprocessing function to be applied to pushed values prior to accumulation. + """ + self._preprocess = preprocess + + def push(self, value: T) -> None: + """ + Push a value to the accumulator. + + Parameters + ---------- + value: + Value to be pushed to the accumulator. + """ + if self._preprocess is not None: + value = self._preprocess(value) + self._do_push(value) + + @abstractmethod + def _do_push(self, value: T) -> None: ... + + @property + @abstractmethod + def value(self) -> T: + """ + Get the accumulated value. + + Returns + ------- + : + Accumulated value. + """ + + +class EternalAccumulator(Accumulator[T]): + """ + Simple accumulator that adds pushed values immediately. + + Does not support event data. + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._value: T | None = None + + @property + def value(self) -> T: + return self._value.copy() + + def _do_push(self, value: T) -> None: + if self._value is None: + self._value = value.copy() + else: + self._value += value + + +class RollingAccumulator(Accumulator[T]): + """ + Accumulator that adds pushed values to a rolling window. + + Does not support event data. + """ + + def __init__(self, window: int = 10, **kwargs: Any) -> None: + """ + Parameters + ---------- + window: + Size of the rolling window. + """ + super().__init__(**kwargs) + self._window = window + self._values: list[T] = [] + + @property + def value(self) -> T: + # Naive and potentially slow implementation if values and/or window are large! 
+ return sc.reduce(self._values).sum() + + def _do_push(self, value: T) -> None: + self._values.append(value) + if len(self._values) > self._window: + self._values.pop(0) + + +class Streaming: + """Wrap a base workflow for streaming processing of chunks.""" + + def __init__( + self, + base_workflow: sciline.Pipeline, + dynamic_keys: tuple[sciline.typing.Key, ...], + accumulation_keys: tuple[sciline.typing.Key, ...], + target_keys: tuple[sciline.typing.Key, ...], + accumulator: type[Accumulator] = EternalAccumulator, + ) -> None: + """ + Parameters + ---------- + base_workflow: + Workflow to be used for processing chunks. + dynamic_keys: + Keys that are expected to be updated with each chunk. + accumulation_keys: + Keys for which to accumulate values. + target_keys: + Keys to be computed and returned. + accumulator: + Accumulator class to use for accumulation. + """ + workflow = sciline.Pipeline() + for key in target_keys: + workflow[key] = base_workflow[key] + for key in dynamic_keys: + workflow[key] = None # hack to prune branches + # Find static nodes as far down the graph as possible + nodes = _find_descendants(workflow, dynamic_keys) + parents = _find_parents(workflow, nodes) - _find_input_nodes(workflow) - nodes + for key, value in base_workflow.compute(parents).items(): + workflow[key] = value + self._process_chunk_workflow = workflow.copy() + self._finalize_workflow = workflow.copy() + self._accumulators = {key: accumulator() for key in accumulation_keys} + self._target_keys = target_keys + + def add_chunk( + self, chunks: dict[sciline.typing.Key, Any] + ) -> dict[sciline.typing.Key, Any]: + for key, value in chunks.items(): + self._process_chunk_workflow[key] = value + to_accumulate = self._process_chunk_workflow.compute(self._accumulators) + for key, processed in to_accumulate.items(): + self._accumulators[key].push(processed) + self._finalize_workflow[key] = self._accumulators[key].value + return self._finalize_workflow.compute(self._target_keys) + + +def _find_descendants( + workflow: sciline.Pipeline, keys: tuple[sciline.typing.Key, ...] +) -> set[sciline.typing.Key]: + graph = workflow.underlying_graph + descendants = set() + for key in keys: + descendants |= nx.descendants(graph, key) + return descendants + + +def _find_input_nodes(workflow: sciline.Pipeline) -> set[sciline.typing.Key]: + graph = workflow.underlying_graph + return {key for key in graph if graph.in_degree(key) == 0} + + +def _find_parents( + workflow: sciline.Pipeline, keys: tuple[sciline.typing.Key, ...] +) -> set[sciline.typing.Key]: + graph = workflow.underlying_graph + parents = set() + for key in keys: + parents |= set(graph.predecessors(key)) + return parents From 9f7dedbcd1d0693397d45baaf533626d5d4c9ac5 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 10:57:56 +0200 Subject: [PATCH 2/9] Comments --- src/ess/reduce/streaming.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index fc3fe7e7..fc603182 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -158,13 +158,17 @@ def __init__( workflow[key] = base_workflow[key] for key in dynamic_keys: workflow[key] = None # hack to prune branches - # Find static nodes as far down the graph as possible + + # Find and pre-compute static nodes as far down the graph as possible + # See also https://github.com/scipp/sciline/issues/148. 
nodes = _find_descendants(workflow, dynamic_keys) parents = _find_parents(workflow, nodes) - _find_input_nodes(workflow) - nodes for key, value in base_workflow.compute(parents).items(): workflow[key] = value + self._process_chunk_workflow = workflow.copy() self._finalize_workflow = workflow.copy() + # TODO: We may need to have a way to specify the accumulator for each key self._accumulators = {key: accumulator() for key in accumulation_keys} self._target_keys = target_keys From ced565bebacddababfc694efff76f6165e2deb97 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 11:19:46 +0200 Subject: [PATCH 3/9] Add accumulator test --- tests/streaming_test.py | 90 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/streaming_test.py diff --git a/tests/streaming_test.py b/tests/streaming_test.py new file mode 100644 index 00000000..4363c3c3 --- /dev/null +++ b/tests/streaming_test.py @@ -0,0 +1,90 @@ +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2024 Scipp contributors (https://github.com/scipp) + +import scipp as sc + +from ess.reduce import streaming + + +def test_eternal_accumulator_sums_everything(): + accum = streaming.EternalAccumulator() + var = sc.linspace(dim='x', start=0, stop=1, num=10) + for i in range(10): + accum.push(var[i].copy()) + assert sc.identical(accum.value, sc.sum(var)) + + +def test_eternal_accumulator_sums_everything_with_preprocess(): + accum = streaming.EternalAccumulator(preprocess=lambda x: x**0.5) + var = sc.linspace(dim='x', start=0, stop=1, num=10) + for i in range(10): + accum.push(var[i].copy()) + assert sc.identical(accum.value, sc.sum(var**0.5)) + + +def test_eternal_accumulator_works_if_output_value_is_modified(): + accum = streaming.EternalAccumulator() + var = sc.linspace(dim='x', start=0, stop=1, num=10) + for i in range(10): + accum.push(var[i].copy()) + value = accum.value + value += 1.0 + assert sc.identical(accum.value, sc.sum(var)) + + +def test_eternal_accumulator_does_not_modify_pushed_values(): + accum = streaming.EternalAccumulator() + var = sc.linspace(dim='x', start=0, stop=1, num=10) + original = var.copy() + for i in range(10): + accum.push(var[i]) + assert sc.identical(var, original) + + +def test_rolling_accumulator_sums_over_window(): + accum = streaming.RollingAccumulator(window=3) + var = sc.linspace(dim='x', start=0, stop=1, num=10) + accum.push(var[0].copy()) + assert sc.identical(accum.value, var[0]) + accum.push(var[1].copy()) + assert sc.identical(accum.value, var[0:2].sum()) + accum.push(var[2].copy()) + assert sc.identical(accum.value, var[0:3].sum()) + accum.push(var[3].copy()) + assert sc.identical(accum.value, var[1:4].sum()) + accum.push(var[4].copy()) + assert sc.identical(accum.value, var[2:5].sum()) + + +def test_rolling_accumulator_sums_over_window_with_preprocess(): + accum = streaming.RollingAccumulator(window=3, preprocess=lambda x: x**0.5) + var = sc.linspace(dim='x', start=0, stop=1, num=10) + accum.push(var[0].copy()) + assert sc.identical(accum.value, var[0] ** 0.5) + accum.push(var[1].copy()) + assert sc.identical(accum.value, (var[0:2] ** 0.5).sum()) + accum.push(var[2].copy()) + assert sc.identical(accum.value, (var[0:3] ** 0.5).sum()) + accum.push(var[3].copy()) + assert sc.identical(accum.value, (var[1:4] ** 0.5).sum()) + accum.push(var[4].copy()) + assert sc.identical(accum.value, (var[2:5] ** 0.5).sum()) + + +def test_rolling_accumulator_works_if_output_value_is_modified(): + accum = streaming.RollingAccumulator(window=3) + var = 
sc.linspace(dim='x', start=0, stop=1, num=10) + for i in range(10): + accum.push(var[i].copy()) + value = accum.value + value += 1.0 + assert sc.identical(accum.value, var[7:10].sum()) + + +def test_rolling_accumulator_does_not_modify_pushed_values(): + accum = streaming.RollingAccumulator(window=3) + var = sc.linspace(dim='x', start=0, stop=1, num=10) + original = var.copy() + for i in range(10): + accum.push(var[i]) + assert sc.identical(var, original) From f947017b1fa224fb194d7c25a7fe50f264e0f5b4 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 11:42:58 +0200 Subject: [PATCH 4/9] Add test of class Streaming --- tests/streaming_test.py | 73 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 8 deletions(-) diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 4363c3c3..9eb943e1 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -1,12 +1,15 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright (c) 2024 Scipp contributors (https://github.com/scipp) +from typing import NewType + +import sciline import scipp as sc from ess.reduce import streaming -def test_eternal_accumulator_sums_everything(): +def test_eternal_accumulator_sums_everything() -> None: accum = streaming.EternalAccumulator() var = sc.linspace(dim='x', start=0, stop=1, num=10) for i in range(10): @@ -14,7 +17,7 @@ def test_eternal_accumulator_sums_everything(): assert sc.identical(accum.value, sc.sum(var)) -def test_eternal_accumulator_sums_everything_with_preprocess(): +def test_eternal_accumulator_sums_everything_with_preprocess() -> None: accum = streaming.EternalAccumulator(preprocess=lambda x: x**0.5) var = sc.linspace(dim='x', start=0, stop=1, num=10) for i in range(10): @@ -22,7 +25,7 @@ def test_eternal_accumulator_sums_everything_with_preprocess(): assert sc.identical(accum.value, sc.sum(var**0.5)) -def test_eternal_accumulator_works_if_output_value_is_modified(): +def test_eternal_accumulator_works_if_output_value_is_modified() -> None: accum = streaming.EternalAccumulator() var = sc.linspace(dim='x', start=0, stop=1, num=10) for i in range(10): @@ -32,7 +35,7 @@ def test_eternal_accumulator_works_if_output_value_is_modified(): assert sc.identical(accum.value, sc.sum(var)) -def test_eternal_accumulator_does_not_modify_pushed_values(): +def test_eternal_accumulator_does_not_modify_pushed_values() -> None: accum = streaming.EternalAccumulator() var = sc.linspace(dim='x', start=0, stop=1, num=10) original = var.copy() @@ -41,7 +44,7 @@ def test_eternal_accumulator_does_not_modify_pushed_values(): assert sc.identical(var, original) -def test_rolling_accumulator_sums_over_window(): +def test_rolling_accumulator_sums_over_window() -> None: accum = streaming.RollingAccumulator(window=3) var = sc.linspace(dim='x', start=0, stop=1, num=10) accum.push(var[0].copy()) @@ -56,7 +59,7 @@ def test_rolling_accumulator_sums_over_window(): assert sc.identical(accum.value, var[2:5].sum()) -def test_rolling_accumulator_sums_over_window_with_preprocess(): +def test_rolling_accumulator_sums_over_window_with_preprocess() -> None: accum = streaming.RollingAccumulator(window=3, preprocess=lambda x: x**0.5) var = sc.linspace(dim='x', start=0, stop=1, num=10) accum.push(var[0].copy()) @@ -71,7 +74,7 @@ def test_rolling_accumulator_sums_over_window_with_preprocess(): assert sc.identical(accum.value, (var[2:5] ** 0.5).sum()) -def test_rolling_accumulator_works_if_output_value_is_modified(): +def test_rolling_accumulator_works_if_output_value_is_modified() -> None: accum 
= streaming.RollingAccumulator(window=3) var = sc.linspace(dim='x', start=0, stop=1, num=10) for i in range(10): @@ -81,10 +84,64 @@ def test_rolling_accumulator_works_if_output_value_is_modified(): assert sc.identical(accum.value, var[7:10].sum()) -def test_rolling_accumulator_does_not_modify_pushed_values(): +def test_rolling_accumulator_does_not_modify_pushed_values() -> None: accum = streaming.RollingAccumulator(window=3) var = sc.linspace(dim='x', start=0, stop=1, num=10) original = var.copy() for i in range(10): accum.push(var[i]) assert sc.identical(var, original) + + +DynamicA = NewType('DynamicA', float) +DynamicB = NewType('DynamicB', float) +StaticA = NewType('StaticA', float) +AccumA = NewType('AccumA', float) +AccumB = NewType('AccumB', float) +Target = NewType('Target', float) + + +def make_static_a() -> StaticA: + make_static_a.call_count += 1 + return StaticA(2.0) + + +make_static_a.call_count = 0 + + +def make_accum_a(value: DynamicA, static: StaticA) -> AccumA: + return AccumA(value * static) + + +def make_accum_b(value: DynamicB) -> AccumB: + return AccumB(value) + + +def make_target(accum_a: AccumA, accum_b: AccumB) -> Target: + return Target(accum_a / accum_b) + + +def test_streaming() -> None: + base_workflow = sciline.Pipeline( + (make_static_a, make_accum_a, make_accum_b, make_target) + ) + + streaming_wf = streaming.Streaming( + base_workflow=base_workflow, + dynamic_keys=(DynamicA, DynamicB), + accumulation_keys=(AccumA, AccumB), + target_keys=(Target,), + ) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) + assert sc.identical(result[Target], sc.scalar(2 * 1.0 / 4.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(2), DynamicB: sc.scalar(5)}) + assert sc.identical(result[Target], sc.scalar(2 * 3.0 / 9.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(3), DynamicB: sc.scalar(6)}) + assert sc.identical(result[Target], sc.scalar(2 * 6.0 / 15.0)) + assert make_static_a.call_count == 1 + + wf = base_workflow.copy() + wf[DynamicA] = sc.scalar(1 + 2 + 3) + wf[DynamicB] = sc.scalar(4 + 5 + 6) + expected = wf.compute(Target) + assert sc.identical(expected, sc.scalar(2 * 6.0 / 15.0)) From b7e4e1b06e1938dbf598a72599094eee193c2503 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 11:48:25 +0200 Subject: [PATCH 5/9] Remove removal of source nodes since it may be expensive calls --- src/ess/reduce/streaming.py | 9 ++------- tests/streaming_test.py | 11 ++++++----- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index fc603182..aec71b1d 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -162,7 +162,7 @@ def __init__( # Find and pre-compute static nodes as far down the graph as possible # See also https://github.com/scipp/sciline/issues/148. 
nodes = _find_descendants(workflow, dynamic_keys) - parents = _find_parents(workflow, nodes) - _find_input_nodes(workflow) - nodes + parents = _find_parents(workflow, nodes) - nodes for key, value in base_workflow.compute(parents).items(): workflow[key] = value @@ -191,12 +191,7 @@ def _find_descendants( descendants = set() for key in keys: descendants |= nx.descendants(graph, key) - return descendants - - -def _find_input_nodes(workflow: sciline.Pipeline) -> set[sciline.typing.Key]: - graph = workflow.underlying_graph - return {key for key in graph if graph.in_degree(key) == 0} + return descendants | set(keys) def _find_parents( diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 9eb943e1..f7533657 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -125,6 +125,7 @@ def test_streaming() -> None: base_workflow = sciline.Pipeline( (make_static_a, make_accum_a, make_accum_b, make_target) ) + orig_workflow = base_workflow.copy() streaming_wf = streaming.Streaming( base_workflow=base_workflow, @@ -140,8 +141,8 @@ def test_streaming() -> None: assert sc.identical(result[Target], sc.scalar(2 * 6.0 / 15.0)) assert make_static_a.call_count == 1 - wf = base_workflow.copy() - wf[DynamicA] = sc.scalar(1 + 2 + 3) - wf[DynamicB] = sc.scalar(4 + 5 + 6) - expected = wf.compute(Target) - assert sc.identical(expected, sc.scalar(2 * 6.0 / 15.0)) + # Consistency check: Run the original workflow with the same inputs, all at once + orig_workflow[DynamicA] = sc.scalar(1 + 2 + 3) + orig_workflow[DynamicB] = sc.scalar(4 + 5 + 6) + expected = orig_workflow.compute(Target) + assert sc.identical(expected, result[Target]) From 94e6980ca8110d52ba50c315e6b7ab4eec2f30f1 Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Thu, 29 Aug 2024 11:54:22 +0200 Subject: [PATCH 6/9] Rename and docs --- src/ess/reduce/streaming.py | 13 +++++++++++-- tests/streaming_test.py | 4 ++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index aec71b1d..d7c92bd7 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -128,8 +128,15 @@ def _do_push(self, value: T) -> None: self._values.pop(0) -class Streaming: - """Wrap a base workflow for streaming processing of chunks.""" +class StreamProcessor: + """ + Wrap a base workflow for streaming processing of chunks. + + Note that this class can not determine if the workflow is valid for streamed + processing based on the input keys. In particular, it is the responsibility of the + user to ensure that the workflow is "linear" with respect to the dynamic keys up to + the accumulation keys. + """ def __init__( self, @@ -140,6 +147,8 @@ def __init__( accumulator: type[Accumulator] = EternalAccumulator, ) -> None: """ + Create a stream processor. 
+ Parameters ---------- base_workflow: diff --git a/tests/streaming_test.py b/tests/streaming_test.py index f7533657..edb40f51 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -121,13 +121,13 @@ def make_target(accum_a: AccumA, accum_b: AccumB) -> Target: return Target(accum_a / accum_b) -def test_streaming() -> None: +def test_StreamProcessor() -> None: base_workflow = sciline.Pipeline( (make_static_a, make_accum_a, make_accum_b, make_target) ) orig_workflow = base_workflow.copy() - streaming_wf = streaming.Streaming( + streaming_wf = streaming.StreamProcessor( base_workflow=base_workflow, dynamic_keys=(DynamicA, DynamicB), accumulation_keys=(AccumA, AccumB), From 2b330665321029039ad5122342d4b3e98cc5e31d Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 30 Aug 2024 09:05:39 +0200 Subject: [PATCH 7/9] Use dict of accumulators --- src/ess/reduce/streaming.py | 20 ++++++++++++-------- tests/streaming_test.py | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index d7c92bd7..71a56568 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -141,10 +141,11 @@ class StreamProcessor: def __init__( self, base_workflow: sciline.Pipeline, + *, dynamic_keys: tuple[sciline.typing.Key, ...], - accumulation_keys: tuple[sciline.typing.Key, ...], target_keys: tuple[sciline.typing.Key, ...], - accumulator: type[Accumulator] = EternalAccumulator, + accumulators: dict[sciline.typing.Key, Accumulator] + | tuple[sciline.typing.Key, ...], ) -> None: """ Create a stream processor. @@ -155,12 +156,12 @@ def __init__( Workflow to be used for processing chunks. dynamic_keys: Keys that are expected to be updated with each chunk. - accumulation_keys: - Keys for which to accumulate values. target_keys: Keys to be computed and returned. - accumulator: - Accumulator class to use for accumulation. + accumulators: + Keys at which to accumulate values and their accumulators. If a tuple is + passed, :py:class:`EternalAccumulator` is used for all keys. Otherwise, a + dict mapping keys to accumulator instances can be passed. 
""" workflow = sciline.Pipeline() for key in target_keys: @@ -177,8 +178,11 @@ def __init__( self._process_chunk_workflow = workflow.copy() self._finalize_workflow = workflow.copy() - # TODO: We may need to have a way to specify the accumulator for each key - self._accumulators = {key: accumulator() for key in accumulation_keys} + self._accumulators = ( + accumulators + if isinstance(accumulators, dict) + else {key: EternalAccumulator() for key in accumulators} + ) self._target_keys = target_keys def add_chunk( diff --git a/tests/streaming_test.py b/tests/streaming_test.py index edb40f51..40554cd0 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -121,7 +121,7 @@ def make_target(accum_a: AccumA, accum_b: AccumB) -> Target: return Target(accum_a / accum_b) -def test_StreamProcessor() -> None: +def test_StreamProcessor_overall_behavior() -> None: base_workflow = sciline.Pipeline( (make_static_a, make_accum_a, make_accum_b, make_target) ) @@ -130,8 +130,8 @@ def test_StreamProcessor() -> None: streaming_wf = streaming.StreamProcessor( base_workflow=base_workflow, dynamic_keys=(DynamicA, DynamicB), - accumulation_keys=(AccumA, AccumB), target_keys=(Target,), + accumulators=(AccumA, AccumB), ) result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) assert sc.identical(result[Target], sc.scalar(2 * 1.0 / 4.0)) @@ -146,3 +146,33 @@ def test_StreamProcessor() -> None: orig_workflow[DynamicB] = sc.scalar(4 + 5 + 6) expected = orig_workflow.compute(Target) assert sc.identical(expected, result[Target]) + + +def test_StreamProcessor_uses_custom_accumulator() -> None: + class Always42Accumulator(streaming.Accumulator[sc.Variable]): + def _do_push(self, value: sc.Variable) -> None: + pass + + @property + def value(self) -> sc.Variable: + return sc.scalar(42) + + base_workflow = sciline.Pipeline( + (make_static_a, make_accum_a, make_accum_b, make_target) + ) + + streaming_wf = streaming.StreamProcessor( + base_workflow=base_workflow, + dynamic_keys=(DynamicA, DynamicB), + target_keys=(Target,), + accumulators={ + AccumA: streaming.EternalAccumulator(), + AccumB: Always42Accumulator(), + }, + ) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) + assert sc.identical(result[Target], sc.scalar(2 * 1.0 / 42.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(2), DynamicB: sc.scalar(5)}) + assert sc.identical(result[Target], sc.scalar(2 * 3.0 / 42.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(3), DynamicB: sc.scalar(6)}) + assert sc.identical(result[Target], sc.scalar(2 * 6.0 / 42.0)) From c7b729bbdbb4986b4d501cfba0c129c29b36b20a Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 30 Aug 2024 09:23:45 +0200 Subject: [PATCH 8/9] Do not compute unreachable intermediates --- src/ess/reduce/streaming.py | 7 +++++++ tests/streaming_test.py | 25 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index 71a56568..4e653741 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -183,6 +183,13 @@ def __init__( if isinstance(accumulators, dict) else {key: EternalAccumulator() for key in accumulators} ) + # Depending on the target_keys, some accumulators can be unused and should not + # be computed when adding a chunk. 
+ self._accumulators = { + key: value + for key, value in self._accumulators.items() + if key in self._process_chunk_workflow.underlying_graph + } self._target_keys = target_keys def add_chunk( diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 40554cd0..5c8e06c8 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -176,3 +176,28 @@ def value(self) -> sc.Variable: assert sc.identical(result[Target], sc.scalar(2 * 3.0 / 42.0)) result = streaming_wf.add_chunk({DynamicA: sc.scalar(3), DynamicB: sc.scalar(6)}) assert sc.identical(result[Target], sc.scalar(2 * 6.0 / 42.0)) + + +def test_StreamProcessor_does_not_compute_unused_static_nodes() -> None: + def a_independent_target(accum_b: AccumB) -> Target: + return Target(1.5 * accum_b) + + def derived_static_a(x: float) -> StaticA: + derived_static_a.call_count += 1 + return StaticA(2.0 * x) + + derived_static_a.call_count = 0 + + base_workflow = sciline.Pipeline( + (derived_static_a, make_accum_a, make_accum_b, a_independent_target) + ) + + streaming_wf = streaming.StreamProcessor( + base_workflow=base_workflow, + dynamic_keys=(DynamicA, DynamicB), + target_keys=(Target,), + accumulators=(AccumA, AccumB), + ) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) + assert sc.identical(result[Target], sc.scalar(1.5 * 4.0)) + assert derived_static_a.call_count == 0 From a1ea4082a49af6c4d470b4ba49ffcccbe834680a Mon Sep 17 00:00:00 2001 From: Simon Heybrock Date: Fri, 30 Aug 2024 10:44:10 +0200 Subject: [PATCH 9/9] Fix for dynamic keys that bypass accumulators --- src/ess/reduce/streaming.py | 4 ++++ tests/streaming_test.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/src/ess/reduce/streaming.py b/src/ess/reduce/streaming.py index 4e653741..6a7d5a46 100644 --- a/src/ess/reduce/streaming.py +++ b/src/ess/reduce/streaming.py @@ -197,6 +197,10 @@ def add_chunk( ) -> dict[sciline.typing.Key, Any]: for key, value in chunks.items(): self._process_chunk_workflow[key] = value + # There can be dynamic keys that do not "terminate" in any accumulator. In + # that case, we need to make sure they can be and are used when computing + # the target keys. 
+ self._finalize_workflow[key] = value to_accumulate = self._process_chunk_workflow.compute(self._accumulators) for key, processed in to_accumulate.items(): self._accumulators[key].push(processed) diff --git a/tests/streaming_test.py b/tests/streaming_test.py index 5c8e06c8..a4739149 100644 --- a/tests/streaming_test.py +++ b/tests/streaming_test.py @@ -201,3 +201,24 @@ def derived_static_a(x: float) -> StaticA: result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) assert sc.identical(result[Target], sc.scalar(1.5 * 4.0)) assert derived_static_a.call_count == 0 + + +def test_StreamProcess_with_zero_accumulators_for_buffered_workflow_calls() -> None: + base_workflow = sciline.Pipeline( + (make_static_a, make_accum_a, make_accum_b, make_target) + ) + make_static_a.call_count = 0 + + streaming_wf = streaming.StreamProcessor( + base_workflow=base_workflow, + dynamic_keys=(DynamicA, DynamicB), + target_keys=(Target,), + accumulators=(), + ) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(1), DynamicB: sc.scalar(4)}) + assert sc.identical(result[Target], sc.scalar(2 * 1.0 / 4.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(2), DynamicB: sc.scalar(5)}) + assert sc.identical(result[Target], sc.scalar(2 * 2.0 / 5.0)) + result = streaming_wf.add_chunk({DynamicA: sc.scalar(3), DynamicB: sc.scalar(6)}) + assert sc.identical(result[Target], sc.scalar(2 * 3.0 / 6.0)) + assert make_static_a.call_count == 1
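
A minimal usage sketch of the streaming module added by these patches, following the pattern of tests/streaming_test.py. The domain types (Dynamic, Static, Accum, Result), the providers, and the scalar values below are illustrative assumptions; StreamProcessor, add_chunk, EternalAccumulator, and RollingAccumulator are the names introduced above.

from typing import NewType

import sciline
import scipp as sc

from ess.reduce import streaming

# Illustrative domain types, mirroring the style of the test file.
Dynamic = NewType('Dynamic', sc.Variable)  # updated with every chunk
Static = NewType('Static', sc.Variable)    # pre-computed once at setup
Accum = NewType('Accum', sc.Variable)      # accumulated across chunks
Result = NewType('Result', sc.Variable)    # recomputed after each chunk


def make_accum(value: Dynamic, scale: Static) -> Accum:
    # Per-chunk contribution that the stream processor accumulates.
    return Accum(scale * value)


def make_result(accum: Accum) -> Result:
    # Final step, evaluated from the accumulated value after each chunk.
    return Result(accum / sc.scalar(10.0))


base_workflow = sciline.Pipeline((make_accum, make_result))
base_workflow[Static] = sc.scalar(2.0)

processor = streaming.StreamProcessor(
    base_workflow=base_workflow,
    dynamic_keys=(Dynamic,),
    target_keys=(Result,),
    # A tuple selects EternalAccumulator for every key; a dict such as
    # {Accum: streaming.RollingAccumulator(window=3)} selects per-key behavior.
    accumulators=(Accum,),
)

for chunk in (sc.scalar(1.0), sc.scalar(2.0), sc.scalar(3.0)):
    results = processor.add_chunk({Dynamic: chunk})
    print(results[Result])  # result computed from all chunks seen so far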