Protoflow

"Τὰ πάντα ῥεῖ καὶ οὐδὲν μένει" ("Everything flows, and nothing stays"), Heraclitus

Protoflow is a Rust implementation of flow-based programming (FBP), with messages encoded as Protocol Buffers. It can be used to implement dataflow systems consisting of interconnected blocks that process messages.

Tip

🚧 We are building in public. This is presently under heavy construction.

✨ Features

  • Implements a flow-based programming (FBP) dataflow scheduler.
  • Constructs systems by connecting reusable components called blocks.
  • Uses Protocol Buffers messages for inter-block communication.
  • Currently offers a threaded runtime with an in-process transport.
  • Planned support for pluggable runtimes (threaded, async, etc).
  • Planned support for pluggable transports (in-process, socket, etc).
  • Includes a command-line interface (CLI) for executing Protoflow blocks.
  • Supports opting out of any feature using comprehensive feature flags.
  • Adheres to the Rust API Guidelines in its naming conventions.
  • 100% free and unencumbered public domain software.

🛠️ Prerequisites

⬇️ Installation

Installation via Cargo

cargo install protoflow

Installation via Homebrew

brew tap asimov-platform/tap
brew install protoflow --HEAD

👉 Examples

Examples for Rust

For Rust examples, see the examples directory. Good places to start are the echo_lines and count_lines examples:

cargo run --example echo_lines < CHANGES.md
cargo run --example count_lines < README.md

The count_lines example

use protoflow::{blocks::*, BlockResult};

pub fn main() -> BlockResult {
    System::run(|s| {
        // Read raw bytes from standard input.
        let stdin = s.read_stdin();

        // Decode the byte stream into one String message per line.
        let line_decoder = s.decode_lines();
        s.connect(&stdin.output, &line_decoder.input);

        // Count the decoded lines.
        let counter = s.count::<String>();
        s.connect(&line_decoder.output, &counter.input);

        // Encode the count as a line of text.
        let count_encoder = s.encode_lines();
        s.connect(&counter.count, &count_encoder.input);

        // Write the encoded count to standard output.
        let stdout = s.write_stdout();
        s.connect(&count_encoder.output, &stdout.input);
    })
}

📚 Reference

Glossary

  • System: A collection of blocks that are connected together. Systems are the top-level entities in a Protoflow program.

  • Block: An encapsulated system component that processes messages. Blocks are the autonomous units of computation in a system.

  • Port: A named connection point on a block that sends or receives messages. Ports are the only interfaces through which blocks communicate with each other.

  • Message: A unit of data that flows between blocks in a system. Messages are Protocol Buffers packets that are processed by blocks.
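To make the glossary terms concrete, here is a minimal sketch that models them with the Rust standard library only. It does not use the Protoflow API: `uppercase_block` and `run_system` are hypothetical names, the ports are modeled as `std::sync::mpsc` channel endpoints, and the messages are plain `String` values rather than Protocol Buffers messages.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// A hypothetical "block": reads messages from its input port, transforms
/// them, and sends them to its output port until the input port closes.
fn uppercase_block(input: Receiver<String>, output: Sender<String>) {
    for message in input {
        // A send error means the downstream side has gone away; stop quietly.
        if output.send(message.to_uppercase()).is_err() {
            break;
        }
    }
}

/// A hypothetical "system": wires a source, one block, and a sink together,
/// then returns every message the sink received.
fn run_system(messages: Vec<String>) -> Vec<String> {
    let (source_out, block_in) = channel::<String>();
    let (block_out, sink_in) = channel::<String>();

    // The block runs on its own thread, as an autonomous unit of computation.
    let block = thread::spawn(move || uppercase_block(block_in, block_out));

    for message in messages {
        source_out.send(message).expect("block input port closed early");
    }
    drop(source_out); // closing the port lets the block finish

    let received: Vec<String> = sink_in.iter().collect();
    block.join().expect("block thread panicked");
    received
}

fn main() {
    let out = run_system(vec!["hello".to_string(), "world".to_string()]);
    println!("{:?}", out);
}
```

The block only touches its two channel endpoints, which mirrors the glossary's point that ports are the only interfaces through which blocks communicate.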

Blocks

The built-in blocks provided by Protoflow are listed below:

Block | Description
--- | ---
Buffer | Stores all messages it receives.
ConcatStrings | Concatenates the received string messages, with an optional delimiter string inserted between each message.
Const | Sends a constant value.
Count | Counts the number of messages it receives, while optionally passing them through.
Decode | Decodes messages from a byte stream.
DecodeCSV | Decodes the received input bytes message into a structured CSV format, separating the header and rows as prost_types::Value.
DecodeHex | Decodes a hexadecimal stream into a byte stream.
DecodeJSON | Decodes JSON messages from a byte stream.
Delay | Passes messages through while delaying them by a fixed or random duration.
Drop | Discards all messages it receives.
Encode | Encodes messages to a byte stream.
EncodeCSV | Encodes the provided header and rows, given as prost_types::Value, into a CSV-formatted byte stream.
EncodeHex | Encodes a byte stream into hexadecimal form.
EncodeJSON | Encodes messages into JSON format.
Hash | Computes the cryptographic hash of a byte stream.
Random | Generates and sends a random value.
ReadDir | Reads file names from a file system directory.
ReadEnv | Reads the value of an environment variable.
ReadFile | Reads bytes from the contents of a file.
ReadSocket | Reads bytes from a TCP socket.
ReadStdin | Reads bytes from standard input (aka stdin).
SplitString | Splits the received input message, with an optional delimiter string parameter.
WriteFile | Writes or appends bytes to the contents of a file.
WriteSocket | Writes bytes to a TCP socket.
WriteStderr | Writes bytes to standard error (aka stderr).
WriteStdout | Writes bytes to standard output (aka stdout).

Buffer

A block that simply stores all messages it receives.

block-beta
    columns 4
    Source space:2 Buffer
    Source-- "input" -->Buffer

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Buffer block
    class Source hidden
protoflow execute Buffer

ConcatStrings

A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message.

block-beta
    columns 7
    Source space:2 ConcatStrings space:2 Sink
    Source-- "input" -->ConcatStrings
    ConcatStrings-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ConcatStrings block
    class Source hidden
    class Sink hidden
protoflow execute ConcatStrings delimiter=","
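The concatenation described above can be sketched in plain Rust. This is an illustration of the described behavior under stated assumptions, not the block's actual implementation: `concat_strings` is a hypothetical helper, and it assumes all messages are available at once rather than arriving over a port.

```rust
/// Hypothetical stand-in for the described behavior: join every received
/// string, inserting the delimiter (if any) between consecutive messages.
fn concat_strings(messages: &[&str], delimiter: Option<&str>) -> String {
    // With no delimiter, the messages are joined back to back.
    messages.join(delimiter.unwrap_or(""))
}

fn main() {
    let messages = ["a", "b", "c"];
    println!("{}", concat_strings(&messages, Some(","))); // prints "a,b,c"
    println!("{}", concat_strings(&messages, None)); // prints "abc"
}
```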

Const

A block for sending a constant value.

block-beta
    columns 4
    Const space:2 Sink
    Const-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Const block
    class Sink hidden
protoflow execute Const value=Hello

Count

A block that counts the number of messages it receives, while optionally passing them through.

block-beta
    columns 7
    Source space:2 Count space:2 Sink
    space:7
    space:7
    space:3 Result space:3
    Source-- "input" -->Count
    Count-- "output" -->Sink
    Count-- "count" -->Result

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Count block
    class Source hidden
    class Sink hidden
    class Result hidden
protoflow execute Count
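As a rough sketch of the three ports in the diagram above ("input", "output", and "count"), the following models them with standard-library channels. It is not the Protoflow implementation: `count_block` and `run_count` are hypothetical names, and emitting the count once the input port closes is an assumption made for this sketch.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical Count-like block: counts every input message, optionally
/// forwards it, and reports the total on the count port when input closes.
fn count_block<T>(input: Receiver<T>, output: Option<Sender<T>>, count: Sender<u64>) {
    let mut seen: u64 = 0;
    for message in input {
        seen += 1;
        if let Some(port) = &output {
            // Ignore a disconnected downstream; counting continues regardless.
            let _ = port.send(message);
        }
    }
    let _ = count.send(seen);
}

/// Wires the block to a source and two sinks, then returns what they received.
fn run_count(messages: Vec<String>) -> (Vec<String>, u64) {
    let (source, input) = channel();
    let (output, sink) = channel();
    let (count, result) = channel();

    for message in messages {
        source.send(message).expect("input port is open");
    }
    drop(source); // close the input port so the block terminates

    count_block(input, Some(output), count);

    let passed: Vec<String> = sink.iter().collect();
    let total = result.recv().expect("count port delivered a value");
    (passed, total)
}

fn main() {
    let (passed, total) = run_count(vec!["a".to_string(), "b".to_string()]);
    println!("passed through: {:?}, count: {}", passed, total);
}
```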

Decode

A block that decodes T messages from a byte stream.

block-beta
    columns 7
    Source space:2 Decode space:2 Sink
    Source-- "input" -->Decode
    Decode-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Decode block
    class Source hidden
    class Sink hidden
protoflow execute Decode encoding=text

DecodeCSV

A block that decodes CSV files from a byte stream into a header and rows represented as prost_types::Value.

block-beta
    columns 7
    space:5 Sink1 space:1
    space:1 Source space:1 DecodeCSV space:3
    space:5 Sink2 space:1
    Source-- "input" -->DecodeCSV
    DecodeCSV-- "header" -->Sink1
    DecodeCSV-- "content" -->Sink2

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class DecodeCSV block
    class Source hidden
    class Sink1 hidden
    class Sink2 hidden
protoflow execute DecodeCSV

DecodeHex

A block that decodes a hexadecimal byte stream into bytes.

block-beta
    columns 7
    Source space:2 DecodeHex space:2 Sink
    Source-- "input" -->DecodeHex
    DecodeHex-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class DecodeHex block
    class Source hidden
    class Sink hidden
protoflow execute DecodeHex

DecodeJSON

A block that decodes JSON messages from a byte stream.

block-beta
    columns 7
    Source space:2 DecodeJSON space:2 Sink
    Source-- "input" -->DecodeJSON
    DecodeJSON-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class DecodeJSON block
    class Source hidden
    class Sink hidden
protoflow execute DecodeJSON

Delay

A block that passes messages through while delaying them by a fixed or random duration.

block-beta
    columns 7
    Source space:2 Delay space:2 Sink
    Source-- "input" -->Delay
    Delay-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Delay block
    class Source hidden
    class Sink hidden
protoflow execute Delay fixed=2

Drop

A block that simply discards all messages it receives.

block-beta
    columns 4
    Source space:2 Drop
    Source-- "input" -->Drop

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Drop block
    class Source hidden
protoflow execute Drop

Encode

A block that encodes T messages to a byte stream.

block-beta
    columns 7
    Source space:2 Encode space:2 Sink
    Source-- "input" -->Encode
    Encode-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Encode block
    class Source hidden
    class Sink hidden
protoflow execute Encode encoding=text
protoflow execute Encode encoding=protobuf

EncodeCSV

A block that encodes CSV files by converting a header and rows, provided as prost_types::Value streams, into a byte stream.

block-beta
    columns 7
    space:1 Source1 space:5
    space:3 EncodeCSV space:1 Sink space:1
    space:1 Source2 space:5
    Source1-- "header" -->EncodeCSV
    Source2-- "rows" -->EncodeCSV
    EncodeCSV-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class EncodeCSV block
    class Source1 hidden
    class Source2 hidden
    class Sink hidden
protoflow execute EncodeCSV

EncodeHex

A block that encodes a byte stream into hexadecimal form.

block-beta
    columns 7
    Source space:2 EncodeHex space:2 Sink
    Source-- "input" -->EncodeHex
    EncodeHex-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class EncodeHex block
    class Source hidden
    class Sink hidden
protoflow execute EncodeHex
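For readers unfamiliar with the transformation, here is a small standard-library sketch of hexadecimal encoding together with its inverse (the operation DecodeHex performs). The function names are hypothetical, and the choice of lowercase digits is an assumption of this sketch; the actual blocks may differ on both points.

```rust
/// Encode each byte as two lowercase hexadecimal digits.
fn encode_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{:02x}", b)).collect()
}

/// Value of a single ASCII hex digit, or None if the byte is not one.
fn hex_value(digit: u8) -> Option<u8> {
    (digit as char).to_digit(16).map(|v| v as u8)
}

/// Decode a string of hex digit pairs back into bytes, reporting malformed
/// input as an error value instead of panicking.
fn decode_hex(text: &str) -> Result<Vec<u8>, String> {
    let digits = text.as_bytes();
    if digits.len() % 2 != 0 {
        return Err(format!("odd number of hex digits: {}", digits.len()));
    }
    digits
        .chunks(2)
        .enumerate()
        .map(|(i, pair)| match (hex_value(pair[0]), hex_value(pair[1])) {
            (Some(hi), Some(lo)) => Ok((hi << 4) | lo),
            _ => Err(format!("invalid hex digit at offset {}", i * 2)),
        })
        .collect()
}

fn main() {
    let encoded = encode_hex(b"Hi");
    println!("{}", encoded); // prints "4869"
    println!("{:?}", decode_hex(&encoded)); // prints "Ok([72, 105])"
}
```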

EncodeJSON

A block that encodes messages into JSON format.

block-beta
    columns 7
    Source space:2 EncodeJSON space:2 Sink
    Source-- "input" -->EncodeJSON
    EncodeJSON-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class EncodeJSON block
    class Source hidden
    class Sink hidden
protoflow execute EncodeJSON

Hash

A block that computes the cryptographic hash of a byte stream, while optionally passing it through.

block-beta
    columns 7
    Source space:2 Hash space:2 Sink
    space:7
    space:7
    space:3 Result space:3
    Source-- "input" -->Hash
    Hash-- "output" -->Sink
    Hash-- "hash" -->Result

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Hash block
    class Source hidden
    class Sink hidden
    class Result hidden
protoflow execute Hash algorithm=blake3

Random

A block for generating and sending a random value.

block-beta
    columns 4
    Random space:2 Sink
    Random-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class Random block
    class Sink hidden
protoflow execute Random seed=42

ReadDir

A block that reads file names from a file system directory.

block-beta
    columns 4
    Config space:3
    space:4
    space:4
    ReadDir space:2 Sink
    Config-- "path" -->ReadDir
    ReadDir-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ReadDir block
    class Config hidden
    class Sink hidden
protoflow execute ReadDir path=/tmp

ReadEnv

A block that reads the value of an environment variable.

block-beta
    columns 4
    Config space:3
    space:4
    space:4
    ReadEnv space:2 Sink
    Config-- "name" -->ReadEnv
    ReadEnv-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ReadEnv block
    class Config hidden
    class Sink hidden
protoflow execute ReadEnv name=TERM

ReadFile

A block that reads bytes from the contents of a file.

block-beta
    columns 4
    Config space:3
    space:4
    space:4
    ReadFile space:2 Sink
    Config-- "path" -->ReadFile
    ReadFile-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ReadFile block
    class Config hidden
    class Sink hidden
protoflow execute ReadFile path=/tmp/file.txt

ReadSocket

A block that reads bytes from a TCP socket.

block-beta
    columns 4
    ReadSocket space:2 Sink
    ReadSocket-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ReadSocket block
    class Sink hidden
protoflow execute ReadSocket connection=tcp://127.0.0.1:7077 buffer_size=1024

ReadStdin

A block that reads bytes from standard input (aka stdin).

block-beta
    columns 4
    ReadStdin space:2 Sink
    ReadStdin-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class ReadStdin block
    class Sink hidden
protoflow execute ReadStdin < input.txt

SplitString

A block that splits the received input message, using an optional delimiter string parameter.

block-beta
    columns 7
    Source space:2 SplitString space:2 Sink
    Source-- "input" -->SplitString
    SplitString-- "output" -->Sink

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class SplitString block
    class Source hidden
    class Sink hidden
protoflow execute SplitString delimiter=","
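The splitting described above can be illustrated with a short standard-library sketch. `split_string` is a hypothetical helper, not the block's implementation, and the behavior when no delimiter is given (forwarding the message unchanged) is an assumption made purely for this example.

```rust
/// Hypothetical stand-in for the described behavior: split one input message
/// into parts on the delimiter.
fn split_string(message: &str, delimiter: Option<&str>) -> Vec<String> {
    match delimiter {
        Some(d) if !d.is_empty() => message.split(d).map(String::from).collect(),
        // Assumption: with no (or an empty) delimiter, forward the message as is.
        _ => vec![message.to_string()],
    }
}

fn main() {
    println!("{:?}", split_string("a,b,c", Some(","))); // prints ["a", "b", "c"]
    println!("{:?}", split_string("a,b,c", None)); // prints ["a,b,c"]
}
```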

WriteFile

A block that writes or appends bytes to the contents of a file.

block-beta
    columns 4
    space:3 Config
    space:4
    space:4
    Source space:2 WriteFile
    Config-- "path" -->WriteFile
    Source-- "input" -->WriteFile

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class WriteFile block
    class Config hidden
    class Source hidden
protoflow execute WriteFile path=/tmp/file.txt

WriteSocket

A block that writes bytes to a TCP socket.

block-beta
    columns 4
    Source space:2 WriteSocket
    Source-- "input" -->WriteSocket

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class WriteSocket block
    class Source hidden
protoflow execute WriteSocket connection=tcp://127.0.0.1:7077 buffer_size=1024

WriteStderr

A block that writes bytes to standard error (aka stderr).

block-beta
    columns 4
    Source space:2 WriteStderr
    Source-- "input" -->WriteStderr

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class WriteStderr block
    class Source hidden
protoflow execute WriteStderr < input.txt 2> output.txt

WriteStdout

A block that writes bytes to standard output (aka stdout).

block-beta
    columns 4
    Source space:2 WriteStdout
    Source-- "input" -->WriteStdout

    classDef block height:48px,padding:8px;
    classDef hidden visibility:none;
    class WriteStdout block
    class Source hidden
protoflow execute WriteStdout < input.txt > output.txt

👨‍💻 Development

git clone https://github.com/asimov-platform/protoflow.git

Guidelines

Contributing a pull request

  • Do your best to adhere to the existing coding conventions and idioms.
  • Make sure to run cargo fmt prior to submitting your pull request.
  • Don't leave trailing whitespace on any line, and make sure all text files include a terminating newline character.

Adding a new block type

To add a new block type implementation, make sure to examine and amend:

Note

If a block implementation requires additional crate dependencies, it may be appropriate for that block's availability to be feature-gated so as to enable developers to opt out of those dependencies.

Block implementation notes

  • Blocks must not panic; use other error-handling strategies. Ideally, block implementations should be robust and infallible. When that's not possible, consider encoding errors by having the output message type be an enum (cf. Rust's Result) or consider having a dedicated error output port. If truly necessary, abort block execution by returning a BlockError.
  • Blocks should not generally spawn threads.
  • Blocks should document their system resource requirements, if any.
  • Blocks should use the tracing crate for logging any errors, warnings, and debug output. However, since tracing is an optional feature and dependency, do make sure to feature-gate any use of tracing behind a #[cfg(feature = "tracing")] guard.
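The first and last notes above can be sketched together. The example below is illustrative only: `ParsedLine` and `parse_line` are hypothetical names, not part of Protoflow. It shows an output message type that carries errors as data (in the spirit of Rust's `Result`) instead of panicking, with the logging call placed behind the feature guard quoted above. Compiled as a standalone file, the `tracing` feature is simply off, so the guarded line is omitted.

```rust
/// Hypothetical output message type: failures travel downstream as data,
/// so the block never needs to panic on bad input.
#[derive(Debug, PartialEq)]
enum ParsedLine {
    Number(i64),
    Invalid { line: String, reason: String },
}

/// Parse one input line into an output message, infallibly.
fn parse_line(line: &str) -> ParsedLine {
    match line.trim().parse::<i64>() {
        Ok(n) => ParsedLine::Number(n),
        Err(e) => {
            // Only compiled when the optional `tracing` feature is enabled.
            #[cfg(feature = "tracing")]
            tracing::warn!("could not parse {:?}: {}", line, e);

            ParsedLine::Invalid {
                line: line.to_string(),
                reason: e.to_string(),
            }
        }
    }
}

fn main() {
    for line in ["42", " -7 ", "not a number"] {
        println!("{:?}", parse_line(line));
    }
}
```

A dedicated error output port would follow the same idea, routing the `Invalid` case to a second port rather than interleaving it with successful messages.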
