Flow Based Programming inspired framework in Go
Learn more about FBP (originally discovered by @jpaulm)
F-Mesh is a simple FBP-inspired framework in Go. It lets you express your program as a mesh of interconnected components. You can think of it as a simple function orchestrator.
- F-Mesh consists of multiple Components - the main building blocks
- Components can have an unlimited number of input and output Ports
- The main job of each component is to read inputs and provide outputs
- Any output port can be connected to any input port via Pipes
- The component behaviour is defined by its Activation function
- The framework checks when components are ready to be activated and calls their activation functions concurrently
- One such iteration is called Activation cycle
- On each activation cycle the framework does the same things: it activates all the components that are ready for activation, flushes the data through pipes and disposes of input Signals (the data chunks flowing between components)
- Ports and pipes are type agnostic, any data can be transferred or aggregated on any port
- The framework works in discrete time, not in wall time. The quantum of time is 1 activation cycle, which gives you "logical parallelism" out of the box
- F-Mesh is suitable for logical wireframing, simulation, functional-style computations and implementing simple concurrency patterns without using concurrency primitives such as channels or locks
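To make the activation cycle more concrete, here is a minimal toy model written with the standard library only. It is a sketch of the idea described in the list above, not F-Mesh's actual implementation: the `component` struct, `runCycle`, `run` and `buildMesh` are hypothetical names, each toy component has a single input port, and "ready" is assumed to mean "has at least one input signal".

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// component is a hypothetical toy type, NOT the F-Mesh API.
type component struct {
	name     string
	inputs   []any                // signals waiting on the (single) input port
	outputs  []any                // signals produced by the last activation
	activate func(in []any) []any // the activation function
	pipeTo   *component           // target of the output pipe, nil if unconnected
}

// runCycle performs one activation cycle and reports how many components ran.
func runCycle(components []*component) int {
	var wg sync.WaitGroup
	activated := 0
	for _, c := range components {
		if len(c.inputs) == 0 {
			continue // not ready: nothing to read
		}
		activated++
		wg.Add(1)
		go func(c *component) { // ready components are activated concurrently
			defer wg.Done()
			c.outputs = c.activate(c.inputs)
			c.inputs = nil // dispose of the consumed input signals
		}(c)
	}
	wg.Wait()
	// Flush: move produced signals through the pipes to downstream inputs.
	for _, c := range components {
		if c.pipeTo != nil {
			c.pipeTo.inputs = append(c.pipeTo.inputs, c.outputs...)
			c.outputs = nil
		}
	}
	return activated
}

// run repeats cycles until nothing is ready or the limit is reached,
// and returns the number of cycles in which something was activated.
func run(components []*component, cyclesLimit int) int {
	cycles := 0
	for cycles < cyclesLimit && runCycle(components) > 0 {
		cycles++
	}
	return cycles
}

// buildMesh wires "concat" into "case" and seeds the initial inputs.
func buildMesh() (*component, []*component) {
	upper := &component{name: "case", activate: func(in []any) []any {
		return []any{strings.ToUpper(in[0].(string))}
	}}
	concat := &component{name: "concat", pipeTo: upper, activate: func(in []any) []any {
		joined := ""
		for _, s := range in {
			joined += s.(string)
		}
		return []any{joined}
	}}
	concat.inputs = []any{"hello ", "world !"}
	return upper, []*component{concat, upper}
}

func main() {
	last, mesh := buildMesh()
	cycles := run(mesh, 10)
	fmt.Printf("finished after %d cycles: %v\n", cycles, last.outputs[0])
	// prints "finished after 2 cycles: HELLO WORLD !"
}
```

Time here advances only when a cycle completes: "concat" runs in cycle 1, its output is flushed, and "case" becomes ready in cycle 2. No channels or locks appear in the activation functions themselves, which is the property the list above calls "logical parallelism".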
F-Mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers).
The framework is not suitable for implementing complex concurrent systems.
// Create f-mesh
// (needs the fmt, os and strings imports plus the F-Mesh fmesh, component, port and signal packages)
fm := fmesh.New("hello world").
	WithComponents(
		component.New("concat").
			WithInputs("i1", "i2").
			WithOutputs("res").
			WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
				// Join the first payload of each input and put the result on "res"
				word1 := inputs.ByName("i1").Signals().FirstPayload().(string)
				word2 := inputs.ByName("i2").Signals().FirstPayload().(string)
				outputs.ByName("res").PutSignals(signal.New(word1 + word2))
				return nil
			}),
		component.New("case").
			WithInputs("i1").
			WithOutputs("res").
			WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
				// Convert the incoming string to title case
				inputString := inputs.ByName("i1").Signals().FirstPayload().(string)
				outputs.ByName("res").PutSignals(signal.New(strings.ToTitle(inputString)))
				return nil
			}),
	).
	WithConfig(fmesh.Config{
		ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic,
		CyclesLimit:           10,
	})

// Pipe the output of "concat" into the input of "case"
fm.Components().ByName("concat").Outputs().ByName("res").PipeTo(
	fm.Components().ByName("case").Inputs().ByName("i1"),
)

// Init inputs
fm.Components().ByName("concat").Inputs().ByName("i1").PutSignals(signal.New("hello "))
fm.Components().ByName("concat").Inputs().ByName("i2").PutSignals(signal.New("world !"))

// Run the mesh
_, err := fm.Run()

// Check for errors
if err != nil {
	fmt.Println("F-Mesh returned an error:", err)
	os.Exit(1)
}

// Extract results
results := fm.Components().ByName("case").Outputs().ByName("res").Signals().FirstPayload()
fmt.Printf("Result is: %v\n", results)
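Because ports are type agnostic, the unchecked `.(string)` assertions in the example above panic if a signal carries anything other than a string. One possible alternative is to use Go's comma-ok assertion and return an error from the activation function instead. The sketch below uses only the standard library; `payloadAsString` and `toTitle` are hypothetical helpers for illustration and are not part of F-Mesh.

```go
package main

import (
	"fmt"
	"strings"
)

// payloadAsString is a hypothetical helper (not an F-Mesh function).
// It converts an untyped payload to a string without panicking.
func payloadAsString(payload any) (string, error) {
	s, ok := payload.(string)
	if !ok {
		return "", fmt.Errorf("expected string payload, got %T", payload)
	}
	return s, nil
}

// toTitle mirrors the body of the "case" component, but reports a
// non-string payload as an error instead of panicking.
func toTitle(payload any) (string, error) {
	s, err := payloadAsString(payload)
	if err != nil {
		return "", err
	}
	return strings.ToTitle(s), nil
}

func main() {
	for _, p := range []any{"hello world !", 42} {
		out, err := toTitle(p)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(out)
	}
	// prints:
	// HELLO WORLD !
	// error: expected string payload, got int
}
```

Inside an activation function, the returned error could be passed straight back to the framework, which then applies whatever `ErrorHandlingStrategy` the mesh was configured with.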