Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V.0.1.0 #57

Merged
merged 3 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,26 @@

<h1>What is it?</h1>
<p>F-Mesh is a simplistic FBP-inspired framework in Go.
It allows you to express your program as a mesh of interconnected components.</p>
It allows you to express your program as a mesh of interconnected components.
You can think of it as a simple functions orchestrator.
</p>
<h3>Main concepts:</h3>
<ul>
<li>F-Mesh consists of multiple <b>Components</b> - the main building blocks</li>
<li>Components have unlimited number of input and output <p>Ports</p>. </li>
<li>Components have unlimited number of input and output <p>Ports</p></li>
<li>The main job of each component is to read inputs and provide outputs</li>
<li>Each port of the component can be connected to one or multiple ports of any other component. Such connections are called <b>Pipes</b></li>
<li>It does not matter whether port is input or output, any port can be connected to any other port</li>
<li>Any output port can be connected to any input port via <b>Pipes</b></li>
<li>The component behaviour is defined by its <b>Activation function</b></li>
<li>The framework checks when components are ready to be activated and calls their activation functions concurrently</li>
<li>One such iteration is called <b>Activation cycle</b></li>
<li>On each activation cycle the framework does same things: activates all the components ready to be activated, flushes the data through pipes and disposes processed <b>Signals (the data chunks flowing between components)</b></li>
<li>On each activation cycle the framework does same things: activates all the components ready for activation, flushes the data through pipes and disposes input <b>Signals (the data chunks flowing between components)</b></li>
<li>Ports and pipes are type agnostic, any data can be transferred or aggregated on any port</li>
<li>The framework works in discrete time, not it wall time. The quant of time is 1 activation cycle, which gives you "logical parallelism" out of the box</li>
<li>F-Mesh is suitable for logical wireframing, simulation, functional-style computations and implementing simple concurrency patterns without using the concurrency primitives like channels or any sort of locks</li>
</ul>

<h1>What it is not?</h1>
<p>F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers).</p>
<p>F-mesh is not a classical FBP implementation, and it is not fully async. It does not support long-running components or wall-time events (like timers and tickers)</p>
<p>The framework is not suitable for implementing complex concurrent systems</p>

<h2>Example:</h2>
Expand Down Expand Up @@ -54,7 +55,10 @@ It allows you to express your program as a mesh of interconnected components.</p
outputs.ByName("res").PutSignals(signal.New(strings.ToTitle(inputString)))
return nil
})).
WithErrorHandlingStrategy(fmesh.StopOnFirstErrorOrPanic)
.WithConfig(fmesh.Config{
ErrorHandlingStrategy: fmesh.StopOnFirstErrorOrPanic,
CyclesLimit: 10,
})

fm.Components().ByName("concat").Outputs().ByName("res").PipeTo(
fm.Components().ByName("case").Inputs().ByName("i1"),
Expand Down
35 changes: 31 additions & 4 deletions component/activation_result.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package component

import (
"errors"
"fmt"
)

Expand Down Expand Up @@ -30,6 +31,12 @@

// ActivationCodePanicked : component is activated, but panicked
ActivationCodePanicked

// ActivationCodeWaitingForInputs : component waits for specific inputs, but all input signals in current activation cycle may be cleared (default behaviour)

Check notice on line 35 in component/activation_result.go

View workflow job for this annotation

GitHub Actions / qodana

Comment of exported element starts with the incorrect name

Comment should have the following format 'ActivationCodeWaitingForInputsClear ...' (with an optional leading article)

Check notice on line 35 in component/activation_result.go

View workflow job for this annotation

GitHub Actions / qodana

Comment of exported element starts with the incorrect name

Comment should have the following format 'ActivationCodeWaitingForInputsClear ...' (with an optional leading article)
ActivationCodeWaitingForInputsClear

// ActivationCodeWaitingForInputsKeep : component waits for specific inputs, but wants to keep current input signals for the next cycle
ActivationCodeWaitingForInputsKeep
)

// NewActivationResult creates a new activation result for given component
Expand Down Expand Up @@ -60,13 +67,13 @@
return ar.code
}

// HasError returns true when activation result has an error
func (ar *ActivationResult) HasError() bool {
// IsError returns true when activation result has an error
func (ar *ActivationResult) IsError() bool {
return ar.code == ActivationCodeReturnedError && ar.Error() != nil
}

// HasPanic returns true when activation result is derived from panic
func (ar *ActivationResult) HasPanic() bool {
// IsPanic returns true when activation result is derived from panic
func (ar *ActivationResult) IsPanic() bool {
return ar.code == ActivationCodePanicked && ar.Error() != nil
}

Expand Down Expand Up @@ -125,3 +132,23 @@
WithActivationCode(ActivationCodePanicked).
WithError(err)
}

func (c *Component) newActivationResultWaitingForInputs(err error) *ActivationResult {
activationCode := ActivationCodeWaitingForInputsClear
if errors.Is(err, errWaitingForInputsKeep) {
activationCode = ActivationCodeWaitingForInputsKeep
}
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(activationCode).
WithError(err)
}

func IsWaitingForInput(activationResult *ActivationResult) bool {
return activationResult.Code() == ActivationCodeWaitingForInputsClear ||
activationResult.Code() == ActivationCodeWaitingForInputsKeep
}

func WantsToKeepInputs(activationResult *ActivationResult) bool {
return activationResult.Code() == ActivationCodeWaitingForInputsKeep
}
4 changes: 2 additions & 2 deletions component/activation_result_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (collection ActivationResultCollection) Add(activationResults ...*Activatio
// HasErrors tells whether the collection contains at least one activation result with error and respective code
func (collection ActivationResultCollection) HasErrors() bool {
for _, ar := range collection {
if ar.HasError() {
if ar.IsError() {
return true
}
}
Expand All @@ -29,7 +29,7 @@ func (collection ActivationResultCollection) HasErrors() bool {
// HasPanics tells whether the collection contains at least one activation result with panic and respective code
func (collection ActivationResultCollection) HasPanics() bool {
for _, ar := range collection {
if ar.HasPanic() {
if ar.IsPanic() {
return true
}
}
Expand Down
8 changes: 4 additions & 4 deletions component/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package component
// Collection is a collection of components with useful methods
type Collection map[string]*Component

// NewComponentCollection creates empty collection
func NewComponentCollection() Collection {
// NewCollection creates empty collection
func NewCollection() Collection {
return make(Collection)
}

Expand All @@ -13,8 +13,8 @@ func (collection Collection) ByName(name string) *Component {
return collection[name]
}

// Add adds components to existing collection
func (collection Collection) Add(components ...*Component) Collection {
// With adds components and returns the collection
func (collection Collection) With(components ...*Component) Collection {
for _, component := range components {
collection[component.Name()] = component
}
Expand Down
16 changes: 8 additions & 8 deletions component/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestCollection_ByName(t *testing.T) {
}{
{
name: "component found",
components: NewComponentCollection().Add(New("c1"), New("c2")),
components: NewCollection().With(New("c1"), New("c2")),
args: args{
name: "c2",
},
Expand All @@ -32,7 +32,7 @@ func TestCollection_ByName(t *testing.T) {
},
{
name: "component not found",
components: NewComponentCollection().Add(New("c1"), New("c2")),
components: NewCollection().With(New("c1"), New("c2")),
args: args{
name: "c3",
},
Expand All @@ -46,7 +46,7 @@ func TestCollection_ByName(t *testing.T) {
}
}

func TestCollection_Add(t *testing.T) {
func TestCollection_With(t *testing.T) {
type args struct {
components []*Component
}
Expand All @@ -58,7 +58,7 @@ func TestCollection_Add(t *testing.T) {
}{
{
name: "adding nothing to empty collection",
collection: NewComponentCollection(),
collection: NewCollection(),
args: args{
components: nil,
},
Expand All @@ -68,7 +68,7 @@ func TestCollection_Add(t *testing.T) {
},
{
name: "adding to empty collection",
collection: NewComponentCollection(),
collection: NewCollection(),
args: args{
components: []*Component{New("c1"), New("c2")},
},
Expand All @@ -81,7 +81,7 @@ func TestCollection_Add(t *testing.T) {
},
{
name: "adding to non-empty collection",
collection: NewComponentCollection().Add(New("c1"), New("c2")),
collection: NewCollection().With(New("c1"), New("c2")),
args: args{
components: []*Component{New("c3"), New("c4")},
},
Expand All @@ -97,9 +97,9 @@ func TestCollection_Add(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.collection.Add(tt.args.components...)
collectionAfter := tt.collection.With(tt.args.components...)
if tt.assertions != nil {
tt.assertions(t, tt.collection)
tt.assertions(t, collectionAfter)
}
})
}
Expand Down
18 changes: 11 additions & 7 deletions component/component.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package component

import (
"errors"
"fmt"
"github.com/hovsep/fmesh/port"
)
Expand Down Expand Up @@ -89,10 +90,6 @@ func (c *Component) hasActivationFunction() bool {
// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
defer func() {
c.Inputs().Clear()
}()

defer func() {
if r := recover(); r != nil {
activationResult = c.newActivationResultPanicked(fmt.Errorf("panicked with: %v", r))
Expand All @@ -102,7 +99,6 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
if !c.hasActivationFunction() {
//Activation function is not set (maybe useful while the mesh is under development)
activationResult = c.newActivationResultNoFunction()

return
}

Expand All @@ -115,14 +111,17 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationResultWaitingForInputs(err)
return
}

if err != nil {
activationResult = c.newActivationResultReturnedError(err)

return
}

activationResult = c.newActivationResultOK()

return
}

Expand All @@ -132,3 +131,8 @@ func (c *Component) FlushOutputs() {
out.Flush()
}
}

// ClearInputs clears all input ports
func (c *Component) ClearInputs() {
c.Inputs().Clear()
}
4 changes: 2 additions & 2 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,10 @@ func TestComponent_MaybeActivate(t *testing.T) {
assert.Equal(t, tt.wantActivationResult.Activated(), gotActivationResult.Activated())
assert.Equal(t, tt.wantActivationResult.ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.wantActivationResult.Code(), gotActivationResult.Code())
if tt.wantActivationResult.HasError() {
if tt.wantActivationResult.IsError() {
assert.EqualError(t, gotActivationResult.Error(), tt.wantActivationResult.Error().Error())
} else {
assert.False(t, gotActivationResult.HasError())
assert.False(t, gotActivationResult.IsError())
}

})
Expand Down
19 changes: 19 additions & 0 deletions component/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package component

import (
"errors"
"fmt"
)

var (
errWaitingForInputs = errors.New("component is waiting for some inputs")
errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs)
)

// NewErrWaitForInputs returns respective error
func NewErrWaitForInputs(keepInputs bool) error {
if keepInputs {
return errWaitingForInputsKeep
}
return errWaitingForInputs
}
16 changes: 13 additions & 3 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type FMesh struct {
func New(name string) *FMesh {
return &FMesh{
name: name,
components: component.NewComponentCollection(),
components: component.NewCollection(),
config: defaultConfig,
}
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (fm *FMesh) WithDescription(description string) *FMesh {
// WithComponents adds components to f-mesh
func (fm *FMesh) WithComponents(components ...*component.Component) *FMesh {
for _, c := range components {
fm.components.Add(c)
fm.components = fm.components.With(c)
}
return fm
}
Expand Down Expand Up @@ -106,7 +106,17 @@ func (fm *FMesh) drainComponents(cycle *cycle.Cycle) {
continue
}

if component.IsWaitingForInput(activationResult) {
if !component.WantsToKeepInputs(activationResult) {
c.ClearInputs()
}
// Components waiting for inputs are not flushed
continue
}

// Normally components are fully drained
c.FlushOutputs()
c.ClearInputs()
}
}

Expand All @@ -127,7 +137,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) {
}

func (fm *FMesh) mustStop(cycleResult *cycle.Cycle, cycleNum int) (bool, error) {
if (fm.config.CyclesLimit > 0) && (cycleNum >= fm.config.CyclesLimit) {
if (fm.config.CyclesLimit > 0) && (cycleNum > fm.config.CyclesLimit) {
return true, ErrReachedMaxAllowedCycles
}

Expand Down
4 changes: 2 additions & 2 deletions fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,10 @@ func TestFMesh_Run(t *testing.T) {
assert.Equal(t, tt.want[i].ActivationResults()[componentName].ComponentName(), gotActivationResult.ComponentName())
assert.Equal(t, tt.want[i].ActivationResults()[componentName].Code(), gotActivationResult.Code())

if tt.want[i].ActivationResults()[componentName].HasError() {
if tt.want[i].ActivationResults()[componentName].IsError() {
assert.EqualError(t, tt.want[i].ActivationResults()[componentName].Error(), gotActivationResult.Error().Error())
} else {
assert.False(t, gotActivationResult.HasError())
assert.False(t, gotActivationResult.IsError())
}
}
}
Expand Down
Loading
Loading