From 59effaee849a651172f091a0b4eedaa5d2c6dde3 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 14 Aug 2020 14:15:01 +0300 Subject: [PATCH 01/16] Add BNC marker handling. --- .../pyDKB/dataflow/communication/producer/Producer.py | 4 ++++ .../pyDKB/dataflow/communication/stream/OutputStream.py | 5 +++++ Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py | 9 +++++++++ Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py | 4 ++++ 4 files changed, 22 insertions(+) diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py index d953cf070..7bd361d97 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/producer/Producer.py @@ -102,6 +102,10 @@ def eop(self): """ Write EOP marker to the current dest. """ self.get_stream().eop() + def bnc(self): + """ Write BNC marker to the current dest. """ + self.get_stream().bnc() + def flush(self): """ Flush buffered messages to the current dest. """ self.get_stream().flush() diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py index c0b6ee56d..642c721ac 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/OutputStream.py @@ -15,6 +15,7 @@ def configure(self, config={}): """ Configure instance. """ super(OutputStream, self).configure(config) self.EOP = config.get('eop', '') + self.BNC = config.get('bnc', '') def write(self, message): """ Add message to the buffer. """ @@ -40,6 +41,10 @@ def eop(self): """ Signalize Supervisor about end of process. """ self.get_fd().write(self.EOP) + def bnc(self): + """ Signalize Supervisor about batch being incomplete. """ + self.get_fd().write(self.BNC) + def drop(self): """ Drop buffer without sending messages anywhere. """ self.msg_buffer = [] diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 1a90c2adf..6bac20aaf 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -132,6 +132,12 @@ def defaultArguments(self): default=None, dest='eop' ) + self.add_argument('--batch-not-complete', action='store', type=str, + help=u'custom batch-not-complete marker\n' + 'DEFAULT: \'\'', + default=None, + dest='bnc' + ) def _is_flag_option(self, **kwargs): """ Check if added argument is a flag option. """ @@ -224,6 +230,9 @@ def parse_args(self, args): "Case: %s" % (err), logLevel.ERROR) sys.exit(1) + if self.ARGS.bnc is None: + self.ARGS.bnc = '' + if self.ARGS.mode == 'm': if 'f' in (self.ARGS.source, self.ARGS.dest): self.log("File source/destination is not allowed " diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 688721b96..32446a783 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -363,6 +363,10 @@ def forward(self): """ Send EOPMarker to the output stream. """ self.__output.eop() + def bnc(self): + """ Send BNCMarker to the output stream. """ + self.__output.bnc() + def flush_buffer(self): """ Flush message buffer to the output. """ self.__output.flush() From 43af39f77f82eb6244ab91178d245948e9f2ffa7 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 14 Aug 2020 17:04:56 +0300 Subject: [PATCH 02/16] Add an option to set batch size for a stage. --- .../Dataflow/pyDKB/dataflow/stage/ProcessorStage.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 32446a783..9b415cf1e 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -67,6 +67,9 @@ class ProcessorStage(AbstractStage): * List of objects to be "stopped" __stoppable + + * The stage will try to process messages in batches of this size. + _batch_size """ __input_message_type = None @@ -92,6 +95,7 @@ def __init__(self, description="DKB Dataflow data processing stage."): * ... """ self.__stoppable = [] + self._batch_size = 1 super(ProcessorStage, self).__init__(description) def set_input_message_type(self, Type=None): @@ -202,6 +206,14 @@ def set_default_arguments(self, ignore_on_skip=False, **kwargs): if ignore_on_skip: self._reset_on_skip += kwargs.keys() + def set_batch_size(self, size): + """ Set batch size. + + :param size: size + :type size: int + """ + self._batch_size = size + def configure(self, args=None): """ Configure stage according to the config parameters. From dc2b5c57093264c7ae51423a5cc064571ea127e0 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Sat, 15 Aug 2020 19:18:06 +0300 Subject: [PATCH 03/16] Add input markers handling. Add the first input marker: End-Of-Batch. --- .../Dataflow/pyDKB/common/custom_readline.py | 41 ++++++++++++++++++- .../communication/consumer/Consumer.py | 3 +- .../communication/stream/InputStream.py | 32 ++++++++++++--- .../pyDKB/dataflow/stage/AbstractStage.py | 9 ++++ .../pyDKB/dataflow/stage/ProcessorStage.py | 3 ++ 5 files changed, 79 insertions(+), 9 deletions(-) diff --git a/Utils/Dataflow/pyDKB/common/custom_readline.py b/Utils/Dataflow/pyDKB/common/custom_readline.py index 7443fabb5..1a4737ca6 100644 --- a/Utils/Dataflow/pyDKB/common/custom_readline.py +++ b/Utils/Dataflow/pyDKB/common/custom_readline.py @@ -10,12 +10,20 @@ import fcntl -def custom_readline(f, newline): +def custom_readline(f, newline, markers={}): """ Read lines with custom line separator. Construct generator with readline-like functionality: with every call of ``next()`` method it will read data from ``f`` - untill the ``newline`` separator is found; then yields what was read. + until the ``newline`` separator is found; then yields what was read. + If ``markers`` are supplied, then check for their presence: markers + are special strings that can occur: + - At the beginning of ``f``. + - Immediately after a ``newline``. + - Immediately after another marker. + If a marker's value is found in such place, its name is yielded instead + of another chunk of text and the value is removed. Markers in other + places are ignored. .. warning:: the last line can be incomplete, if the input data flow is interrupted in the middle of data writing. @@ -28,6 +36,8 @@ def custom_readline(f, newline): :type f: file :param newline: delimeter to be used instead of ``\\n`` :type newline: str + :param markers: markers to look for, {name:value} + :type markers: dict :return: iterable object :rtype: generator @@ -52,6 +62,19 @@ def custom_readline(f, newline): chunk = f.read() if not chunk: if buf: + if markers: + # Look for markers after last newline. + look_for_markers = True + while look_for_markers: + look_for_markers = False + for name, value in markers.iteritems(): + if buf.startswith(value): + buf = buf[len(value):] + look_for_markers = True + while send_not_next: + send_not_next = yield True + send_not_next = yield name + break while send_not_next: # If we are here, the source is not empty for sure: # we have another message to yield @@ -64,6 +87,20 @@ def custom_readline(f, newline): # and (in theory) may provide another message sooner or later send_not_next = yield True while newline in buf: + if markers: + # Look for markers before each yielded "line". + # This includes start of f. + look_for_markers = True + while look_for_markers: + look_for_markers = False + for name, value in markers.iteritems(): + if buf.startswith(value): + buf = buf[len(value):] + look_for_markers = True + while send_not_next: + send_not_next = yield True + send_not_next = yield name + break pos = buf.index(newline) + len(newline) while send_not_next: # If we are here, the source is not empty for sure: diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py index 20ef4bdd7..cfa595e5d 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/consumer/Consumer.py @@ -100,10 +100,11 @@ def get_source_info(self): raise NotImplementedError def get_message(self): - """ Get new message from current source. + """ Get new message or marker from current source. Return values: Message object + marker (str) False (failed to parse message) None (all input sources are empty) """ diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index 086ff678f..50db8bb5e 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -16,6 +16,19 @@ class InputStream(Stream): __iterator = None + # Names of markers that the stream knows how to process. + # Values are taken from config, markers with empty values are ignored. + marker_names = ['eob'] + + def configure(self, config={}): + """ Configure instance. """ + super(InputStream, self).configure(config) + self.markers = {} + for name in self.marker_names: + value = config.get(name) + if value: + self.markers[name] = value + def __iter__(self): """ Initialize iteration. """ self._reset_iterator() @@ -24,14 +37,14 @@ def __iter__(self): def _reset_iterator(self): """ Reset inner iterator on a new file descriptor. """ fd = self.get_fd() - if self.EOM == '\n': + if self.EOM == '\n' and not self.markers: self.__iterator = iter(fd.readline, "") self.is_readable = self._fd_is_readable - elif self.EOM == '': + elif self.EOM == '' and not self.markers: self.__iterator = iter(fd.read, "") self.is_readable = self._fd_is_readable else: - self.__iterator = custom_readline(fd, self.EOM) + self.__iterator = custom_readline(fd, self.EOM, self.markers) self.is_readable = self._gi_is_readable def reset(self, fd, close=True, force=False): @@ -139,9 +152,10 @@ def parse_message(self, message): return False def get_message(self): - """ Get next message from the input stream. + """ Get next message or marker from the input stream. :returns: parsed next message, + next marker, False -- parsing failed, None -- no messages left :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, @@ -154,16 +168,22 @@ def get_message(self): return result def next(self): - """ Get next message from the input stream. + """ Get next message or marker from the input stream. :returns: parsed next message, + next marker, False -- parsing failed or unexpected end of stream occurred - :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, bool + :rtype: pyDKB.dataflow.communication.messages.AbstractMessage, + str, bool """ if not self.__iterator: self._reset_iterator() msg = self.__iterator.next() if not msg.endswith(self.EOM): + # Check whether an expected marker was received. + for key in self.markers: + if msg == key: + return key log_msg = msg[:10] + '<...>' * (len(msg) > 20) log_msg += msg[-min(len(msg) - 10, 10):] log_msg = log_msg.replace('\n', r'\n') diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 6bac20aaf..3574b8180 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -138,6 +138,12 @@ def defaultArguments(self): default=None, dest='bnc' ) + self.add_argument('--end-of-batch', action='store', type=str, + help=u'custom end-of-batch marker\n' + 'DEFAULT: \'\'', + default=None, + dest='eob' + ) def _is_flag_option(self, **kwargs): """ Check if added argument is a flag option. """ @@ -233,6 +239,9 @@ def parse_args(self, args): if self.ARGS.bnc is None: self.ARGS.bnc = '' + if self.ARGS.eob is None: + self.ARGS.eob = '' + if self.ARGS.mode == 'm': if 'f' in (self.ARGS.source, self.ARGS.dest): self.log("File source/destination is not allowed " diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 9b415cf1e..117b5be7c 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -264,6 +264,9 @@ def run(self): err = None try: for msg in self.input(): + if type(msg) == str: + # Normal processing mode expects no markers. + continue if msg and process(self, msg): self.flush_buffer() else: From 129e5a0cdf3f51c2d43225f034d5671a41a6ec94 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Thu, 20 Aug 2020 20:07:03 +0300 Subject: [PATCH 04/16] Set BNC to be non-empty by default. Default arguments are supposed to make things work as expected - while using empty BNC would do so in nearby future, when there would be batch processing but no supervisor, it would not be so when the latter would be present as well. Therefore, it is set to be non-empty by default: - Stages without batch processing won't be affected because they won't be sending BNC. - Stages with batch processing will set it to be empty in their default values. --- Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 3574b8180..70f9fd116 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -134,7 +134,7 @@ def defaultArguments(self): ) self.add_argument('--batch-not-complete', action='store', type=str, help=u'custom batch-not-complete marker\n' - 'DEFAULT: \'\'', + 'DEFAULT: \'BNC\'', default=None, dest='bnc' ) @@ -237,7 +237,7 @@ def parse_args(self, args): sys.exit(1) if self.ARGS.bnc is None: - self.ARGS.bnc = '' + self.ARGS.bnc = 'BNC' if self.ARGS.eob is None: self.ARGS.eob = '' From ee4f3eb104a43c2aff6245a9a043d3b055d1e450 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 21 Aug 2020 16:49:30 +0300 Subject: [PATCH 05/16] Stop processing on receiving unexpected marker. --- Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 117b5be7c..bb055385e 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -266,7 +266,8 @@ def run(self): for msg in self.input(): if type(msg) == str: # Normal processing mode expects no markers. - continue + raise DataflowException("Unexpected marker" + " received: %s." % msg) if msg and process(self, msg): self.flush_buffer() else: From 1e66a77aa63d2299492097dd9a38b22b8b3817cb Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 21 Aug 2020 18:00:28 +0300 Subject: [PATCH 06/16] Add checks for batch size type and value. --- Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index bb055385e..99ac56e78 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -212,6 +212,15 @@ def set_batch_size(self, size): :param size: size :type size: int """ + if type(size) != int: + self.log("Cannot set batch size to %s: non-integer value." % size, + logLevel.WARN) + return False + if size < 1: + self.log("Cannot set batch size to %d: value must" + " be positive." % size, + logLevel.WARN) + return False self._batch_size = size def configure(self, args=None): From e9a959e81b0facf6978973d786aaab21dc7f79bb Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 21 Aug 2020 18:16:11 +0300 Subject: [PATCH 07/16] Move marker handling to input. The idea is to keep run() simple and independent from batch/not batch/skip/whatever mode. --- Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index 99ac56e78..f17f29093 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -273,10 +273,6 @@ def run(self): err = None try: for msg in self.input(): - if type(msg) == str: - # Normal processing mode expects no markers. - raise DataflowException("Unexpected marker" - " received: %s." % msg) if msg and process(self, msg): self.flush_buffer() else: @@ -378,6 +374,10 @@ def input(self): Every iteration returns single input message to be processed. """ for r in self.__input: + if type(r) == str: + # Normal processing mode expects no markers. + raise DataflowException("Unexpected marker" + " received: %s." % r) yield r def output(self, message): From 50c79c7fe25decab3ab82bf98bc0d48aeac580fd Mon Sep 17 00:00:00 2001 From: Evildoor Date: Fri, 21 Aug 2020 19:39:02 +0300 Subject: [PATCH 08/16] Add batch processing. --- .../pyDKB/dataflow/stage/ProcessorStage.py | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py index f17f29093..e4dc398ab 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/ProcessorStage.py @@ -373,12 +373,36 @@ def input(self): Returns iterable object. Every iteration returns single input message to be processed. """ - for r in self.__input: - if type(r) == str: - # Normal processing mode expects no markers. - raise DataflowException("Unexpected marker" - " received: %s." % r) - yield r + if self._batch_size == 1 or self.ARGS.skip_process: + for r in self.__input: + if type(r) == str: + # Normal processing mode expects no markers. + raise DataflowException("Unexpected marker" + " received: %s." % r) + yield r + else: + batch = [] + for r in self.__input: + if type(r) != str: + # Message was received. + if r: + batch.append(r) + if len(batch) == self._batch_size: + yield batch + batch = [] + else: + self.bnc() + else: + # Marker was received. + if r == 'eob': + yield batch + batch = [] + else: + raise DataflowException("Unexpected marker" + " received: %s." % r) + if batch: + # There is no more input, but there is an unfinished batch. + yield batch def output(self, message): """ Put the (list of) message(s) to the output buffer. """ From f9bfcdd51921e1121cf8d9421077655f2030c33c Mon Sep 17 00:00:00 2001 From: Evildoor Date: Tue, 26 May 2020 20:00:52 +0300 Subject: [PATCH 09/16] Add a simple stage for batch proc development. --- Utils/Dataflow/data4es/batch_stage/stage.py | 76 +++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100755 Utils/Dataflow/data4es/batch_stage/stage.py diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py new file mode 100755 index 000000000..4be27f104 --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -0,0 +1,76 @@ +#!/bin/env python +""" +DKB Dataflow stage XXX (StageName). + +Stage short description + +Authors: + Author Name (author@cern.ch) +""" + +import os +import sys +import traceback + +try: + base_dir = os.path.dirname(__file__) + dkb_dir = os.path.join(base_dir, os.pardir) + sys.path.append(dkb_dir) + import pyDKB + from pyDKB.dataflow.stage import ProcessorStage + from pyDKB.dataflow.communication.messages import JSONMessage + from pyDKB.dataflow.exceptions import DataflowException + from pyDKB.dataflow import messageType + from pyDKB.common.types import logLevel +except Exception, err: + sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err) + sys.exit(1) + + +def process(stage, message): + """ Single message processing. """ + data = message.content() + # Processing machinery + if 'df' in data and isinstance(data['df'], (str, unicode)): + data['df'] = 'processed ' + data['df'] + else: + stage.log("Failed to process data %s, required field 'df' not found" + " or contains non-str value." % data, logLevel.WARN) + out_message = JSONMessage(data) + stage.output(out_message) + return True + + +def main(args): + """ Program body. """ + stage = ProcessorStage() + stage.set_input_message_type(messageType.JSON) + stage.set_output_message_type(messageType.JSON) + + stage.process = process + + exit_code = 0 + exc_info = None + try: + stage.configure(args) + stage.run() + except (DataflowException, RuntimeError), err: + if str(err): + sys.stderr.write("(ERROR) %s\n" % err) + exit_code = 2 + except Exception: + exc_info = sys.exc_info() + exit_code = 3 + finally: + stage.stop() + + if exc_info: + trace = traceback.format_exception(*exc_info) + for line in trace: + sys.stderr.write("(ERROR) %s" % line) + + exit(exit_code) + + +if __name__ == '__main__': + main(sys.argv[1:]) From 2f38b2b02aff1c6acf4c6765b964e888618a3c92 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Wed, 19 Aug 2020 14:33:01 +0300 Subject: [PATCH 10/16] Add message type check. --- Utils/Dataflow/data4es/batch_stage/stage.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py index 4be27f104..2d8221b06 100755 --- a/Utils/Dataflow/data4es/batch_stage/stage.py +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -30,6 +30,9 @@ def process(stage, message): """ Single message processing. """ data = message.content() + if not isinstance(data, dict): + stage.log("Cannot process non-dict data: %s." % data, logLevel.WARN) + return False # Processing machinery if 'df' in data and isinstance(data['df'], (str, unicode)): data['df'] = 'processed ' + data['df'] From 7ba3aa57987a61a6a6e9570d49aea59fbb7c7c1d Mon Sep 17 00:00:00 2001 From: Evildoor Date: Wed, 19 Aug 2020 14:52:00 +0300 Subject: [PATCH 11/16] Test stage: make process() support batches. --- Utils/Dataflow/data4es/batch_stage/stage.py | 40 +++++++++++++-------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py index 2d8221b06..210c0157c 100755 --- a/Utils/Dataflow/data4es/batch_stage/stage.py +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -27,20 +27,32 @@ sys.exit(1) -def process(stage, message): - """ Single message processing. """ - data = message.content() - if not isinstance(data, dict): - stage.log("Cannot process non-dict data: %s." % data, logLevel.WARN) - return False - # Processing machinery - if 'df' in data and isinstance(data['df'], (str, unicode)): - data['df'] = 'processed ' + data['df'] - else: - stage.log("Failed to process data %s, required field 'df' not found" - " or contains non-str value." % data, logLevel.WARN) - out_message = JSONMessage(data) - stage.output(out_message) +def process(stage, messages): + """ Single or batch message processing. + + This form of batch processing is pretty pointless in terms of efficiency: + using it will replace, for example, ProcessorStage cycling over 100 + messages with it cycling over 10 batches, and this stage cycling + over 10 messages in each batch. But for testing and illustrative purposes + it will do. + """ + if not isinstance(messages, list): + messages = [messages] + for message in messages: + data = message.content() + if not isinstance(data, dict): + stage.log("Cannot process non-dict data: %s." % data, + logLevel.WARN) + continue + # Processing machinery + if 'df' in data and isinstance(data['df'], (str, unicode)): + data['df'] = 'processed ' + data['df'] + else: + stage.log("Failed to process data %s, required field 'df' not" + " found or contains non-str value." % data, + logLevel.WARN) + out_message = JSONMessage(data) + stage.output(out_message) return True From 4f424e55a1f780bb230e0498cbb15574bed40073 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Wed, 26 Aug 2020 11:02:57 +0300 Subject: [PATCH 12/16] Set BNC to empty string. --- Utils/Dataflow/data4es/batch_stage/stage.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py index 210c0157c..96157aa34 100755 --- a/Utils/Dataflow/data4es/batch_stage/stage.py +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -61,6 +61,7 @@ def main(args): stage = ProcessorStage() stage.set_input_message_type(messageType.JSON) stage.set_output_message_type(messageType.JSON) + stage.set_default_arguments(bnc='') stage.process = process From ffaf0645dd0057e747057c26eccfe0284206b576 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Tue, 25 Aug 2020 18:55:01 +0300 Subject: [PATCH 13/16] Add a script comparing normal and batch modes. --- Utils/Dataflow/data4es/batch_stage/inp | 10 ++++++++++ Utils/Dataflow/data4es/batch_stage/run.sh | 15 +++++++++++++++ Utils/Dataflow/data4es/batch_stage/stage.py | 10 ++++++++++ 3 files changed, 35 insertions(+) create mode 100644 Utils/Dataflow/data4es/batch_stage/inp create mode 100755 Utils/Dataflow/data4es/batch_stage/run.sh diff --git a/Utils/Dataflow/data4es/batch_stage/inp b/Utils/Dataflow/data4es/batch_stage/inp new file mode 100644 index 000000000..c67599f38 --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/inp @@ -0,0 +1,10 @@ +{} +{"!@#$": true} +{"df": 1} +{"df": "abc"} +{"df": "abc1"} +{"df": "absdfg"} +{"df": "Some text here."} +{"df": "abc", "fd": true} +{"d": "abc"} +{"df": "abc"} diff --git a/Utils/Dataflow/data4es/batch_stage/run.sh b/Utils/Dataflow/data4es/batch_stage/run.sh new file mode 100755 index 000000000..f5959c4c8 --- /dev/null +++ b/Utils/Dataflow/data4es/batch_stage/run.sh @@ -0,0 +1,15 @@ +source ../../shell_lib/eop_filter + +cmd="./stage.py -m s" +cmd_batch2="./stage.py -b 2 -m s" +cmd_batch100="./stage.py -b 100 -m s" + +# Various tests that should produce the same results. + +# Stage chains. +# these differ by size without eop_filter at the end +cat inp | $cmd | eop_filter | $cmd | eop_filter > outp1 +cat inp | $cmd_batch2 | eop_filter | $cmd_batch2 | eop_filter > outp2 +cat inp | $cmd_batch100 | eop_filter | $cmd_batch100 | eop_filter > outp100 +cat inp | $cmd | eop_filter | $cmd_batch2 | eop_filter > outp12 +cat inp | $cmd_batch2 | eop_filter | $cmd | eop_filter > outp21 diff --git a/Utils/Dataflow/data4es/batch_stage/stage.py b/Utils/Dataflow/data4es/batch_stage/stage.py index 96157aa34..45aca1489 100755 --- a/Utils/Dataflow/data4es/batch_stage/stage.py +++ b/Utils/Dataflow/data4es/batch_stage/stage.py @@ -63,12 +63,22 @@ def main(args): stage.set_output_message_type(messageType.JSON) stage.set_default_arguments(bnc='') + # Accept batch size from command line. + # This is cheating because batch size is supposed to be set by + # stage developer, not received from command line (so, + # from supervisor). However, this is done in this illustrative + # stage to simplify a process of comparing the results of + # normal mode and batch mode with different batch sizes. + stage.add_argument('-b', action='store', type=int, help='Batch size.', + default=1, dest='bsize') + stage.process = process exit_code = 0 exc_info = None try: stage.configure(args) + stage.set_batch_size(stage.ARGS.bsize) stage.run() except (DataflowException, RuntimeError), err: if str(err): From b252d74f8d9732e4f2a1b7d9bb0bdc33da15d70d Mon Sep 17 00:00:00 2001 From: Evildoor Date: Wed, 26 Aug 2020 13:45:40 +0300 Subject: [PATCH 14/16] Don't prepare for input markers in normal mode. --- .../dataflow/communication/stream/InputStream.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py index 50db8bb5e..efbed0257 100644 --- a/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py +++ b/Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py @@ -24,10 +24,14 @@ def configure(self, config={}): """ Configure instance. """ super(InputStream, self).configure(config) self.markers = {} - for name in self.marker_names: - value = config.get(name) - if value: - self.markers[name] = value + # If batch size is 1, meaning non-batch mode will be used, then + # markers are unnecessary (and even can be a hindrance by forcing + # usage of custom_readline() without need). + if config.get('bsize', 1) > 1: + for name in self.marker_names: + value = config.get(name) + if value: + self.markers[name] = value def __iter__(self): """ Initialize iteration. """ From 130d60015d89ba5ecad4c6f5a6ae39e22c864834 Mon Sep 17 00:00:00 2001 From: Evildoor Date: Wed, 26 Aug 2020 13:49:51 +0300 Subject: [PATCH 15/16] Set EOB to be non-empty by default. Default arguments are supposed to make things work as expected - while EOB's value does not really matter now, without a supervisor, it would matter with it. Existing stages are not affected because all markers (which currently consist only of EOB) are ignored by InputStream when batch mode is not used. --- Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py index 70f9fd116..a5b088a91 100644 --- a/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py +++ b/Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py @@ -140,7 +140,7 @@ def defaultArguments(self): ) self.add_argument('--end-of-batch', action='store', type=str, help=u'custom end-of-batch marker\n' - 'DEFAULT: \'\'', + 'DEFAULT: \'EOB\'', default=None, dest='eob' ) @@ -240,7 +240,7 @@ def parse_args(self, args): self.ARGS.bnc = 'BNC' if self.ARGS.eob is None: - self.ARGS.eob = '' + self.ARGS.eob = 'EOB' if self.ARGS.mode == 'm': if 'f' in (self.ARGS.source, self.ARGS.dest): From 7ba11b66b5b527c5b977ad8e7203eb302b068d4f Mon Sep 17 00:00:00 2001 From: Evildoor Date: Tue, 1 Sep 2020 20:07:40 +0300 Subject: [PATCH 16/16] Set EOP to "" to remove need for eop_filter. --- Utils/Dataflow/data4es/batch_stage/run.sh | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/Utils/Dataflow/data4es/batch_stage/run.sh b/Utils/Dataflow/data4es/batch_stage/run.sh index f5959c4c8..cb41340ff 100755 --- a/Utils/Dataflow/data4es/batch_stage/run.sh +++ b/Utils/Dataflow/data4es/batch_stage/run.sh @@ -1,15 +1,15 @@ -source ../../shell_lib/eop_filter - -cmd="./stage.py -m s" -cmd_batch2="./stage.py -b 2 -m s" -cmd_batch100="./stage.py -b 100 -m s" +# $1 has to be used as a workaround to pass empty string as a value for +# -E, since "... -E ''" will treat the single quotes literally due to double +# quotes around them. +cmd="./stage.py -m s -E $1" +cmd_batch2="./stage.py -b 2 -m s -E $1" +cmd_batch100="./stage.py -b 100 -m s -E $1" # Various tests that should produce the same results. # Stage chains. -# these differ by size without eop_filter at the end -cat inp | $cmd | eop_filter | $cmd | eop_filter > outp1 -cat inp | $cmd_batch2 | eop_filter | $cmd_batch2 | eop_filter > outp2 -cat inp | $cmd_batch100 | eop_filter | $cmd_batch100 | eop_filter > outp100 -cat inp | $cmd | eop_filter | $cmd_batch2 | eop_filter > outp12 -cat inp | $cmd_batch2 | eop_filter | $cmd | eop_filter > outp21 +cat inp | $cmd "" | $cmd "" > outp1 +cat inp | $cmd_batch2 "" | $cmd_batch2 "" > outp2 +cat inp | $cmd_batch100 "" | $cmd_batch100 "" > outp100 +cat inp | $cmd "" | $cmd_batch2 "" > outp12 +cat inp | $cmd_batch2 "" | $cmd "" > outp21