Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to pipe from inputs #45

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions component/activation_result.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package component

import "fmt"
import (
"errors"
"fmt"
)

// ActivationResult defines the result (possibly an error) of the activation of given component in given cycle
type ActivationResult struct {
componentName string
activated bool
inputKeys []string //@TODO: check if we can replace this by one int which will show the index of last signal used as input within signals collection (use signal position in the collection as it's unique id)
code ActivationResultCode
err error
}
Expand Down Expand Up @@ -88,32 +92,69 @@ func (ar *ActivationResult) WithError(err error) *ActivationResult {
return ar
}

func (ar *ActivationResult) WithInputKeys(keys []string) *ActivationResult {
ar.inputKeys = keys
return ar
}

func (ar *ActivationResult) InputKeys() []string {
return ar.inputKeys
}

// newActivationResultOK builds a specific activation result
func (c *Component) newActivationResultOK() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeOK)
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeOK).
WithInputKeys(c.Inputs().GetSignalKeys())

}

// newActivationCodeNoInput builds a specific activation result
func (c *Component) newActivationCodeNoInput() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoInput)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoInput)
}

// newActivationCodeNoFunction builds a specific activation result
func (c *Component) newActivationCodeNoFunction() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeNoFunction)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeNoFunction)
}

// newActivationCodeWaitingForInput builds a specific activation result
func (c *Component) newActivationCodeWaitingForInput() *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(false).WithActivationCode(ActivationCodeWaitingForInput)
return NewActivationResult(c.Name()).
SetActivated(false).
WithActivationCode(ActivationCodeWaitingForInput)
}

// newActivationCodeReturnedError builds a specific activation result
func (c *Component) newActivationCodeReturnedError(err error) *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodeReturnedError).WithError(fmt.Errorf("component returned an error: %w", err))
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodeReturnedError).
WithError(fmt.Errorf("component returned an error: %w", err)).
WithInputKeys(c.Inputs().GetSignalKeys())
}

// newActivationCodePanicked builds a specific activation result
func (c *Component) newActivationCodePanicked(err error) *ActivationResult {
return NewActivationResult(c.Name()).SetActivated(true).WithActivationCode(ActivationCodePanicked).WithError(err)
return NewActivationResult(c.Name()).
SetActivated(true).
WithActivationCode(ActivationCodePanicked).
WithError(err).
WithInputKeys(c.Inputs().GetSignalKeys())
}

// isWaitingForInput tells whether component is waiting for specific inputs
func (c *Component) isWaitingForInput(activationResult *ActivationResult) bool {
return activationResult.HasError() && errors.Is(activationResult.Error(), errWaitingForInputs)
}

// WantsToKeepInputs tells whether component wants to keep signals on input ports for the next cycle
func (c *Component) WantsToKeepInputs(activationResult *ActivationResult) bool {
return c.isWaitingForInput(activationResult) && errors.Is(activationResult.Error(), errWaitingForInputsKeep)
}
9 changes: 9 additions & 0 deletions component/activation_result_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ func (collection ActivationResultCollection) HasActivatedComponents() bool {
}
return false
}

// ByComponentName returns the activation result of given component
func (collection ActivationResultCollection) ByComponentName(componentName string) *ActivationResult {
if result, ok := collection[componentName]; ok {
return result
}

return nil
}
27 changes: 12 additions & 15 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ func (c *Component) hasActivationFunction() bool {
}

// MaybeActivate tries to run the activation function if all required conditions are met
// @TODO: hide this method from user
func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
defer func() {
if r := recover(); r != nil {
//Clear inputs and exit
c.inputs.ClearSignals()
activationResult = c.newActivationCodePanicked(fmt.Errorf("panicked with: %v", r))
}
}()
Expand All @@ -99,22 +98,15 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

//Run the computation
err := c.f(c.inputs, c.outputs)
//Invoke the activation func
err := c.f(c.Inputs(), c.Outputs())

if errors.Is(err, errWaitingForInputs) {
activationResult = c.newActivationCodeWaitingForInput()

if !errors.Is(err, errWaitingForInputsKeep) {
c.inputs.ClearSignals()
}

return
}

//Clear inputs
c.inputs.ClearSignals()

if err != nil {
activationResult = c.newActivationCodeReturnedError(err)

Expand All @@ -126,9 +118,14 @@ func (c *Component) MaybeActivate() (activationResult *ActivationResult) {
return
}

// FlushOutputs pushed signals out of the component outputs to pipes and clears outputs
// FlushInputs ...
// @TODO: hide this method from user
func (c *Component) FlushInputs() {
c.inputs.Flush(false)
}

// FlushOutputs ...
// @TODO: hide this method from user
func (c *Component) FlushOutputs() {
for _, out := range c.outputs {
out.Flush()
}
c.outputs.Flush(true)
}
7 changes: 5 additions & 2 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func TestComponent_WithActivationFunc(t *testing.T) {
name string
component *Component
args args
want *Component
}{
{
name: "happy path",
Expand All @@ -238,7 +237,11 @@ func TestComponent_WithActivationFunc(t *testing.T) {
err1 := componentAfter.f(testInputs1, testOutputs1)
err2 := tt.args.f(testInputs2, testOutputs2)
assert.Equal(t, err1, err2)
assert.Equal(t, testOutputs1, testOutputs2)

//Compare signals without keys (because they are random)
assert.ElementsMatch(t, testOutputs1.ByName("out1").Signals().AsGroup(), testOutputs2.ByName("out1").Signals().AsGroup())
assert.ElementsMatch(t, testOutputs1.ByName("out2").Signals().AsGroup(), testOutputs2.ByName("out2").Signals().AsGroup())

})
}
}
Expand Down
9 changes: 7 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import (
type ErrorHandlingStrategy int

const (
StopOnFirstError ErrorHandlingStrategy = iota
// StopOnFirstErrorOrPanic stops the f-mesh on first error or panic
StopOnFirstErrorOrPanic ErrorHandlingStrategy = iota

// StopOnFirstPanic ignores errors, but stops the f-mesh on first panic
StopOnFirstPanic

// IgnoreAll allows to continue running the f-mesh regardless of how components finish their activation functions
IgnoreAll
)

var (
ErrHitAnError = errors.New("f-mesh hit an error and will be stopped")
ErrHitAnErrorOrPanic = errors.New("f-mesh hit an error or panic and will be stopped")
ErrHitAPanic = errors.New("f-mesh hit a panic and will be stopped")
ErrUnsupportedErrorHandlingStrategy = errors.New("unsupported error handling strategy")
)
23 changes: 18 additions & 5 deletions fmesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,23 @@ func (fm *FMesh) runCycle() *cycle.Cycle {
return newCycle
}

// DrainComponents drains the data from all components outputs
func (fm *FMesh) drainComponents() {
// DrainComponents drains the data from activated components
func (fm *FMesh) drainComponentsAfterCycle(cycle *cycle.Cycle) {
for _, c := range fm.components {
activationResult := cycle.ActivationResults().ByComponentName(c.Name())

if !activationResult.Activated() {
continue
}

c.FlushInputs()
c.FlushOutputs()

keepInputs := c.WantsToKeepInputs(activationResult)
if !keepInputs {
c.Inputs().RemoveSignalsByKeys(activationResult.InputKeys())
}

}
}

Expand All @@ -98,7 +111,7 @@ func (fm *FMesh) Run() (cycle.Collection, error) {
return allCycles, err
}

fm.drainComponents()
fm.drainComponentsAfterCycle(cycleResult)
}
}

Expand All @@ -110,8 +123,8 @@ func (fm *FMesh) mustStop(cycleResult *cycle.Cycle) (bool, error) {

//Check if mesh must stop because of configured error handling strategy
switch fm.errorHandlingStrategy {
case StopOnFirstError:
return cycleResult.HasErrors(), ErrHitAnError
case StopOnFirstErrorOrPanic:
return cycleResult.HasErrors() || cycleResult.HasPanics(), ErrHitAnErrorOrPanic
case StopOnFirstPanic:
return cycleResult.HasPanics(), ErrHitAPanic
case IgnoreAll:
Expand Down
Loading
Loading