Skip to content

Commit

Permalink
Waiting for inputs: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hovsep committed Oct 4, 2024
1 parent 730104e commit 7b3d912
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 8 deletions.
50 changes: 50 additions & 0 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,56 @@ func TestComponent_MaybeActivate(t *testing.T) {
WithActivationCode(ActivationCodePanicked).
WithError(errors.New("panicked with: oh shrimps")),
},
{
name: "component is waiting for inputs",
getComponent: func() *Component {
c1 := New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
if !inputs.ByNames("i1", "i2").AllHaveSignals() {
return NewErrWaitForInputs(false)
}
return nil
})

// Only one input set
c1.Inputs().ByName("i1").PutSignals(signal.New(123))

return c1
},
wantActivationResult: &ActivationResult{
componentName: "c1",
activated: true,
code: ActivationCodeWaitingForInputsClear,
err: NewErrWaitForInputs(false),
},
},
{
name: "component is waiting for inputs and wants to keep them",
getComponent: func() *Component {
c1 := New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
if !inputs.ByNames("i1", "i2").AllHaveSignals() {
return NewErrWaitForInputs(true)
}
return nil
})

// Only one input set
c1.Inputs().ByName("i1").PutSignals(signal.New(123))

return c1
},
wantActivationResult: &ActivationResult{
componentName: "c1",
activated: true,
code: ActivationCodeWaitingForInputsKeep,
err: NewErrWaitForInputs(true),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
25 changes: 17 additions & 8 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,29 @@ func (fm *FMesh) drainComponents(cycle *cycle.Cycle) {
activationResult := cycle.ActivationResults().ByComponentName(c.Name())

if !activationResult.Activated() {
// Component did not activate, so it did not create new output signals, hence nothing to drain
continue
}

// By default, all outputs are flushed and all inputs are cleared
shouldFlushOutputs := true
shouldClearInputs := true

if component.IsWaitingForInput(activationResult) {
if !component.WantsToKeepInputs(activationResult) {
c.ClearInputs()
}
// Components waiting for inputs are not flushed
continue
// @TODO: maybe we should clear outputs
// in order to prevent leaking outputs from previous cycle
// (if outputs were set before returning errWaitingForInputs)
shouldFlushOutputs = false
shouldClearInputs = !component.WantsToKeepInputs(activationResult)
}

// Normally components are fully drained
c.FlushOutputs()
c.ClearInputs()
if shouldFlushOutputs {
c.FlushOutputs()
}

if shouldClearInputs {
c.ClearInputs()
}
}
}

Expand Down
201 changes: 201 additions & 0 deletions fmesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,3 +784,204 @@ func TestFMesh_mustStop(t *testing.T) {
})
}
}

func TestFMesh_drainComponents(t *testing.T) {
type args struct {
cycle *cycle.Cycle
}
tests := []struct {
name string
getFM func() *FMesh
args args
assertions func(t *testing.T, fm *FMesh)
}{
{
name: "component not activated",
getFM: func() *FMesh {
fm := New("fm").
WithComponents(
component.New("c1").
WithDescription("This component has no activation function").
WithInputs("i1").
WithOutputs("o1"))

fm.Components().
ByName("c1").
Inputs().
ByName("i1").
PutSignals(signal.New("input signal"))

return fm
},
args: args{
cycle: cycle.New().
WithActivationResults(
component.NewActivationResult("c1").
SetActivated(false).
WithActivationCode(component.ActivationCodeNoInput)),
},
assertions: func(t *testing.T, fm *FMesh) {
// Assert that inputs are not cleared
assert.True(t, fm.Components().ByName("c1").Inputs().ByName("i1").HasSignals())
},
},
{
name: "component fully drained",
getFM: func() *FMesh {
c1 := component.New("c1").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

c2 := component.New("c2").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

// Pipe
c1.Outputs().ByName("o1").PipeTo(c2.Inputs().ByName("i1"))

// Simulate activation of c1
c1.Outputs().ByName("o1").PutSignals(signal.New("this signal is generated by c1"))

return New("fm").WithComponents(c1, c2)
},
args: args{
cycle: cycle.New().
WithActivationResults(
component.NewActivationResult("c1").
SetActivated(true).
WithActivationCode(component.ActivationCodeOK),
component.NewActivationResult("c2").
SetActivated(false).
WithActivationCode(component.ActivationCodeNoInput)),
},
assertions: func(t *testing.T, fm *FMesh) {
// c1 output is cleared
assert.False(t, fm.Components().ByName("c1").Outputs().ByName("o1").HasSignals())

// c2 input received flushed signal
assert.True(t, fm.Components().ByName("c2").Inputs().ByName("i1").HasSignals())

assert.Equal(t, "this signal is generated by c1", fm.Components().ByName("c2").Inputs().ByName("i1").Signals().FirstPayload().(string))
},
},
{
name: "component is waiting for inputs",
getFM: func() *FMesh {

c1 := component.New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

c2 := component.New("c2").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

// Pipe
c1.Outputs().ByName("o1").PipeTo(c2.Inputs().ByName("i1"))

// Simulate activation of c1
// NOTE: normally component should not create any output signal if it is waiting for inputs
// but technically there is no limitation to do that and then return the special error to wait for inputs.
// F-mesh just never flushes components waiting for inputs, so this test checks that
c1.Outputs().ByName("o1").PutSignals(signal.New("this signal is generated by c1"))

// Also simulate input signal on one port
c1.Inputs().ByName("i1").PutSignals(signal.New("this is input signal for c1"))

return New("fm").WithComponents(c1, c2)
},
args: args{
cycle: cycle.New().
WithActivationResults(
component.NewActivationResult("c1").
SetActivated(true).
WithActivationCode(component.ActivationCodeWaitingForInputsClear).
WithError(component.NewErrWaitForInputs(false)),
component.NewActivationResult("c2").
SetActivated(false).
WithActivationCode(component.ActivationCodeNoInput),
),
},
assertions: func(t *testing.T, fm *FMesh) {
// As c1 is waiting for inputs it's outputs must not be flushed
assert.False(t, fm.Components().ByName("c2").Inputs().ByName("i1").HasSignals())

// The inputs must be cleared
assert.False(t, fm.Components().ByName("c1").Inputs().AnyHasSignals())
},
},
{
name: "component is waiting for inputs and wants to keep input signals",
getFM: func() *FMesh {

c1 := component.New("c1").
WithInputs("i1", "i2").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

c2 := component.New("c2").
WithInputs("i1").
WithOutputs("o1").
WithActivationFunc(func(inputs port.Collection, outputs port.Collection) error {
return nil
})

// Pipe
c1.Outputs().ByName("o1").PipeTo(c2.Inputs().ByName("i1"))

// Simulate activation of c1
// NOTE: normally component should not create any output signal if it is waiting for inputs
// but technically there is no limitation to do that and then return the special error to wait for inputs.
// F-mesh just never flushes components waiting for inputs, so this test checks that
c1.Outputs().ByName("o1").PutSignals(signal.New("this signal is generated by c1"))

// Also simulate input signal on one port
c1.Inputs().ByName("i1").PutSignals(signal.New("this is input signal for c1"))

return New("fm").WithComponents(c1, c2)
},
args: args{
cycle: cycle.New().
WithActivationResults(
component.NewActivationResult("c1").
SetActivated(true).
WithActivationCode(component.ActivationCodeWaitingForInputsKeep).
WithError(component.NewErrWaitForInputs(true)),
component.NewActivationResult("c2").
SetActivated(false).
WithActivationCode(component.ActivationCodeNoInput),
),
},
assertions: func(t *testing.T, fm *FMesh) {
// As c1 is waiting for inputs it's outputs must not be flushed
assert.False(t, fm.Components().ByName("c2").Inputs().ByName("i1").HasSignals())

// The inputs must NOT be cleared
assert.True(t, fm.Components().ByName("c1").Inputs().AnyHasSignals())
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fm := tt.getFM()
fm.drainComponents(tt.args.cycle)
if tt.assertions != nil {
tt.assertions(t, fm)
}
})
}
}

0 comments on commit 7b3d912

Please sign in to comment.