Skip to content

Commit

Permalink
Foreach prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
ptodev committed Oct 31, 2024
1 parent 078ca9e commit 378cb04
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 1 deletion.
69 changes: 69 additions & 0 deletions internal/runtime/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile {
return tc
}

// This is a copy of TestImportFile.
// It may need to be modified further to make it work with a foreach.
func TestForeach(t *testing.T) {
directory := "./testdata/foreach"
for _, file := range getTestFiles(directory, t) {
tc := buildTestImportFile(t, filepath.Join(directory, file.Name()))
t.Run(tc.description, func(t *testing.T) {
testConfig2(t, tc.main, tc.reloadConfig, nil)
})
}
}

func TestImportFile(t *testing.T) {
directory := "./testdata/import_file"
for _, file := range getTestFiles(directory, t) {
Expand Down Expand Up @@ -349,6 +361,63 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func())
}
}

// This function is a copy of testConfig above.
func testConfig2(t *testing.T, config string, reloadConfig string, update func()) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)

err := ctrl.LoadSource(f, nil, "")
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer func() {
cancel()
wg.Wait()
}()

wg.Add(1)
go func() {
defer wg.Done()
ctrl.Run(ctx)
}()

// Check for initial condition
require.Eventually(t, func() bool {
export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final")
// If each iteration of the for loop adds a 1,
// and there are 3 iterations, we expect 3 to be the end result.
//TODO: Make this configurable?
return export.Sum == 3
}, 3*time.Second, 10*time.Millisecond)

// if update != nil {
// update()

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }

// if reloadConfig != "" {
// f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig))
// require.NoError(t, err)
// require.NotNil(t, f)

// // Reload the controller with the new config.
// err = ctrl.LoadSource(f, nil)
// require.NoError(t, err)

// // Export should be -10 after update
// require.Eventually(t, func() bool {
// export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum")
// return export.LastAdded <= -10
// }, 3*time.Second, 10*time.Millisecond)
// }
}

func testConfigError(t *testing.T, config string, expectedError string) {
defer verifyNoGoroutineLeaks(t)
ctrl, f := setup(t, config)
Expand Down
6 changes: 6 additions & 0 deletions internal/runtime/internal/controller/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d
return NewTracingConfigNode(block, globals), nil
case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit:
return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil
case importsource.BlockForeach:
return NewForeachConfigNode(block, globals), nil
default:
var diags diag.Diagnostics
diags.Add(diag.Diagnostic{
Expand All @@ -50,6 +52,7 @@ type ConfigNodeMap struct {
argumentMap map[string]*ArgumentConfigNode
exportMap map[string]*ExportConfigNode
importMap map[string]*ImportConfigNode
foreachMap map[string]*ForeachConfigNode
}

// NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called
Expand All @@ -61,6 +64,7 @@ func NewConfigNodeMap() *ConfigNodeMap {
argumentMap: map[string]*ArgumentConfigNode{},
exportMap: map[string]*ExportConfigNode{},
importMap: map[string]*ImportConfigNode{},
foreachMap: map[string]*ForeachConfigNode{},
}
}

Expand All @@ -80,6 +84,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics {
nodeMap.tracing = n
case *ImportConfigNode:
nodeMap.importMap[n.Label()] = n
case *ForeachConfigNode:
nodeMap.foreachMap[n.Label()] = n
default:
diags.Add(diag.Diagnostic{
Severity: diag.SeverityLevelError,
Expand Down
47 changes: 47 additions & 0 deletions internal/runtime/internal/controller/node_config_foreach.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package controller

import (
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)

type ForeachConfigNode struct {
nodeID string
label string
block *ast.BlockStmt // Current Alloy blocks to derive config from
}

var _ BlockNode = (*ForeachConfigNode)(nil)

// For now the Foreach doesn't have the ability to export arguments.
//TODO: We could implement this in the future?

type ForeachArguments struct {
Collection string `alloy:"collection,attr`
//TODO: Is the "var" argument really needed?
// We could just have a variable with a fixed name referencing the current thing we are iterating over.
Var string `alloy:"var,attr,optional`
}

func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode {
nodeID := BlockComponentID(block).String()

return &ForeachConfigNode{
nodeID: nodeID,
label: block.Label,
block: block,
}
}

func (fn *ForeachConfigNode) Label() string { return fn.label }

func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID }

func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block }

func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error {
return nil
}

func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) {
}
4 changes: 4 additions & 0 deletions internal/runtime/internal/importsource/import_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ const (
String
Git
HTTP
Foreach
)

const (
BlockImportFile = "import.file"
BlockImportString = "import.string"
BlockImportHTTP = "import.http"
BlockImportGit = "import.git"
BlockForeach = "foreach"
)

const ModulePath = "module_path"
Expand Down Expand Up @@ -67,6 +69,8 @@ func GetSourceType(fullName string) SourceType {
return HTTP
case BlockImportGit:
return Git
case BlockForeach:
return Foreach
}
panic(fmt.Errorf("name does not map to a known source type: %v", fullName))
}
67 changes: 67 additions & 0 deletions internal/runtime/internal/testcomponents/sumation1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package testcomponents

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.summation_entry",
Stability: featuregate.StabilityPublicPreview,
Args: SummationConfig_Entry{},
Exports: SummationExports_Entry{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewSummation_Entry(opts, args.(SummationConfig_Entry))
},
})
}

// Accepts a single integer input and forwards it to all the components listed in forward_to.
type SummationConfig_Entry struct {
Input int `alloy:"input,attr"`
//TODO: What should the type be?
ForwardTo []IntReceiver `alloy:"forward_to,attr"`
}

type SummationExports_Entry struct {
}

type Summation_Entry struct {
opts component.Options
log log.Logger
}

// NewSummation creates a new summation component.
func NewSummation_Entry(o component.Options, cfg SummationConfig_Entry) (*Summation_Entry, error) {
t := &Summation_Entry{opts: o, log: o.Logger}
if err := t.Update(cfg); err != nil {
return nil, err
}
return t, nil
}

var (
_ component.Component = (*Summation_Entry)(nil)
)

// Run implements Component.
func (t *Summation_Entry) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Update implements Component.
func (t *Summation_Entry) Update(args component.Arguments) error {
c := args.(SummationConfig_Entry)

for _, r := range c.ForwardTo {
r.ReceiveInt(c.Input)
}

return nil
}
81 changes: 81 additions & 0 deletions internal/runtime/internal/testcomponents/sumation2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package testcomponents

import (
"context"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/featuregate"
"go.uber.org/atomic"
)

func init() {
component.Register(component.Registration{
Name: "testcomponents.summation2",
Stability: featuregate.StabilityPublicPreview,
Args: SummationConfig_2{},
Exports: SummationExports_2{},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return NewSummation_2(opts, args.(SummationConfig_2))
},
})
}

type IntReceiver interface {
ReceiveInt(int)
}

type IntReceiverImpl struct {
sum atomic.Int32
}

func (r IntReceiverImpl) ReceiveInt(i int) {
r.sum.Add(int32(i))
}

type SummationConfig_2 struct {
}

type SummationExports_2 struct {
Receiver IntReceiver `alloy:"receiver,attr"`
Sum int `alloy:"sum,attr"`
LastAdded int `alloy:"last_added,attr"`
}

type Summation_2 struct {
opts component.Options
log log.Logger
receiver IntReceiver
}

// NewSummation creates a new summation component.
func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) {
recv := IntReceiverImpl{}
o.OnStateChange(SummationExports_2{
Receiver: recv,
})

t := &Summation_2{
opts: o,
log: o.Logger,
receiver: recv,
}

return t, nil
}

var (
_ component.Component = (*Summation)(nil)
)

// Run implements Component.
func (t *Summation_2) Run(ctx context.Context) error {
<-ctx.Done()
return nil
}

// Update implements Component.
func (t *Summation_2) Update(args component.Arguments) error {
return nil
}
12 changes: 12 additions & 0 deletions internal/runtime/module.alloy
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
declare "config" {
argument "input" {}

testcomponents.passthrough "pt" {
input = argument.input.value
lag = "1ms"
}

export "output" {
value = testcomponents.passthrough.pt.output
}
}
2 changes: 1 addition & 1 deletion internal/runtime/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func sourceFromBody(body ast.Body) (*Source, error) {
switch fullName {
case "declare":
declares = append(declares, stmt)
case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git":
case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git", "foreach":
configs = append(configs, stmt)
default:
components = append(components, stmt)
Expand Down
17 changes: 17 additions & 0 deletions internal/runtime/testdata/foreach/foreach_1.txtar
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- main.alloy --
foreach "testForeach" {
collection = [1, 2, 3, 4]
//var = "num"

// Similar to testcomponents.summation, but with a "forward_to"
testcomponents.summation1 "sum" {
//TODO: Use the num variable here
// input = num
input = 1
forward_to = testcomponents.summation2.final.receiver
}
}

// Similar to testcomponents.summation, but with a "receiver" export
testcomponents.summation2 "final" {
}

0 comments on commit 378cb04

Please sign in to comment.