Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Worker-driven batch processing #399

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Utils/Dataflow/data4es/batch_stage/inp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{}
{"!@#$": true}
{"df": 1}
{"df": "abc"}
{"df": "abc1"}
{"df": "absdfg"}
{"df": "Some text here."}
{"df": "abc", "fd": true}
{"d": "abc"}
{"df": "abc"}
15 changes: 15 additions & 0 deletions Utils/Dataflow/data4es/batch_stage/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# $1 has to be used as a workaround to pass empty string as a value for
# -E, since "... -E ''" will treat the single quotes literally due to double
# quotes around them.
cmd="./stage.py -m s -E $1"
cmd_batch2="./stage.py -b 2 -m s -E $1"
cmd_batch100="./stage.py -b 100 -m s -E $1"

# Various tests that should produce the same results.

# Stage chains.
cat inp | $cmd "" | $cmd "" > outp1
cat inp | $cmd_batch2 "" | $cmd_batch2 "" > outp2
cat inp | $cmd_batch100 "" | $cmd_batch100 "" > outp100
cat inp | $cmd "" | $cmd_batch2 "" > outp12
cat inp | $cmd_batch2 "" | $cmd "" > outp21
102 changes: 102 additions & 0 deletions Utils/Dataflow/data4es/batch_stage/stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/bin/env python
"""
DKB Dataflow stage XXX (StageName).

Stage short description

Authors:
Author Name ([email protected])
"""

import os
import sys
import traceback

try:
base_dir = os.path.dirname(__file__)
dkb_dir = os.path.join(base_dir, os.pardir)
sys.path.append(dkb_dir)
import pyDKB
from pyDKB.dataflow.stage import ProcessorStage
from pyDKB.dataflow.communication.messages import JSONMessage
from pyDKB.dataflow.exceptions import DataflowException
from pyDKB.dataflow import messageType
from pyDKB.common.types import logLevel
except Exception, err:
sys.stderr.write("(ERROR) Failed to import pyDKB library: %s\n" % err)
sys.exit(1)


def process(stage, messages):
""" Single or batch message processing.

This form of batch processing is pretty pointless in terms of efficiency:
using it will replace, for example, ProcessorStage cycling over 100
messages with it cycling over 10 batches, and this stage cycling
over 10 messages in each batch. But for testing and illustrative purposes
it will do.
"""
if not isinstance(messages, list):
messages = [messages]
for message in messages:
data = message.content()
if not isinstance(data, dict):
stage.log("Cannot process non-dict data: %s." % data,
logLevel.WARN)
continue
# Processing machinery
if 'df' in data and isinstance(data['df'], (str, unicode)):
data['df'] = 'processed ' + data['df']
else:
stage.log("Failed to process data %s, required field 'df' not"
" found or contains non-str value." % data,
logLevel.WARN)
out_message = JSONMessage(data)
stage.output(out_message)
return True


def main(args):
""" Program body. """
stage = ProcessorStage()
stage.set_input_message_type(messageType.JSON)
stage.set_output_message_type(messageType.JSON)
stage.set_default_arguments(bnc='')

# Accept batch size from command line.
# This is cheating because batch size is supposed to be set by
# stage developer, not received from command line (so,
# from supervisor). However, this is done in this illustrative
# stage to simplify a process of comparing the results of
# normal mode and batch mode with different batch sizes.
stage.add_argument('-b', action='store', type=int, help='Batch size.',
default=1, dest='bsize')

stage.process = process

exit_code = 0
exc_info = None
try:
stage.configure(args)
stage.set_batch_size(stage.ARGS.bsize)
stage.run()
except (DataflowException, RuntimeError), err:
if str(err):
sys.stderr.write("(ERROR) %s\n" % err)
exit_code = 2
except Exception:
exc_info = sys.exc_info()
exit_code = 3
finally:
stage.stop()

if exc_info:
trace = traceback.format_exception(*exc_info)
for line in trace:
sys.stderr.write("(ERROR) %s" % line)

exit(exit_code)


if __name__ == '__main__':
main(sys.argv[1:])
41 changes: 39 additions & 2 deletions Utils/Dataflow/pyDKB/common/custom_readline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@
import fcntl


def custom_readline(f, newline):
def custom_readline(f, newline, markers={}):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General comment to the whole set of changes in this function.


Expecting markers only at specific place in the flow is not very safe.

If the input stream was corrupted and part of it is missed, it may look like this:

<data:proper_message><EOM><data:broken_message><EOB>

Here's some illustration of how it looks like:

~$ cat test_readline.py
#!/usr/bin/env python

import sys

from pyDKB.common import custom_readline

EOM='\n'
EOB='\0'

for d in custom_readline(sys.stdin, EOM, {'eob': EOB}):
    print 'd: %r' % d
~$ data='{"proper": "message"}\n{"broken_message":\0'
~$ echo -ne $data
{"proper": "message"}
{"broken_message":~$
~$ echo -en $data | hexdump
0000000 227b 7270 706f 7265 3a22 2220 656d 7373 
0000010 6761 2265 0a7d 227b 7262 6b6f 6e65 6d5f
0000020 7365 6173 6567 3a22 0000
0000029
~$ echo -en $data | ./test_readline.py
d: '{"proper": "message"}\n'
d: '{"broken_message":\x00'
~$ { echo -en $data; sleep 10; echo -en $data; } | ./test_readline.py
d: '{"proper": "message"}\n'
d: '{"broken_message":\x00{"proper": "message"}\n'
d: '{"broken_message":\x00'

What's going on here:

  • if the stream was closed after misplaced <EOB>, custom_readline() will yield a piece of broken data with <EOB>. No one will know it was <EOB> -- but it's fine; data are unreadable anyway, stream is closed -- so whatever was read before, it will be processed;
  • if the stream wasn't closed, custom_readline() won't return anything until it sees <EOM>. But when it does -- the broken message, <EOB> and the next proper message will stick together and will be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored. And what's even worse -- until there's<EOM> in the input, custom_readline() will not return anything at all -- and the whole batch of already read messages will sit in memory and wait, not being processed till the next <EOB> or the end of input.

What I'd expect in this situation:

  • <EOB> is detected;
  • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
  • <EOB> is processed in a regular way.

Returning "name of marker" instead of the marker itself may be ambiguous.

Whenever the function is called from, the context is aware of marker values and can compare returned value to them. On the other hand, sequence of eob<EOF> (with proper EOF == "STDIN is closed"), that originally would be processed as "unexpected end of stream" by the InputStream, now will be taken as <EOB> at the end of stream:

~$ data1='eob'
~$ data2='\0'
~$ echo -ne $data1 | ./test_readline.py
d: 'eob'
~$ echo -ne $data2 | ./test_readline.py
d: 'eob'
d: ''

(BTW, the extra empty line is an unexpected result)

Maybe it's not a big problem -- <EOB> at the end of stream won't make worker to do something unexpected -- but it is just an example of possibility of incorrect interpretation of a plain string 'eob' as a control marker.

Copy link
Contributor Author

@Evildoor Evildoor Sep 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General comment to the whole set of changes in this function.

Expecting markers only at specific place in the flow is not very safe.

If the input stream was corrupted and part of it is missed, it may look like this:

<data:proper_message><EOM><data:broken_message><EOB>

Here's some illustration of how it looks like:

...

What's going on here:

  • if the stream was closed after misplaced <EOB>, custom_readline() will yield a piece of broken data with <EOB>. No one will know it was <EOB> -- but it's fine; data are unreadable anyway, stream is closed -- so whatever was read before, it will be processed;
  • if the stream wasn't closed, custom_readline() won't return anything until it sees <EOM>. But when it does -- the broken message, <EOB> and the next proper message will stick together and will be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored.

This is true, but the problem here is missed <EOM>, not <EOB>. Even if we work in master that knows nothing about other markers and batch processing, a message being corrupted in a way that erases <EOM> will make it "stick" to the next message, and that next message would be ignored, even though it is correct.

And what's even worse -- until there's<EOM> in the input, custom_readline() will not return anything at all -- and the whole batch of already read messages will sit in memory and wait, not being processed till the next <EOB> or the end of input.

I presume that there is a typo and the second marker is supposed to be <EOM> as well, because I don't see any way in which missed <EOB> can cause custom_readline to hang until next <EOB> is encountered. In such case, it's the continuation of the previous text and my answer above still applies. Otherwise - please, clarify this moment.

What I'd expect in this situation:

  • <EOB> is detected;
  • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
  • <EOB> is processed in a regular way.

This solves a problem of missing a marker due to it being caught between corrupted messages, but creates another one - what if a sequence of characters inside a (normal or corrupted) message matches some marker's value by accident?


All of this is a part of a bigger question "how to deal with errors in communication between worker and supervisor" - or, to be more precise, communication of markers (and losing a marker can be much more dangerous than losing a message). This problem is not unique to batch mode. For example, if supervisor sends a message but <EOM> gets lost/corrupted then we have a deadlock: supervisor expects <EOP> and worker expects <EOM> [1]. Which means that supervisor should have means of breaking such deadlocks, by checking worker's state or something else.

All things considered, I'm not sure that looking for markers inside messages is a solution and that this question fits into this PR. By the way, code in this PR should actually deal with missed <EOB> correctly (given that supervisor continues to operate correctly after sending mangled <EOB>):

Normal work:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages: <EOB>
  • Worker: processes batch: messages + EOP
  • End

Error:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages but error occurs: something which may or may not include <EOB> or its pieces somewhere
  • Worker: I got something. It's not a known marker, so it's a message. It's a faulty message, so I discard it. My batch is still too small to process: <BNC>
  • Supervisor: has no more messages: <EOB>
  • Worker: processes batch: messages + EOP
  • End

[1] This example can be simulated by starting a stage in terminal mode with standard <EOM>=\n and forgetting to press ENTER after a message. One can look at the screen and realize their mistake, but hey, that's some advanced supervisor technique...

Returning "name of marker" instead of the marker itself may be ambiguous.

Whenever the function is called from, the context is aware of marker values and can compare returned value to them. On the other hand, sequence of eob<EOF> (with proper EOF == "STDIN is closed"), that originally would be processed as "unexpected end of stream" by the InputStream, now will be taken as <EOB> at the end of stream:

~$ data1='eob'
~$ data2='\0'
~$ echo -ne $data1 | ./test_readline.py
d: 'eob'
~$ echo -ne $data2 | ./test_readline.py
d: 'eob'
d: ''

(BTW, the extra empty line is an unexpected result)

Maybe it's not a big problem -- <EOB> at the end of stream won't make worker to do something unexpected -- but it is just an example of possibility of incorrect interpretation of a plain string 'eob' as a control marker.

Fair point, will do.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Evildoor

Expecting markers only at specific place in the flow is not very safe.
If the input stream was corrupted and part of it is missed, it may look like this:

<data:proper_message><EOM><data:broken_message><EOB>
  • if the stream wasn't closed, custom_readline() won't return anything until it sees <EOM>. But when it does -- the broken message, <EOB> and the next proper message will stick together and will be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored.

This is true, but the problem here is missed <EOM>, not <EOB>.

The problem is not in "missed <EOM>" -- some issues in the data transfer may happen, and it is not what we can control with the protocol specification and encoder/decoder implementation. My point is that in the current implementation of parser (how the presence of <EOB> is checked) makes this kind of issue more harmful than it could be.

Even if we work in master that knows nothing about other markers and batch processing, a message being corrupted in a way that erases <EOM> will make it "stick" to the next message, and that next message would be ignored, even though it is correct.

Right, the correct message following the broken one will be thrown away. But in this case:

  • the flow looks this way: <data_broken+data_correct><EOM>: there's no way to separate broken data from correct (while with <EOB> between them it is possible);
  • the stage does process all the messages prior to the broken one and stuck waiting for the next <EOM> with only the broken one in memory, not the whole batch; in other words -- everything that can be processed, is processed without delay.

And what's even worse -- until there's<EOM> in the input, custom_readline() will not return anything at all -- and the whole batch of already read messages will sit in memory and wait, not being processed till the next <EOB> or the end of input.

I presume that there is a typo and the second marker is supposed to be <EOM> as well,

No, it's <EOB>.

because I don't see any way in which missed <EOB> can cause custom_readline to hang until next <EOB> is encountered.

Not the custom_readline() in particular, but the stage itself; the buffer of messages, for now, is a stage attribute, not the custom_readline()'s.

In such case, it's the continuation of the previous text and my answer above still applies. Otherwise - please, clarify this moment.

Here you are: the stage operating in the batch mode (even in worker-driven scenario) will not process anything until it:

  • gets enough messages in buffer;
  • gets <EOB>;
  • gets input stream closed.

If supervisor sent <EOB>, keeping the stream open but having no more messages for processing for the next hour, the worker will sit and wait for the whole hour instead of processing the messages it already has.
Just in case, reminder: we're talking about situation with the broken message (without <EOM>) prior to the <EOB>, not the general situation.

What I'd expect in this situation:

  • <EOB> is detected;
  • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
  • <EOB> is processed in a regular way.

This solves a problem of missing a marker due to it being caught between corrupted messages, but creates another one - what if a sequence of characters inside a (normal or corrupted) message matches some marker's value by accident?

Ahha, why do you think <EOM> is a NULL ASCII character, not plain "eom"? ;)
It's a well known issue: everything that has special meaning must be properly treated in the data (just as registered keyword "table" in SQL language, for example).
The simpliest way to get rid of this is to encode data with base64 and make sure none of these 64 ASCII symbols are used as special ones.

All of this is a part of a bigger question "how to deal with errors in communication between worker and supervisor" - or, to be more precise, communication of markers (and losing a marker can be much more dangerous than losing a message). This problem is not unique to batch mode. For example, if supervisor sends a message but <EOM> gets lost/corrupted then we have a deadlock: supervisor expects <EOP> and worker expects <EOM> [1]. Which means that supervisor should have means of breaking such deadlocks, by checking worker's state or something else.

Right, this should be improved. Feel free to write it down somewhere (Google docs with the protocol description, Trello cards, your own notepad), add your ideas on how it can be solved to make sure you won't forget when decide to take care of it.

But here and now your main objective is not to detect and solve existing issues; it is to introduce new functionality without inviting new ones where it can be helped. Existing issue is not an excuse to add new ones saying "why bother now, if it's not working anyway; one day someone will take care of this, so let him/her take care of all at once".

All things considered, I'm not sure that looking for markers inside messages is a solution and that this question fits into this PR.

I do insist.

By the way, code in this PR should actually deal with missed <EOB> correctly (given that supervisor continues to operate correctly after sending mangled <EOB>):

Normal work:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages: <EOB>
  • Worker: processes batch: messages + EOP
  • End

Error:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages but error occurs: something which may or may not include <EOB> or its pieces somewhere
  • Worker: I got something. It's not a known marker, so it's a message.

Wrong: until there's <EOM>, it's not a message either. So I will wait for <EOM> (for how long? a minute? an hour?) until decide that...

It's a faulty message, so I discard it.
My batch is still too small to process: <BNC>

  • Supervisor: has no more messages: <EOB>

Wrong: if worker saw <EOM> finally came, it's very likely that supervisor has a new pack of messages, in which case worker will have some messages and maybe even fill the batch, or -- as you suggested -- will see another <EOB>.

  • Worker: processes batch: messages + EOP
  • End

[1] This example can be simulated by starting a stage in terminal mode with standard <EOM>=\n and forgetting to press ENTER after a message. One can look at the screen and realize their mistake, but hey, that's some advanced supervisor technique...

I don't think we should continue the discussion for it already took too much words, so here's my ultimate resolution on this question: I won't accept the PR until the code can recognize <EOB> in data stream independently of the marker's position.


Returning "name of marker" instead of the marker itself may be ambiguous.

Fair point, will do.

Thank you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood, will do.

""" Read lines with custom line separator.

Construct generator with readline-like functionality:
with every call of ``next()`` method it will read data from ``f``
untill the ``newline`` separator is found; then yields what was read.
until the ``newline`` separator is found; then yields what was read.
If ``markers`` are supplied, then check for their presence: markers
are special strings that can occur:
- At the beginning of ``f``.
- Immediately after a ``newline``.
- Immediately after another marker.
If a marker's value is found in such place, its name is yielded instead
of another chunk of text and the value is removed. Markers in other
places are ignored.

.. warning:: the last line can be incomplete, if the input data flow
is interrupted in the middle of data writing.
Expand All @@ -28,6 +36,8 @@ def custom_readline(f, newline):
:type f: file
:param newline: delimeter to be used instead of ``\\n``
:type newline: str
:param markers: markers to look for, {name:value}
:type markers: dict

:return: iterable object
:rtype: generator
Expand All @@ -52,6 +62,19 @@ def custom_readline(f, newline):
chunk = f.read()
if not chunk:
if buf:
if markers:
# Look for markers after last newline.
look_for_markers = True
while look_for_markers:
look_for_markers = False
for name, value in markers.iteritems():
if buf.startswith(value):
buf = buf[len(value):]
look_for_markers = True
while send_not_next:
send_not_next = yield True
send_not_next = yield name
break
while send_not_next:
# If we are here, the source is not empty for sure:
# we have another message to yield
Expand All @@ -64,6 +87,20 @@ def custom_readline(f, newline):
# and (in theory) may provide another message sooner or later
send_not_next = yield True
while newline in buf:
if markers:
# Look for markers before each yielded "line".
# This includes start of f.
look_for_markers = True
while look_for_markers:
look_for_markers = False
for name, value in markers.iteritems():
if buf.startswith(value):
buf = buf[len(value):]
look_for_markers = True
while send_not_next:
send_not_next = yield True
send_not_next = yield name
break
pos = buf.index(newline) + len(newline)
while send_not_next:
# If we are here, the source is not empty for sure:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ def get_source_info(self):
raise NotImplementedError

def get_message(self):
""" Get new message from current source.
""" Get new message or marker from current source.

Return values:
Message object
marker (str)
False (failed to parse message)
None (all input sources are empty)
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def eop(self):
""" Write EOP marker to the current dest. """
self.get_stream().eop()

def bnc(self):
""" Write BNC marker to the current dest. """
self.get_stream().bnc()

def flush(self):
""" Flush buffered messages to the current dest. """
self.get_stream().flush()
Expand Down
36 changes: 30 additions & 6 deletions Utils/Dataflow/pyDKB/dataflow/communication/stream/InputStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ class InputStream(Stream):

__iterator = None

# Names of markers that the stream knows how to process.
# Values are taken from config, markers with empty values are ignored.
marker_names = ['eob']

def configure(self, config={}):
""" Configure instance. """
super(InputStream, self).configure(config)
self.markers = {}
# If batch size is 1, meaning non-batch mode will be used, then
# markers are unnecessary (and even can be a hindrance by forcing
# usage of custom_readline() without need).
if config.get('bsize', 1) > 1:
Evildoor marked this conversation as resolved.
Show resolved Hide resolved
for name in self.marker_names:
value = config.get(name)
if value:
self.markers[name] = value

def __iter__(self):
""" Initialize iteration. """
self._reset_iterator()
Expand All @@ -24,14 +41,14 @@ def __iter__(self):
def _reset_iterator(self):
""" Reset inner iterator on a new file descriptor. """
fd = self.get_fd()
if self.EOM == '\n':
if self.EOM == '\n' and not self.markers:
self.__iterator = iter(fd.readline, "")
self.is_readable = self._fd_is_readable
elif self.EOM == '':
elif self.EOM == '' and not self.markers:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition should be left as it was:

  • it will be very weird if <EOM> was an empty line, but some other markers were expected at the input: empty <EOM> is supposed to indicate "the whole input is a single message";
  • passing empty line as <EOM> to custom_readline() leads to an endless loop producing empty lines.

But to avoid "silent disregard" for user configuration, in case of empty <EOM> stage must say something about "all other markers at the input will be ignored" (e.g. here).

self.__iterator = iter(fd.read, "")
self.is_readable = self._fd_is_readable
else:
self.__iterator = custom_readline(fd, self.EOM)
self.__iterator = custom_readline(fd, self.EOM, self.markers)
self.is_readable = self._gi_is_readable

def reset(self, fd, close=True, force=False):
Expand Down Expand Up @@ -139,9 +156,10 @@ def parse_message(self, message):
return False

def get_message(self):
""" Get next message from the input stream.
""" Get next message or marker from the input stream.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds wrong: get_message by it's name should only return next message, not some control markers.

Since all decent return values (like None and False) are already reserved, I would suggest (situationally, with no respect to the bigger picture) to throw some exception (some kind of SoftStopIteration)...

(And, I guess, all these tricks with None and False are better to be replaced with some more readable exceptions as well.. but not in this PR, of course).


Of course, the method can be renamed, but for now I am not sure if it should be.


:returns: parsed next message,
next marker,
False -- parsing failed,
None -- no messages left
:rtype: pyDKB.dataflow.communication.messages.AbstractMessage,
Expand All @@ -154,16 +172,22 @@ def get_message(self):
return result

def next(self):
""" Get next message from the input stream.
""" Get next message or marker from the input stream.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes InputStream's behavior: previously, iteration over the stream returned next piece of properly parsed data. Now it only returns "next object" from the input stream. And this object may be a "properly parsed piece of data", wrapped into a strictly defined structure of *Message object -- or a string.

Strings are very.. questionable. Too standard -- so it can be anything.

If iteration over InputStream is to return "various objects from the input", these objects should be wrapped into something unmistakable, so that the external context could know for sure: returned object is not just "some string", it is the EOBMarker. It is not necessary objects of some class, it can be constants moved from communication.* classes directly to the communication module, like communication.EOBMarker, communication.EOMMarker, communication.EOPMarker, etc. Still "strings", but at least they have very precise reference, that will save at least from typo in code.
(And, it will make the whole communication module to have unified set of markers; maybe it's not that bad, or maybe it is (Stage 019 is a working example of a stage that would be happy to have different set of markers for input and output streams; but it is because now stages have same markers at stage 1 output and stage 2 input; if set of markers is between supervisor and worker, not worker and worker -- it will be fine). But this question is far beyond this PR's scope...

But I would rather have iteration over InputStream (and Consumer) that returns pieces of data (messages or batches of messages), leaving all the control symbols stuff hidden. Maybe use a callback function to provide InputStream (or Consumer?) with mechanism for <BNC> emitting?

The idea is as follows:

  • next() method needs to know:
    • max number of messages to return (desired batch_size);
    • control symbol (<EOB>) at which reading (and requesting) for new messages should stop, even when there's less than batch_size messages were read;
    • how to request new message (the callback, mentioned above) when one of them is read from the input (or when there's nothing in the input, but it was not closed yet and no <EOB> was found);
  • at each call next() method:
    • returns single message (if no batch_size and/or callback function were configured);
    • returns list of messages of batch_size size;
    • returns list of messages of a smaller size -- if <EOB> was found in the input stream or it was closed;
    • raises StopIteration if all the input stream was already processed and it is closed (via self.__iterator.next()).

If all this stuff is configured or not can be encapsulated into choice of a proper Stream class: say, InputStream produces only single messages and knows nothing about batch-related configuration (and fails if <EOB> appears in the input stream), while some BatchInputStream produces lists of messages and is well aware of batch-related configuration. Which stream to use is defined by the stage's initial configuration: if it supports batch processing, them BatchInputStream is to be used; and even if batch_size will be set to 1, it still knows how to handle <EOB> -- even if it is not supposed to appear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings are very.. questionable. Too standard -- so it can be anything.

If iteration over InputStream is to return "various objects from the input", these objects should be wrapped into something unmistakable, so that the external context could know for sure: returned object is not just "some string", it is the EOBMarker. It is not necessary objects of some class, it can be constants moved from communication.* classes directly to the communication module, like communication.EOBMarker, communication.EOMMarker, communication.EOPMarker, etc. Still "strings", but at least they have very precise reference, that will save at least from typo in code.

I agree with this. If we will decide to keep this logic, then changing strings to constants sounds reasonable.

Everything else here is ideological and will be discussed in email/meeting/etc.


:returns: parsed next message,
next marker,
False -- parsing failed or unexpected end of stream occurred
:rtype: pyDKB.dataflow.communication.messages.AbstractMessage, bool
:rtype: pyDKB.dataflow.communication.messages.AbstractMessage,
str, bool
"""
if not self.__iterator:
self._reset_iterator()
msg = self.__iterator.next()
if not msg.endswith(self.EOM):
# Check whether an expected marker was received.
for key in self.markers:
if msg == key:
return key
log_msg = msg[:10] + '<...>' * (len(msg) > 20)
log_msg += msg[-min(len(msg) - 10, 10):]
log_msg = log_msg.replace('\n', r'\n')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def configure(self, config={}):
""" Configure instance. """
super(OutputStream, self).configure(config)
self.EOP = config.get('eop', '')
self.BNC = config.get('bnc', '')

def write(self, message):
""" Add message to the buffer. """
Expand All @@ -40,6 +41,10 @@ def eop(self):
""" Signalize Supervisor about end of process. """
self.get_fd().write(self.EOP)

def bnc(self):
""" Signalize Supervisor about batch being incomplete. """
self.get_fd().write(self.BNC)

def drop(self):
""" Drop buffer without sending messages anywhere. """
self.msg_buffer = []
18 changes: 18 additions & 0 deletions Utils/Dataflow/pyDKB/dataflow/stage/AbstractStage.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ def defaultArguments(self):
default=None,
dest='eop'
)
self.add_argument('--batch-not-complete', action='store', type=str,
help=u'custom batch-not-complete marker\n'
'DEFAULT: \'BNC\'',
default=None,
dest='bnc'
)
self.add_argument('--end-of-batch', action='store', type=str,
help=u'custom end-of-batch marker\n'
'DEFAULT: \'EOB\'',
default=None,
dest='eob'
)

def _is_flag_option(self, **kwargs):
""" Check if added argument is a flag option. """
Expand Down Expand Up @@ -224,6 +236,12 @@ def parse_args(self, args):
"Case: %s" % (err), logLevel.ERROR)
sys.exit(1)

if self.ARGS.bnc is None:
self.ARGS.bnc = 'BNC'

if self.ARGS.eob is None:
self.ARGS.eob = 'EOB'

if self.ARGS.mode == 'm':
if 'f' in (self.ARGS.source, self.ARGS.dest):
self.log("File source/destination is not allowed "
Expand Down
Loading