Skip to content

Commit

Permalink
Merge pull request #41 from hovsep/v.0.1.0
Browse files Browse the repository at this point in the history
V.0.1.0
  • Loading branch information
hovsep authored Sep 17, 2024
2 parents 38dd79f + 3c08fc3 commit fc07480
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 119 deletions.
9 changes: 4 additions & 5 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func (c *Component) WithDescription(description string) *Component {

// WithInputs ads input ports
func (c *Component) WithInputs(portNames ...string) *Component {
c.inputs.Add(port.NewPortGroup(portNames...)...)
c.inputs = c.inputs.Add(port.NewPortGroup(portNames...)...)
return c
}

// WithOutputs adds output ports
func (c *Component) WithOutputs(portNames ...string) *Component {
c.outputs.Add(port.NewPortGroup(portNames...)...)
c.outputs = c.outputs.Add(port.NewPortGroup(portNames...)...)
return c
}

Expand Down Expand Up @@ -92,7 +92,6 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

//@TODO:: https://github.com/hovsep/fmesh/issues/15
if !c.inputs.AnyHasSignal() {
//No inputs set, stop here
activationResult = c.newActivationCodeNoInput()
Expand All @@ -103,10 +102,10 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
//Run the computation
err := c.f(c.inputs, c.outputs)

if IsWaitingForInputError(err) {
if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationCodeWaitingForInput()

if !errors.Is(err, ErrWaitingForInputKeepInputs) {
if !errors.Is(err, errWaitingForInputsKeep) {
c.inputs.ClearSignal()
}

Expand Down
43 changes: 32 additions & 11 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func TestComponent_Inputs(t *testing.T) {
name: "with inputs",
component: NewComponent("c1").WithInputs("i1", "i2"),
want: port.Collection{
"i1": port.NewPort("i1"),
"i2": port.NewPort("i2"),
port.NewPort("i1"),
port.NewPort("i2"),
},
},
}
Expand Down Expand Up @@ -200,8 +200,8 @@ func TestComponent_Outputs(t *testing.T) {
name: "with outputs",
component: NewComponent("c1").WithOutputs("o1", "o2"),
want: port.Collection{
"o1": port.NewPort("o1"),
"o2": port.NewPort("o2"),
port.NewPort("o1"),
port.NewPort("o2"),
},
},
}
Expand Down Expand Up @@ -306,8 +306,8 @@ func TestComponent_WithInputs(t *testing.T) {
name: "c1",
description: "",
inputs: port.Collection{
"p1": port.NewPort("p1"),
"p2": port.NewPort("p2"),
port.NewPort("p1"),
port.NewPort("p2"),
},
outputs: port.Collection{},
f: nil,
Expand Down Expand Up @@ -358,8 +358,8 @@ func TestComponent_WithOutputs(t *testing.T) {
description: "",
inputs: port.Collection{},
outputs: port.Collection{
"p1": port.NewPort("p1"),
"p2": port.NewPort("p2"),
port.NewPort("p1"),
port.NewPort("p2"),
},
f: nil,
},
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestComponent_MaybeActivate(t *testing.T) {
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {

if !inputs.ByNames("i1", "i2").AllHaveSignal() {
return ErrWaitingForInputResetInputs
return NewErrWaitForInputs(false)
}

return nil
Expand All @@ -432,14 +432,35 @@ func TestComponent_MaybeActivate(t *testing.T) {
WithActivationCode(ActivationCodeNoInput),
},
{
name: "component is waiting for input",
name: "component is waiting for input, reset inputs",
getComponent: func() *Component {
c := NewComponent("c1").
WithInputs("i1", "i2").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {

if !inputs.ByNames("i1", "i2").AllHaveSignal() {
return ErrWaitingForInputResetInputs
return NewErrWaitForInputs(false)
}

return nil
})
//Only one input set
c.Inputs().ByName("i1").PutSignal(signal.New(123))
return c
},
wantActivationResult: NewActivationResult("c1").
SetActivated(false).
WithActivationCode(ActivationCodeWaitingForInput),
},
{
name: "component is waiting for input, keep inputs",
getComponent: func() *Component {
c := NewComponent("c1").
WithInputs("i1", "i2").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {

if !inputs.ByNames("i1", "i2").AllHaveSignal() {
return NewErrWaitForInputs(true)
}

return nil
Expand Down
18 changes: 12 additions & 6 deletions component/errors.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package component

import "errors"
import (
"errors"
"fmt"
)

var (
//@TODO: provide wrapper methods so exact input can be specified within error
ErrWaitingForInputResetInputs = errors.New("component is waiting for one or more inputs. All inputs will be reset")
ErrWaitingForInputKeepInputs = errors.New("component is waiting for one or more inputs. All inputs will be kept")
errWaitingForInputs = errors.New("component is waiting for some inputs")
errWaitingForInputsKeep = fmt.Errorf("%w: do not clear input ports", errWaitingForInputs)
)

func IsWaitingForInputError(err error) bool {
return errors.Is(err, ErrWaitingForInputResetInputs) || errors.Is(err, ErrWaitingForInputKeepInputs)
// NewErrWaitForInputs returns respective error
func NewErrWaitForInputs(keepInputs bool) error {
if keepInputs {
return errWaitingForInputsKeep
}
return errWaitingForInputs
}
39 changes: 0 additions & 39 deletions component/errors_test.go

This file was deleted.

37 changes: 11 additions & 26 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,45 +53,30 @@ func (fm *FMesh) WithErrorHandlingStrategy(strategy ErrorHandlingStrategy) *FMes
return fm
}

// runCycle runs one activation cycle (tries to activate all components)
// runCycle runs one activation cycle (tries to activate ready components)
func (fm *FMesh) runCycle() *cycle.Cycle {
cycleResult := cycle.New()
newCycle := cycle.New()

if len(fm.components) == 0 {
return cycleResult
return newCycle
}

activationResultsChan := make(chan *component.ActivationResult) //@TODO: close the channel
doneChan := make(chan struct{}) //@TODO: close the channel

var wg sync.WaitGroup

go func() {
for {
select {
case aRes := <-activationResultsChan:
//@TODO :check for closed channel
cycleResult.Lock()
cycleResult.ActivationResults().Add(aRes)
cycleResult.Unlock()
case <-doneChan:
return
}
}
}()

for _, c := range fm.components {
wg.Add(1)
c := c //@TODO: check if this needed
go func() {

go func(component *component.Component, cycle *cycle.Cycle) {
defer wg.Done()
activationResultsChan <- c.MaybeActivate()
}()

cycle.Lock()
cycle.ActivationResults().Add(c.MaybeActivate())
cycle.Unlock()
}(c, newCycle)
}

wg.Wait()
doneChan <- struct{}{} //@TODO: no need to send close signal, just close the channel
return cycleResult
return newCycle
}

// DrainComponents drains the data from all components outputs
Expand Down
2 changes: 1 addition & 1 deletion fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func TestFMesh_runCycle(t *testing.T) {
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
if !inputs.ByNames("i1", "i2").AllHaveSignal() {
return component.ErrWaitingForInputKeepInputs
return component.NewErrWaitForInputs(true)
}
return nil
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"testing"
)

func Test_BasicMath(t *testing.T) {
func Test_Math(t *testing.T) {
tests := []struct {
name string
setupFM func() *fmesh.FMesh
Expand Down
Loading

0 comments on commit fc07480

Please sign in to comment.