Internal communication protocol

Marina Golosova edited this page Oct 23, 2020 · 15 revisions

This page specifies the internal communication protocol used to transfer data and flow control commands between the supervisor and the worker of an ETL process stage.

General terms

  1. The protocol is used for inter-process communication.
  2. The purpose of the protocol is data transfer and data flow control between two processes: the stage supervisor (S) and the worker (W).
  3. The connection between S and W is established when S executes W's run instruction; the two then communicate through W's standard input (W-STDIN) and standard output (W-STDOUT) streams.
  4. Protocol elements are:
    • <marker> -- an ASCII symbol that never occurs in the transferred data;
    • <message> -- raw data (message content) ending with the EOM (end-of-message) marker;
    • <batch> -- a group of messages ending with the EOB (end-of-batch) marker.
  5. The <message> content format and the encode/decode rules are not defined by the protocol and must be agreed on at the application level.
  6. All <marker>s have default values, which can be overridden by S and passed to W as part of its run instruction.
  7. List of <marker>s:
    • EOM (end-of-message):
      • can be sent by: S, W;
      • usage: written to the stream after the raw data (message content) to indicate the end of the message content;
    • EOP (end-of-process):
      • can be sent by: W;
      • usage: written to W-STDOUT after the last message of W's operation result (or on its own, if no messages were produced) to indicate that the requested operation on the data is finished and W is ready for the next command;
    • EOB (end-of-batch):
      • can be sent by: S;
      • usage: written to W-STDIN after the last message in a group to indicate the end of the group passed to W for batch processing;
    • BNC (batch-not-complete):
      • can be sent by: W;
      • usage: written to W-STDOUT after EOP or a previous BNC to request one more message for batch processing from S;
    • GET (get-new-data):
      • can be sent by: S (E-stage);
      • usage: written to W-STDIN to trigger W's operation execution.
  8. S and W processes communication scenario depends on the stage's type (E-, T- or L-).
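The framing rules above can be sketched as a small tokenizer. This is only an illustration: the page does not state the default marker values, so the characters in `MARKERS` are assumptions, and `write_message` and `read_tokens` are hypothetical helper names rather than part of any existing implementation.

```python
import io

# Assumed marker values, for illustration only. The protocol lets S
# override the defaults through W's run instruction.
MARKERS = {"EOM": "\n", "EOP": "\0", "EOB": "\x17"}


def write_message(stream, content, markers=MARKERS):
    """Write one <message>: raw content followed by EOM."""
    if any(char in content for char in markers.values()):
        # A marker must never occur in the transferred data.
        raise ValueError("message content must not contain a marker")
    stream.write(content + markers["EOM"])


def read_tokens(stream, markers=MARKERS):
    """Yield ("message", content) or ("marker", name) until the stream ends."""
    by_char = {char: name for name, char in markers.items()}
    buf = []
    while True:
        ch = stream.read(1)
        if ch == "":
            return  # stream closed
        name = by_char.get(ch)
        if name is None:
            buf.append(ch)  # ordinary message content
        elif name == "EOM":
            yield ("message", "".join(buf))
            buf = []
        else:
            yield ("marker", name)


# Two messages followed by EOP, as W would write them to W-STDOUT.
pipe = io.StringIO()
write_message(pipe, "a")
write_message(pipe, "b")
pipe.write(MARKERS["EOP"])
pipe.seek(0)
tokens = list(read_tokens(pipe))
```

Reading one character at a time keeps the sketch short; a real implementation would more likely buffer its reads.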

E-stage scenario (plain)

  1. S:
    • execute W's run instruction;
    • while W-STDOUT is open:
      • read <message> from W-STDOUT;
      • pass <message> downstream (to the next stage);
      • repeat;
    • repeat.
  2. W:
    • execute "extraction" operation;
    • generate <messages> for data flow;
    • write <messages> to W-STDOUT (one by one);
    • close W-STDOUT (exit).
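A minimal sketch of the supervisor side of the plain scenario, assuming a newline as the EOM marker (the default is not stated on this page). The function name `run_plain_e_stage` and the one-line worker command are hypothetical; the worker is started with Python's standard `subprocess` module.

```python
import subprocess
import sys

EOM = "\n"  # assumed end-of-message marker


def run_plain_e_stage(worker_cmd, downstream):
    """Run W once and forward every <message> until W-STDOUT closes."""
    proc = subprocess.Popen(worker_cmd, stdout=subprocess.PIPE, text=True)
    buf = []
    while True:
        ch = proc.stdout.read(1)
        if ch == "":  # W closed W-STDOUT (exited)
            break
        if ch == EOM:
            downstream("".join(buf))  # pass <message> to the next stage
            buf = []
        else:
            buf.append(ch)
    proc.stdout.close()
    return proc.wait()


# Hypothetical worker: "extracts" three records and exits.
worker = [sys.executable, "-c", "for i in range(3): print('record', i)"]
received = []
exit_code = run_plain_e_stage(worker, received.append)
```

Here `received.append` stands in for the downstream stage. The outer "repeat" of the scenario would correspond to calling `run_plain_e_stage` again.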

E-stage scenario (cyclic)

  1. S:

    • send GET marker to (already running) W (write to W-STDIN) (*)

      OR

      send <message> with last and current (start and end) offset values to (already running) W (write to W-STDIN); (**)

    • until EOP is at W-STDOUT:

      • read <message> from W-STDOUT;
      • pass <message> downstream (to the next stage);
      • repeat;
    • repeat.

  2. W:

    • read GET marker from W-STDIN (*)

      OR

      read <message> with start and end offset values; (**)

    • execute "extraction" operation;

    • generate <messages> for data flow;

    • write <messages> to W-STDOUT (one by one);

    • write EOP to W-STDOUT;

    • repeat.

(*) The offset is managed on W's side (e.g. in local storage in the standalone version).
(**) The offset is managed on S's side.
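The worker side of variant (*) might look like the sketch below. The marker values, the `cyclic_e_worker` function and the `extract_two` callback are assumptions made for illustration, and in-memory streams stand in for W-STDIN and W-STDOUT so that the exchange can be inspected.

```python
import io

EOM, EOP, GET = "\n", "\0", "\x05"  # assumed marker values


def cyclic_e_worker(stdin, stdout, extract):
    """W side of the cyclic E-stage, variant (*): W keeps the offset."""
    offset = 0
    while True:
        ch = stdin.read(1)
        if ch == "":
            break  # S closed W-STDIN
        if ch != GET:
            continue  # this sketch ignores anything but GET
        records, offset = extract(offset)  # "extraction" operation
        for record in records:
            stdout.write(record + EOM)
        stdout.write(EOP)  # operation finished, ready for the next GET
        stdout.flush()


DATA = ["a", "b", "c", "d", "e"]  # hypothetical data source


def extract_two(offset):
    """Return up to two records starting at offset, plus the new offset."""
    chunk = DATA[offset:offset + 2]
    return chunk, offset + len(chunk)


w_stdin = io.StringIO(GET * 4)  # S asks for new data four times
w_stdout = io.StringIO()
cyclic_e_worker(w_stdin, w_stdout, extract_two)
```

The fourth GET finds no new data, so W answers with EOP on its own.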

T- and L-stage scenario (single message)

NOTE: An L-stage produces an empty transformation result.

  1. S:
    • send <message> to (already running) W (write to W-STDIN);
    • until EOP is at W-STDOUT:
      • read <message> from W-STDOUT;
      • pass <message> downstream (to the next stage);
      • repeat;
    • repeat.
  2. W:
    • read <message> from W-STDIN;
    • process <message> (transform it to one, multiple or zero messages / load to final storage);
    • write transformation result message(s) to W-STDOUT, if any;
    • write EOP to W-STDOUT;
    • repeat.
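The worker loop of the single-message scenario can be sketched as follows. The marker values and the names `read_message`, `single_message_worker` and `split_words` are assumptions, not part of the specification, and in-memory streams replace the real W-STDIN and W-STDOUT.

```python
import io

EOM, EOP = "\n", "\0"  # assumed marker values


def read_message(stream):
    """Return the next message content, or None once the stream has ended."""
    buf = []
    while True:
        ch = stream.read(1)
        if ch == "":
            return None  # an unterminated trailing fragment is dropped
        if ch == EOM:
            return "".join(buf)
        buf.append(ch)


def single_message_worker(stdin, stdout, process):
    """Read one <message>, write its results, then EOP; repeat."""
    while True:
        message = read_message(stdin)
        if message is None:
            break
        for result in process(message):  # zero, one or multiple results
            stdout.write(result + EOM)
        stdout.write(EOP)
        stdout.flush()


def split_words(message):
    """Hypothetical transformation: one output message per word."""
    return message.split()


w_stdin = io.StringIO("one two" + EOM + EOM + "three" + EOM)
w_stdout = io.StringIO()
single_message_worker(w_stdin, w_stdout, split_words)
```

The empty second message yields no results, so only EOP is written for it. An L-stage worker would behave that way for every message.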

T- and L-stage scenario (batch processing)

NOTE: An L-stage produces an empty transformation result.

  1. S:

    • send <message> to (already running) W (write to W-STDIN);

    • while BNC is at W-STDOUT (*)

      OR

      while the number of sent messages is less than BATCH_SIZE: (**)

      • IF there are <messages> to be processed:
        • write <message> to W-STDIN;
      • ELSE:
        • write EOB;
      • repeat;
    • IF there are <messages> to be processed:

      • write EOB to W-STDIN; (**)
    • until EOP is at W-STDOUT:

      • read <message> from W-STDOUT;
      • pass <message> downstream (to the next stage);
      • repeat;
    • repeat.

  2. W:

    • until the number of read messages is greater than (or equal to) BATCH_SIZE OR EOB is at W-STDIN (*)

      OR

      until EOB is at W-STDIN: (**)

      • read <message> from W-STDIN;
      • write BNC to W-STDOUT; (*)
      • repeat;
    • process all <messages> (transform each to one, multiple or zero messages / load to final storage);

    • write transformation result message(s) to W-STDOUT, if any;

    • write EOP to W-STDOUT;

    • repeat.

(*) Worker-driven batch processing.
(**) Supervisor-driven batch processing.
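A sketch of the supervisor-driven variant (**), with both sides reduced to functions over in-memory streams. The marker values, the `BATCH_SIZE` value and the names `send_batches`, `batch_worker` and `join_batch` are assumptions. As a simplification, S writes all batches up front here, whereas in the scenario above it waits for EOP before sending the next batch.

```python
import io

EOM, EOP, EOB = "\n", "\0", "\x17"  # assumed marker values
BATCH_SIZE = 2  # assumed value


def send_batches(messages, w_stdin, batch_size=BATCH_SIZE):
    """S side (**): write messages, closing each group with EOB."""
    sent = 0
    for message in messages:
        w_stdin.write(message + EOM)
        sent += 1
        if sent == batch_size:
            w_stdin.write(EOB)
            sent = 0
    if sent:
        w_stdin.write(EOB)  # close the final, shorter batch


def batch_worker(stdin, stdout, process_batch):
    """W side (**): collect messages until EOB, process, answer with EOP."""
    batch, buf = [], []
    while True:
        ch = stdin.read(1)
        if ch == "":
            break  # S closed W-STDIN
        if ch == EOM:
            batch.append("".join(buf))
            buf = []
        elif ch == EOB:
            for result in process_batch(batch):
                stdout.write(result + EOM)
            stdout.write(EOP)
            batch = []
        else:
            buf.append(ch)


def join_batch(batch):
    """Hypothetical transformation: one output message per batch."""
    return [",".join(batch)]


w_stdin = io.StringIO()
send_batches(["a", "b", "c"], w_stdin)
w_stdin.seek(0)
w_stdout = io.StringIO()
batch_worker(w_stdin, w_stdout, join_batch)
```

In the worker-driven variant (*), W would additionally write BNC to W-STDOUT to request each further message, and S would send messages only in response.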