[WIP] Worker-driven batch processing #399

Open
wants to merge 16 commits into
base: master
Evildoor
Contributor

@Evildoor Evildoor commented Aug 26, 2020

Add worker-driven batch processing.

Note: implementing batch processing does not require the dummy stage that was used to test it. I am leaving the stage here for now in case someone wants to test it themselves, but after that I see two options:

First: cut the code out after the PR is approved but before the merge.
Second: actually put it into the repository as an example. In this case, however, it would be better to:

  • Put the stage into the skel directory as batch_stage.py or something similar.
  • Rework run.sh into proper tests instead of "you can run this script and see for yourself".

Doing so also suggests looking at the existing skel first, so this seems to be a task for outside of this PR.

Either way, the stage's code and commits can be more or less ignored when reviewing: they are a demonstration that may be of use, not code to assess and correct.

Note 2: the pyDKB version has not been updated yet, in case further discussion leads to another rework of this code.

TO DO:

  • Fix independent problems that were pointed out
    • EOP filter is not necessary
    • Return marker value instead of name
    • Empty EOM = the whole input is one message = no markers
    • Check for markers' presence everywhere, not in just specific places
      • Pick special (similar to EOM) default values for EOB and BNC to prevent false positives inside messages
    • Check batch size with >= instead of ==, just to be safe. Consider moving it out of other if-elses, for the same reason.
  • Discuss ideological moments
  • Make changes according to results of the discussion
  • Fix pyDKB tests
  • Implement batch_stage.py and run.sh as pyDKB tests
  • Update pyDKB version

@Evildoor Evildoor self-assigned this Aug 26, 2020
Add the first input marker: End-Of-Batch.
Default arguments are supposed to make things work as expected. An empty
BNC would do so in the near future, when there is batch processing but
no supervisor, but it would not once a supervisor is present as well.
Therefore, it is set to be non-empty by default:
- Stages without batch processing won't be affected because they
  won't be sending BNC.
- Stages with batch processing will set it to be empty in their default
  values.
The idea is to keep run() simple and independent from
batch/not batch/skip/whatever mode.
@Evildoor Evildoor force-pushed the batch-worker-driven branch from 4b824c9 to ffaf064 Compare August 26, 2020 10:12
@Evildoor Evildoor changed the title [WIP] Worker-driven batch processing Worker-driven batch processing Aug 26, 2020
Default arguments are supposed to make things work as expected: EOB's
value does not really matter now, without a supervisor, but it will
matter once there is one.

Existing stages are not affected because all markers (which currently consist
only of EOB) are ignored by InputStream when batch mode is not used.
@Evildoor Evildoor requested a review from mgolosova August 26, 2020 11:03
Collaborator

@mgolosova mgolosova left a comment

I believe this PR deserves [WIP] status, because there are some ideological points I would suggest handling differently. So this review is not so much a review as a discussion.

In general:

  1. Adding a test stage is a good idea...
    ...but please add it to the pyDKB tests (/Utils/Dataflow/test/pyDKB). Feel free to ask any questions about the structure and design of these tests via e-mail.

  2. You must have noticed that previously added tests are failing now [1].
    When this PR is finished, all tests must pass, so do not ignore this.

  3. Functionally, in some cases this version will lead to unexpected results (see comments below).

  4. Ideologically, this PR introduces batches but does not provide any tools to work with them.
    The only way to read a batch (for now) is to use pyDKB.dataflow.stage.ProcessorStage and its input(), while this functionality, at its core, belongs more naturally to the pyDKB.dataflow.communication submodule.

Some suggestions on how things can be organized are in the comments below. For now these are just some thoughts, and I suggest we continue the (ideological) discussion (maybe this time via e-mail) only when one (or both) of us has developed a full-featured proposal.


[1] Hint: if run by hand, pyDKB tests produce files with debug information; for the first test it will be:

[test/pyDKB]$ ./test.sh -c 01
FAIL: 01 (STDERR) (cmd: './json2TTL.py -d s input/NDjson.json')
[test/pyDKB]$ cat 01_*.diff
--- case/01/err 2020-08-14 14:06:33.498802785 +0200
+++ err.tmp     2020-08-27 22:17:15.809338132 +0200
@@ -1,4 +1,5 @@
 (INFO) (ProcessorStage) Configuration parameters:
+(INFO) (ProcessorStage)   eob          : 'EOB'
 (INFO) (ProcessorStage)   hdfs         : 'False'
 (INFO) (ProcessorStage)   dest         : 's'
 (INFO) (ProcessorStage)   input_dir    : '.'
@@ -9,6 +10,7 @@
 (INFO) (ProcessorStage)   source       : 'f'
 (INFO) (ProcessorStage)   eop          : ''
 (INFO) (ProcessorStage)   mode         : 'f'
+(INFO) (ProcessorStage)   bnc          : 'BNC'
 (INFO) (ProcessorStage)   config       : 'None'
 (INFO) (ProcessorStage)   input_files  : '['input/NDjson.json']'
 (INFO) (ProcessorStage) Starting stage execution.

Comment on lines 3 to 5
cmd="./stage.py -m s"
cmd_batch2="./stage.py -b 2 -m s"
cmd_batch100="./stage.py -b 100 -m s"
Collaborator

Suggested change
-cmd="./stage.py -m s"
-cmd_batch2="./stage.py -b 2 -m s"
-cmd_batch100="./stage.py -b 100 -m s"
+cmd="./stage.py -m s -E ''"
+cmd_batch2="./stage.py -b 2 -m s -E ''"
+cmd_batch100="./stage.py -b 100 -m s -E ''"

This way there will be no need for eop_filter in the chains below.

Contributor Author

Done. You had already suggested this before, and I tried it before making this PR, but it did not work for some reason. I did it again after seeing this comment, and it worked. Weird; I probably made a mistake somewhere during the first attempt...

Side note: simply adding -E '' works on the command line, but not here. Because of the enclosing double quotes, EOP is set to the literal two-character string '' instead of an empty string, so a workaround has to be used. And no, this is not the reason for the previously mentioned failure; I did the same then as well.
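
The quoting pitfall described in the side note can be reproduced outside the shell. The following is a minimal sketch, assuming that run.sh expands an unquoted $cmd (the thread does not show this): str.split() stands in for the shell's field splitting of an expanded variable, which splits on whitespace but does not remove quotes, while shlex.split() stands in for a full re-parse of the command line. It illustrates the effect only and is not the workaround the author used.

```python
import shlex

cmd = "./stage.py -m s -E ''"

# Field splitting of an expanded variable: the quotes survive as
# two literal characters in the last argument.
naive_args = cmd.split()

# A full shell-style parse: '' turns into an empty argument.
parsed_args = shlex.split(cmd)

print("naive :", naive_args)
print("parsed:", parsed_args)
```

With the naive split the stage would receive the two quote characters as its EOP value; with the full parse it would receive an empty string.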

@@ -10,12 +10,20 @@
 import fcntl


-def custom_readline(f, newline):
+def custom_readline(f, newline, markers={}):
Collaborator

General comment on the whole set of changes in this function.


Expecting markers only at a specific place in the flow is not very safe.

If the input stream is corrupted and part of it is missing, it may look like this:

<data:proper_message><EOM><data:broken_message><EOB>

Here is an illustration of what this looks like:

~$ cat test_readline.py
#!/usr/bin/env python

import sys

from pyDKB.common import custom_readline

EOM='\n'
EOB='\0'

for d in custom_readline(sys.stdin, EOM, {'eob': EOB}):
    print 'd: %r' % d
~$ data='{"proper": "message"}\n{"broken_message":\0'
~$ echo -ne $data
{"proper": "message"}
{"broken_message":~$
~$ echo -en $data | hexdump
0000000 227b 7270 706f 7265 3a22 2220 656d 7373 
0000010 6761 2265 0a7d 227b 7262 6b6f 6e65 6d5f
0000020 7365 6173 6567 3a22 0000
0000029
~$ echo -en $data | ./test_readline.py
d: '{"proper": "message"}\n'
d: '{"broken_message":\x00'
~$ { echo -en $data; sleep 10; echo -en $data; } | ./test_readline.py
d: '{"proper": "message"}\n'
d: '{"broken_message":\x00{"proper": "message"}\n'
d: '{"broken_message":\x00'

What's going on here:

  • if the stream is closed after the misplaced <EOB>, custom_readline() will yield a piece of broken data ending with <EOB>. No one will know it was <EOB>, but that's fine: the data are unreadable anyway and the stream is closed, so whatever was read before will be processed;
  • if the stream is not closed, custom_readline() won't return anything until it sees <EOM>. But when it does, the broken message, <EOB> and the next proper message will stick together and be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored. And what's even worse: until there's an <EOM> in the input, custom_readline() will not return anything at all, and the whole batch of already read messages will sit in memory and wait, not being processed until the next <EOB> or the end of input.

What I'd expect in this situation:

  • <EOB> is detected;
  • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
  • <EOB> is processed in a regular way.
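
The expected behaviour listed above can be sketched as follows. This is an illustration only, not pyDKB's custom_readline(): the function name read_items, the logger name and the chunk-based input are assumptions, it is written for Python 3, and it assumes non-empty, single-character markers. It searches the buffered data for either marker, warns about data found right before <EOB>, and yields the marker's value so that the caller can compare it with the configured one.

```python
import logging

EOM = "\n"
EOB = "\0"

log = logging.getLogger("readline_sketch")  # hypothetical logger name


def read_items(chunks, eom=EOM, eob=EOB):
    """Yield complete messages and EOB markers from an iterable of text chunks.

    Markers are looked for anywhere in the buffered data,
    not only at message boundaries.
    """
    buf = ""
    for chunk in chunks:
        buf += chunk
        while True:
            found = [(buf.find(m), m) for m in (eom, eob)]
            found = [(pos, m) for pos, m in found if pos != -1]
            if not found:
                break  # no complete item yet; wait for more input
            pos, marker = min(found)  # earliest marker in the buffer
            data, buf = buf[:pos], buf[pos + len(marker):]
            if marker == eom:
                yield data + eom
            else:
                if data:
                    log.warning("Input looks corrupted: unexpected data "
                                "before EOB discarded: %r", data)
                yield eob
    if buf:
        log.warning("Unexpected end of stream, discarded: %r", buf)


data = '{"proper": "message"}\n{"broken_message":\0'
for item in read_items([data, data]):
    print("d: %r" % item)
```

With this approach the second proper message is no longer glued to the broken one, because the <EOB> between them is recognized where it occurs.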

Returning "name of marker" instead of the marker itself may be ambiguous.

Wherever the function is called from, the calling context is aware of the marker values and can compare the returned value to them. On the other hand, the sequence eob<EOF> (with proper EOF == "STDIN is closed"), which originally would be processed as "unexpected end of stream" by the InputStream, will now be taken as <EOB> at the end of the stream:

~$ data1='eob'
~$ data2='\0'
~$ echo -ne $data1 | ./test_readline.py
d: 'eob'
~$ echo -ne $data2 | ./test_readline.py
d: 'eob'
d: ''

(BTW, the extra empty line is an unexpected result)

Maybe it's not a big problem (<EOB> at the end of the stream won't make the worker do anything unexpected), but it is an example of how a plain string 'eob' can be incorrectly interpreted as a control marker.
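
The ambiguity can be shown in isolation. In this minimal sketch (Python 3; both helper functions are hypothetical and only mimic the behaviour seen in the transcript above), replacing the marker with its name makes plain data 'eob' indistinguishable from a real <EOB>, while passing the marker's value through keeps them apart.

```python
EOB = "\0"


def items_by_name(raw_items, eob=EOB):
    """Replace the EOB marker with its name, as the transcript above shows."""
    return ["eob" if item == eob else item for item in raw_items]


def items_by_value(raw_items):
    """Pass the marker through unchanged; callers compare items with EOB."""
    return list(raw_items)


plain_text = ["eob"]   # data that happens to spell the marker's name
real_marker = [EOB]    # an actual end-of-batch marker

# With names, both inputs look identical to the caller.
print(items_by_name(plain_text) == items_by_name(real_marker))
# With values, they stay distinguishable.
print(items_by_value(plain_text) == items_by_value(real_marker))
```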

Contributor Author

@Evildoor Evildoor Sep 12, 2020

> General comment to the whole set of changes in this function.
>
> Expecting markers only at specific place in the flow is not very safe.
>
> If the input stream was corrupted and part of it is missed, it may look like this:
>
> <data:proper_message><EOM><data:broken_message><EOB>
>
> Here's some illustration of how it looks like:
>
> ...
>
> What's going on here:
>
>   • if the stream was closed after misplaced <EOB>, custom_readline() will yield a piece of broken data with <EOB>. No one will know it was <EOB> -- but it's fine; data are unreadable anyway, stream is closed -- so whatever was read before, it will be processed;
>   • if the stream wasn't closed, custom_readline() won't return anything until it sees <EOM>. But when it does -- the broken message, <EOB> and the next proper message will stick together and will be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored.

This is true, but the problem here is the missing <EOM>, not <EOB>. Even in master, which knows nothing about other markers or batch processing, a message corrupted in a way that erases <EOM> will "stick" to the next message, and that next message will be ignored even though it is correct.

> And what's even worse -- until there's <EOM> in the input, custom_readline() will not return anything at all -- and the whole batch of already read messages will sit in memory and wait, not being processed till the next <EOB> or the end of input.

I presume that there is a typo and the second marker is supposed to be <EOM> as well, because I don't see how a missed <EOB> can cause custom_readline to hang until the next <EOB> is encountered. In that case, this is a continuation of the previous text and my answer above still applies. Otherwise, please clarify this point.

> What I'd expect in this situation:
>
>   • <EOB> is detected;
>   • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
>   • <EOB> is processed in a regular way.

This solves the problem of missing a marker because it is caught between corrupted messages, but creates another one: what if a sequence of characters inside a (normal or corrupted) message matches some marker's value by accident?


All of this is part of a bigger question, "how to deal with errors in communication between worker and supervisor", or, to be more precise, in the communication of markers (and losing a marker can be much more dangerous than losing a message). This problem is not unique to batch mode. For example, if the supervisor sends a message but <EOM> gets lost or corrupted, we have a deadlock: the supervisor expects <EOP> and the worker expects <EOM> [1]. This means that the supervisor should have a means of breaking such deadlocks, by checking the worker's state or in some other way.

All things considered, I'm not sure that looking for markers inside messages is a solution, or that this question fits into this PR. By the way, the code in this PR should actually deal with a missed <EOB> correctly (given that the supervisor continues to operate correctly after sending a mangled <EOB>):

Normal work:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages: <EOB>
  • Worker: processes batch: messages + EOP
  • End

Error:

  • Supervisor: message
  • Worker: adds message to batch, batch is too small to process: <BNC>
  • Supervisor: has no more messages but error occurs: something which may or may not include <EOB> or its pieces somewhere
  • Worker: I got something. It's not a known marker, so it's a message. It's a faulty message, so I discard it. My batch is still too small to process: <BNC>
  • Supervisor: has no more messages: <EOB>
  • Worker: processes batch: messages + EOP
  • End
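
The two exchanges above can be sketched as a single worker loop. This is a simplified illustration in Python 3, not the code from this PR: the marker values are placeholders, the worker function and its is_valid parameter are hypothetical, and the two-way exchange is flattened into a generator that yields what the worker would send in reply to each incoming item. It assumes that every item reaches the worker already delimited, which is exactly the assumption questioned later in the thread.

```python
EOB = "\0"   # placeholder: supervisor has no more messages for now
BNC = "BNC"  # placeholder: batch is too small to process, send more
EOP = "EOP"  # placeholder: processing of the current batch is finished


def worker(items, batch_size, process, is_valid=lambda item: True):
    """Yield the worker's replies to each incoming item."""
    batch = []
    for item in items:
        if item != EOB and is_valid(item):
            batch.append(item)  # faulty messages are silently discarded
        # ">=" rather than "==", so an oversized batch is still flushed
        if item == EOB or len(batch) >= batch_size:
            yield from process(batch)
            yield EOP
            batch = []
        else:
            yield BNC
    if batch:  # input closed with an unfinished batch
        yield from process(batch)
        yield EOP


def to_upper(batch):
    return [m.upper() for m in batch]


print(list(worker(["m1", EOB], 2, to_upper)))
print(list(worker(["m1", "garbage", EOB], 2, to_upper,
                  is_valid=lambda m: m.startswith("m"))))
```

The first call mirrors the "Normal work" list, the second the "Error" list, with "garbage" standing in for the mangled <EOB>.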

[1] This example can be simulated by starting a stage in terminal mode with standard <EOM>=\n and forgetting to press ENTER after a message. One can look at the screen and realize their mistake, but hey, that's some advanced supervisor technique...

> Returning "name of marker" instead of the marker itself may be ambiguous.
>
> Whenever the function is called from, the context is aware of marker values and can compare returned value to them. On the other hand, sequence of eob<EOF> (with proper EOF == "STDIN is closed"), that originally would be processed as "unexpected end of stream" by the InputStream, now will be taken as <EOB> at the end of stream:
>
> ~$ data1='eob'
> ~$ data2='\0'
> ~$ echo -ne $data1 | ./test_readline.py
> d: 'eob'
> ~$ echo -ne $data2 | ./test_readline.py
> d: 'eob'
> d: ''
>
> (BTW, the extra empty line is an unexpected result)
>
> Maybe it's not a big problem -- <EOB> at the end of stream won't make worker to do something unexpected -- but it is just an example of possibility of incorrect interpretation of a plain string 'eob' as a control marker.

Fair point, will do.

Collaborator

@Evildoor

> > Expecting markers only at specific place in the flow is not very safe.
> > If the input stream was corrupted and part of it is missed, it may look like this:
> >
> > <data:proper_message><EOM><data:broken_message><EOB>
> >
> >   • if the stream wasn't closed, custom_readline() won't return anything until it sees <EOM>. But when it does -- the broken message, <EOB> and the next proper message will stick together and will be taken as a single message; it won't be decoded (for it is not a properly formatted message) and will be thrown away. So the proper message will be ignored.
>
> This is true, but the problem here is missed <EOM>, not <EOB>.

The problem is not the "missed <EOM>": issues in data transfer may happen, and that is not something we can control with the protocol specification or the encoder/decoder implementation. My point is that the current implementation of the parser (the way the presence of <EOB> is checked) makes this kind of issue more harmful than it has to be.

> Even if we work in master that knows nothing about other markers and batch processing, a message being corrupted in a way that erases <EOM> will make it "stick" to the next message, and that next message would be ignored, even though it is correct.

Right, the correct message following the broken one will be thrown away. But in this case:

  • the flow looks like this: <data_broken+data_correct><EOM>, so there's no way to separate the broken data from the correct data (while with <EOB> between them it is possible);
  • the stage processes all the messages prior to the broken one and gets stuck waiting for the next <EOM> with only the broken one in memory, not the whole batch; in other words, everything that can be processed is processed without delay.

> > And what's even worse -- until there's <EOM> in the input, custom_readline() will not return anything at all -- and the whole batch of already read messages will sit in memory and wait, not being processed till the next <EOB> or the end of input.
>
> I presume that there is a typo and the second marker is supposed to be <EOM> as well,

No, it's <EOB>.

> because I don't see any way in which missed <EOB> can cause custom_readline to hang until next <EOB> is encountered.

Not custom_readline() in particular, but the stage itself; the buffer of messages is, for now, an attribute of the stage, not of custom_readline().

> In such case, it's the continuation of the previous text and my answer above still applies. Otherwise - please, clarify this moment.

Here you are: a stage operating in batch mode (even in the worker-driven scenario) will not process anything until one of the following happens:

  • it gets enough messages in the buffer;
  • it gets <EOB>;
  • its input stream is closed.

If the supervisor sends <EOB> and keeps the stream open but has no more messages to process for the next hour, the worker will sit and wait for the whole hour instead of processing the messages it already has.
Just in case, a reminder: we're talking about the situation with a broken message (without <EOM>) right before the <EOB>, not the general situation.

> > What I'd expect in this situation:
> >
> >   • <EOB> is detected;
> >   • user (or log file) is informed that input stream looks corrupted (we do not expect any data right before <EOB>);
> >   • <EOB> is processed in a regular way.
>
> This solves a problem of missing a marker due to it being caught between corrupted messages, but creates another one - what if a sequence of characters inside a (normal or corrupted) message matches some marker's value by accident?

Aha, why do you think <EOM> is a NULL ASCII character, not plain "eom"? ;)
It's a well-known issue: everything that has a special meaning must be properly treated in the data (just like the reserved keyword "table" in SQL, for example).
The simplest way to get rid of this is to encode the data with base64 and make sure that none of the 64 ASCII symbols it uses serves as a special one.
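
The base64 suggestion can be sketched with Python's standard library. This is an illustration only (Python 3; the helper names and marker values are assumptions, and nothing in the thread says pyDKB encodes messages this way). Note that standard base64 output also uses the padding character =, so that character has to be kept out of the marker set as well.

```python
import base64

EOM = "\n"
EOB = "\0"


def encode_payload(text):
    """Encode a message so that it cannot contain EOM or EOB."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def decode_payload(encoded):
    """Restore the original message text."""
    return base64.b64decode(encoded).decode("utf-8")


payload = "line one\nline two\0 with a stray NUL"
encoded = encode_payload(payload)

print(repr(encoded))
print(EOM in encoded, EOB in encoded)
print(decode_payload(encoded) == payload)
```

The cost is roughly a one-third increase in message size and messages that are no longer human-readable on the wire.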

> All of this is a part of a bigger question "how to deal with errors in communication between worker and supervisor" - or, to be more precise, communication of markers (and losing a marker can be much more dangerous than losing a message). This problem is not unique to batch mode. For example, if supervisor sends a message but <EOM> gets lost/corrupted then we have a deadlock: supervisor expects <EOP> and worker expects <EOM> [1]. Which means that supervisor should have means of breaking such deadlocks, by checking worker's state or something else.

Right, this should be improved. Feel free to write it down somewhere (the Google doc with the protocol description, Trello cards, your own notepad) and add your ideas on how it can be solved, to make sure you won't forget them when you decide to take care of it.

But here and now your main objective is not to detect and solve existing issues; it is to introduce new functionality without introducing new issues where that can be helped. An existing issue is not an excuse to add new ones by saying "why bother now, if it's not working anyway; one day someone will take care of this, so let him/her take care of everything at once".

> All things considered, I'm not sure that looking for markers inside messages is a solution and that this question fits into this PR.

I do insist.

> By the way, code in this PR should actually deal with missed <EOB> correctly (given that supervisor continues to operate correctly after sending mangled <EOB>):
>
> Normal work:
>
>   • Supervisor: message
>   • Worker: adds message to batch, batch is too small to process: <BNC>
>   • Supervisor: has no more messages: <EOB>
>   • Worker: processes batch: messages + EOP
>   • End
>
> Error:
>
>   • Supervisor: message
>   • Worker: adds message to batch, batch is too small to process: <BNC>
>   • Supervisor: has no more messages but error occurs: something which may or may not include <EOB> or its pieces somewhere
>   • Worker: I got something. It's not a known marker, so it's a message.

Wrong: until there's an <EOM>, it's not a message either. So I will wait for <EOM> (for how long? a minute? an hour?) until I decide that...

> It's a faulty message, so I discard it.
> My batch is still too small to process: <BNC>
>
>   • Supervisor: has no more messages: <EOB>

Wrong: if the worker finally saw <EOM> come, it's very likely that the supervisor has a new pack of messages, in which case the worker will get some messages and maybe even fill the batch, or (as you suggested) will see another <EOB>.

>   • Worker: processes batch: messages + EOP
>   • End
>
> [1] This example can be simulated by starting a stage in terminal mode with standard <EOM>=\n and forgetting to press ENTER after a message. One can look at the screen and realize their mistake, but hey, that's some advanced supervisor technique...

I don't think we should continue the discussion, for it has already taken too many words, so here's my final resolution on this question: I won't accept the PR until the code can recognize <EOB> in the data stream regardless of the marker's position.


> > Returning "name of marker" instead of the marker itself may be ambiguous.
>
> Fair point, will do.

Thank you.

Contributor Author

Understood, will do.

 self.__iterator = iter(fd.readline, "")
 self.is_readable = self._fd_is_readable
-elif self.EOM == '':
+elif self.EOM == '' and not self.markers:
Collaborator

This condition should be left as it was:

  • it would be very weird if <EOM> were an empty string while some other markers were expected in the input: an empty <EOM> is supposed to indicate that "the whole input is a single message";
  • passing an empty string as <EOM> to custom_readline() leads to an endless loop producing empty lines.

But to avoid "silent disregard" of user configuration, when <EOM> is empty the stage must say something like "all other markers in the input will be ignored" (e.g. here).
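
A minimal sketch of such a check, assuming a hypothetical helper effective_markers() and logger name (neither exists in pyDKB as far as this thread shows), written for Python 3: when <EOM> is empty, the configured markers are dropped and the user is told so.

```python
import logging

log = logging.getLogger("stream_config_sketch")  # hypothetical logger name


def effective_markers(eom, markers):
    """Return the markers that will actually be honoured for a given EOM."""
    if eom == "":
        if markers:
            log.warning("EOM is empty: the whole input is a single message; "
                        "all other markers in the input will be ignored (%s)",
                        ", ".join(sorted(markers)))
        return {}
    return dict(markers)


print(effective_markers("", {"eob": "\0"}))
print(effective_markers("\n", {"eob": "\0"}))
```

The stream would then keep the original `EOM == ''` condition and simply use the returned dictionary.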

@@ -139,9 +156,10 @@ def parse_message(self, message):
 return False

 def get_message(self):
-""" Get next message from the input stream.
+""" Get next message or marker from the input stream.
Collaborator

This sounds wrong: get_message, by its name, should only return the next message, not control markers.

Since all the decent return values (like None and False) are already reserved, I would suggest (situationally, without regard to the bigger picture) raising an exception (some kind of SoftStopIteration)...

(And, I guess, all these tricks with None and False would be better replaced with more readable exceptions as well... but not in this PR, of course.)


Of course, the method can be renamed, but for now I am not sure whether it should be.
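
The exception-based alternative could look roughly like this. It is a sketch under stated assumptions, in Python 3: the EndOfBatch exception, the Reader class and collect_batches() are hypothetical stand-ins, not pyDKB classes, and the input is a plain iterable of already separated items.

```python
EOB = "\0"  # placeholder marker value


class EndOfBatch(Exception):
    """Hypothetical signal: an EOB marker was read instead of a message."""


class Reader:
    def __init__(self, items):
        self._items = iter(items)

    def get_message(self):
        """Return the next message; never return a control marker."""
        item = next(self._items)  # raises StopIteration at end of input
        if item == EOB:
            raise EndOfBatch()
        return item


def collect_batches(reader):
    """Group messages into batches, using exceptions as control flow."""
    batches, current = [], []
    while True:
        try:
            current.append(reader.get_message())
        except EndOfBatch:
            batches.append(current)
            current = []
        except StopIteration:
            if current:
                batches.append(current)
            return batches


print(collect_batches(Reader(["a", "b", EOB, "c"])))
```

The return value of get_message() stays a message in every case, and the caller cannot mistake a marker for data.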

@@ -154,16 +172,22 @@ def get_message(self):
 return result

 def next(self):
-""" Get next message from the input stream.
+""" Get next message or marker from the input stream.
Collaborator

This changes InputStream's behavior: previously, iteration over the stream returned the next piece of properly parsed data. Now it only returns the "next object" from the input stream. And this object may be a "properly parsed piece of data", wrapped into a strictly defined structure of a *Message object, or a string.

Strings are very... questionable. Too standard, so a string can be anything.

If iteration over InputStream is to return "various objects from the input", these objects should be wrapped into something unmistakable, so that the external context can know for sure that the returned object is not just "some string" but the EOBMarker. These need not be objects of some class; they can be constants moved from the communication.* classes directly to the communication module, like communication.EOBMarker, communication.EOMMarker, communication.EOPMarker, etc. Still "strings", but at least they have a very precise reference, which will at least protect against typos in code.
(And it will give the whole communication module a unified set of markers. Maybe that's not so bad, or maybe it is: Stage 019 is a working example of a stage that would be happy to have different sets of markers for its input and output streams, but that is because stages currently have the same markers at the stage 1 output and the stage 2 input; if the set of markers is between supervisor and worker, not worker and worker, it will be fine. But this question is far beyond this PR's scope...)
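
A minimal sketch of the "constants with a precise reference" idea, in Python 3. The Markers class, its attribute names and its values are illustrative assumptions, not the names proposed for pyDKB. The point is only that a mistyped constant name fails loudly, whereas a mistyped string literal would silently never match.

```python
class Markers:
    """Hypothetical shared marker constants (names and values are illustrative)."""
    EOM = "\n"
    EOB = "\0"


def is_eob(item):
    return item == Markers.EOB


def is_eob_with_typo(item):
    # E0B (zero instead of O) does not exist, so this raises AttributeError
    # instead of quietly returning False for every item.
    return item == Markers.E0B


print(is_eob("\0"), is_eob("eob"))
try:
    is_eob_with_typo("\0")
except AttributeError as err:
    print("typo caught:", err)
```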

But I would rather have iteration over InputStream (and Consumer) return pieces of data (messages or batches of messages), leaving all the control symbol handling hidden. Maybe use a callback function to provide InputStream (or Consumer?) with a mechanism for emitting <BNC>?

The idea is as follows:

  • the next() method needs to know:
    • the maximum number of messages to return (the desired batch_size);
    • the control symbol (<EOB>) at which reading (and requesting) new messages should stop, even when fewer than batch_size messages have been read;
    • how to request a new message (the callback mentioned above) when one has been read from the input (or when there's nothing in the input, but it has not been closed yet and no <EOB> was found);
  • at each call, the next() method:
    • returns a single message (if no batch_size and/or callback function were configured);
    • returns a list of batch_size messages;
    • returns a shorter list of messages if <EOB> was found in the input stream or the stream was closed;
    • raises StopIteration if the whole input stream has already been processed and it is closed (via self.__iterator.next()).

Whether or not all this is configured can be encapsulated in the choice of a proper Stream class: say, InputStream produces only single messages and knows nothing about batch-related configuration (and fails if <EOB> appears in the input stream), while some BatchInputStream produces lists of messages and is well aware of batch-related configuration. Which stream to use is defined by the stage's initial configuration: if the stage supports batch processing, then BatchInputStream is to be used; and even if batch_size is set to 1, it still knows how to handle <EOB>, even though it is not supposed to appear.
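
The batch-producing stream described above can be sketched as follows. This is a hypothetical illustration in Python 3 (so it defines __next__ rather than the Python 2 next() used in pyDKB): the class body, the request_more callback and the marker value are assumptions, and the input is an iterable of already separated items rather than a file descriptor.

```python
EOB = "\0"  # placeholder marker value


class BatchInputStream:
    """Iterate over batches (lists) of messages; control markers stay hidden."""

    def __init__(self, items, batch_size, request_more=None, eob=EOB):
        self._items = iter(items)
        self.batch_size = batch_size
        # Callback used to ask for more input, e.g. by emitting <BNC>.
        self.request_more = request_more or (lambda: None)
        self.eob = eob
        self._closed = False

    def __iter__(self):
        return self

    def __next__(self):
        if self._closed:
            raise StopIteration
        batch = []
        while len(batch) < self.batch_size:
            try:
                item = next(self._items)
            except StopIteration:
                self._closed = True  # input closed: return what we have
                break
            if item == self.eob:
                break                # supervisor has nothing more for now
            batch.append(item)
            if len(batch) < self.batch_size:
                self.request_more()
        if self._closed and not batch:
            raise StopIteration
        return batch


requests = []
stream = BatchInputStream(["a", "b", "c", EOB, "d"], batch_size=2,
                          request_more=lambda: requests.append("BNC"))
for batch in stream:
    print(batch)
print(len(requests), "requests for more input")
```

In this sketch a bare <EOB> with nothing buffered produces an empty batch; whether that should be returned or skipped is a design decision the thread leaves open.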

Contributor Author

> Strings are very.. questionable. Too standard -- so it can be anything.
>
> If iteration over InputStream is to return "various objects from the input", these objects should be wrapped into something unmistakable, so that the external context could know for sure: returned object is not just "some string", it is the EOBMarker. It is not necessary objects of some class, it can be constants moved from communication.* classes directly to the communication module, like communication.EOBMarker, communication.EOMMarker, communication.EOPMarker, etc. Still "strings", but at least they have very precise reference, that will save at least from typo in code.

I agree with this. If we decide to keep this logic, then changing strings to constants sounds reasonable.

Everything else here is ideological and will be discussed in email/meeting/etc.

@Evildoor Evildoor changed the title Worker-driven batch processing [WIP] Worker-driven batch processing Aug 28, 2020
@Evildoor Evildoor force-pushed the batch-worker-driven branch from 04a2365 to 7ba11b6 Compare September 1, 2020 17:21