From 378cb04d3e34bf76d399508456955e65b9e4a76f Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 31 Oct 2024 19:35:46 +0000 Subject: [PATCH] Foreach prototype --- internal/runtime/import_test.go | 69 ++++++++++++++++ .../internal/controller/node_config.go | 6 ++ .../controller/node_config_foreach.go | 47 +++++++++++ .../internal/importsource/import_source.go | 4 + .../internal/testcomponents/sumation1.go | 67 +++++++++++++++ .../internal/testcomponents/sumation2.go | 81 +++++++++++++++++++ internal/runtime/module.alloy | 12 +++ internal/runtime/source.go | 2 +- .../runtime/testdata/foreach/foreach_1.txtar | 17 ++++ 9 files changed, 304 insertions(+), 1 deletion(-) create mode 100644 internal/runtime/internal/controller/node_config_foreach.go create mode 100644 internal/runtime/internal/testcomponents/sumation1.go create mode 100644 internal/runtime/internal/testcomponents/sumation2.go create mode 100644 internal/runtime/module.alloy create mode 100644 internal/runtime/testdata/foreach/foreach_1.txtar diff --git a/internal/runtime/import_test.go b/internal/runtime/import_test.go index f6a1ab499b..a9134d8e43 100644 --- a/internal/runtime/import_test.go +++ b/internal/runtime/import_test.go @@ -81,6 +81,18 @@ func buildTestImportFile(t *testing.T, filename string) testImportFile { return tc } +// This is a copy of TestImportFile. +// It may need to be modified further to make it work with a foreach. +func TestForeach(t *testing.T) { + directory := "./testdata/foreach" + for _, file := range getTestFiles(directory, t) { + tc := buildTestImportFile(t, filepath.Join(directory, file.Name())) + t.Run(tc.description, func(t *testing.T) { + testConfig2(t, tc.main, tc.reloadConfig, nil) + }) + } +} + func TestImportFile(t *testing.T) { directory := "./testdata/import_file" for _, file := range getTestFiles(directory, t) { @@ -349,6 +361,63 @@ func testConfig(t *testing.T, config string, reloadConfig string, update func()) } } +// This function is a copy of testConfig above. +func testConfig2(t *testing.T, config string, reloadConfig string, update func()) { + defer verifyNoGoroutineLeaks(t) + ctrl, f := setup(t, config) + + err := ctrl.LoadSource(f, nil, "") + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + defer func() { + cancel() + wg.Wait() + }() + + wg.Add(1) + go func() { + defer wg.Done() + ctrl.Run(ctx) + }() + + // Check for initial condition + require.Eventually(t, func() bool { + export := getExport[testcomponents.SummationExports_2](t, ctrl, "", "testcomponents.summation2.final") + // If each iteration of the for loop adds a 1, + // and there are 3 iterations, we expect 3 to be the end result. + //TODO: Make this configurable? + return export.Sum == 3 + }, 3*time.Second, 10*time.Millisecond) + + // if update != nil { + // update() + + // // Export should be -10 after update + // require.Eventually(t, func() bool { + // export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + // return export.LastAdded <= -10 + // }, 3*time.Second, 10*time.Millisecond) + // } + + // if reloadConfig != "" { + // f, err = alloy_runtime.ParseSource(t.Name(), []byte(reloadConfig)) + // require.NoError(t, err) + // require.NotNil(t, f) + + // // Reload the controller with the new config. + // err = ctrl.LoadSource(f, nil) + // require.NoError(t, err) + + // // Export should be -10 after update + // require.Eventually(t, func() bool { + // export := getExport[testcomponents.SummationExports](t, ctrl, "", "testcomponents.summation.sum") + // return export.LastAdded <= -10 + // }, 3*time.Second, 10*time.Millisecond) + // } +} + func testConfigError(t *testing.T, config string, expectedError string) { defer verifyNoGoroutineLeaks(t) ctrl, f := setup(t, config) diff --git a/internal/runtime/internal/controller/node_config.go b/internal/runtime/internal/controller/node_config.go index 1293eac305..5338237caf 100644 --- a/internal/runtime/internal/controller/node_config.go +++ b/internal/runtime/internal/controller/node_config.go @@ -29,6 +29,8 @@ func NewConfigNode(block *ast.BlockStmt, globals ComponentGlobals) (BlockNode, d return NewTracingConfigNode(block, globals), nil case importsource.BlockImportFile, importsource.BlockImportString, importsource.BlockImportHTTP, importsource.BlockImportGit: return NewImportConfigNode(block, globals, importsource.GetSourceType(block.GetBlockName())), nil + case importsource.BlockForeach: + return NewForeachConfigNode(block, globals), nil default: var diags diag.Diagnostics diags.Add(diag.Diagnostic{ @@ -50,6 +52,7 @@ type ConfigNodeMap struct { argumentMap map[string]*ArgumentConfigNode exportMap map[string]*ExportConfigNode importMap map[string]*ImportConfigNode + foreachMap map[string]*ForeachConfigNode } // NewConfigNodeMap will create an initial ConfigNodeMap. Append must be called @@ -61,6 +64,7 @@ func NewConfigNodeMap() *ConfigNodeMap { argumentMap: map[string]*ArgumentConfigNode{}, exportMap: map[string]*ExportConfigNode{}, importMap: map[string]*ImportConfigNode{}, + foreachMap: map[string]*ForeachConfigNode{}, } } @@ -80,6 +84,8 @@ func (nodeMap *ConfigNodeMap) Append(configNode BlockNode) diag.Diagnostics { nodeMap.tracing = n case *ImportConfigNode: nodeMap.importMap[n.Label()] = n + case *ForeachConfigNode: + nodeMap.foreachMap[n.Label()] = n default: diags.Add(diag.Diagnostic{ Severity: diag.SeverityLevelError, diff --git a/internal/runtime/internal/controller/node_config_foreach.go b/internal/runtime/internal/controller/node_config_foreach.go new file mode 100644 index 0000000000..f27553832d --- /dev/null +++ b/internal/runtime/internal/controller/node_config_foreach.go @@ -0,0 +1,47 @@ +package controller + +import ( + "github.com/grafana/alloy/syntax/ast" + "github.com/grafana/alloy/syntax/vm" +) + +type ForeachConfigNode struct { + nodeID string + label string + block *ast.BlockStmt // Current Alloy blocks to derive config from +} + +var _ BlockNode = (*ForeachConfigNode)(nil) + +// For now the Foreach doesn't have the ability to export arguments. +//TODO: We could implement this in the future? + +type ForeachArguments struct { + Collection string `alloy:"collection,attr` + //TODO: Is the "var" argument really needed? + // We could just have a variable with a fixed name referencing the current thing we are iterating over. + Var string `alloy:"var,attr,optional` +} + +func NewForeachConfigNode(block *ast.BlockStmt, globals ComponentGlobals) *ForeachConfigNode { + nodeID := BlockComponentID(block).String() + + return &ForeachConfigNode{ + nodeID: nodeID, + label: block.Label, + block: block, + } +} + +func (fn *ForeachConfigNode) Label() string { return fn.label } + +func (fn *ForeachConfigNode) NodeID() string { return fn.nodeID } + +func (fn *ForeachConfigNode) Block() *ast.BlockStmt { return fn.block } + +func (fn *ForeachConfigNode) Evaluate(scope *vm.Scope) error { + return nil +} + +func (fn *ForeachConfigNode) UpdateBlock(b *ast.BlockStmt) { +} diff --git a/internal/runtime/internal/importsource/import_source.go b/internal/runtime/internal/importsource/import_source.go index ce3a369b98..e5b8d7ecd6 100644 --- a/internal/runtime/internal/importsource/import_source.go +++ b/internal/runtime/internal/importsource/import_source.go @@ -15,6 +15,7 @@ const ( String Git HTTP + Foreach ) const ( @@ -22,6 +23,7 @@ const ( BlockImportString = "import.string" BlockImportHTTP = "import.http" BlockImportGit = "import.git" + BlockForeach = "foreach" ) const ModulePath = "module_path" @@ -67,6 +69,8 @@ func GetSourceType(fullName string) SourceType { return HTTP case BlockImportGit: return Git + case BlockForeach: + return Foreach } panic(fmt.Errorf("name does not map to a known source type: %v", fullName)) } diff --git a/internal/runtime/internal/testcomponents/sumation1.go b/internal/runtime/internal/testcomponents/sumation1.go new file mode 100644 index 0000000000..1035c4b1c9 --- /dev/null +++ b/internal/runtime/internal/testcomponents/sumation1.go @@ -0,0 +1,67 @@ +package testcomponents + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" +) + +func init() { + component.Register(component.Registration{ + Name: "testcomponents.summation_entry", + Stability: featuregate.StabilityPublicPreview, + Args: SummationConfig_Entry{}, + Exports: SummationExports_Entry{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewSummation_Entry(opts, args.(SummationConfig_Entry)) + }, + }) +} + +// Accepts a single integer input and forwards it to all the components listed in forward_to. +type SummationConfig_Entry struct { + Input int `alloy:"input,attr"` + //TODO: What should the type be? + ForwardTo []IntReceiver `alloy:"forward_to,attr"` +} + +type SummationExports_Entry struct { +} + +type Summation_Entry struct { + opts component.Options + log log.Logger +} + +// NewSummation creates a new summation component. +func NewSummation_Entry(o component.Options, cfg SummationConfig_Entry) (*Summation_Entry, error) { + t := &Summation_Entry{opts: o, log: o.Logger} + if err := t.Update(cfg); err != nil { + return nil, err + } + return t, nil +} + +var ( + _ component.Component = (*Summation_Entry)(nil) +) + +// Run implements Component. +func (t *Summation_Entry) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +// Update implements Component. +func (t *Summation_Entry) Update(args component.Arguments) error { + c := args.(SummationConfig_Entry) + + for _, r := range c.ForwardTo { + r.ReceiveInt(c.Input) + } + + return nil +} diff --git a/internal/runtime/internal/testcomponents/sumation2.go b/internal/runtime/internal/testcomponents/sumation2.go new file mode 100644 index 0000000000..73e3f984bc --- /dev/null +++ b/internal/runtime/internal/testcomponents/sumation2.go @@ -0,0 +1,81 @@ +package testcomponents + +import ( + "context" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/featuregate" + "go.uber.org/atomic" +) + +func init() { + component.Register(component.Registration{ + Name: "testcomponents.summation2", + Stability: featuregate.StabilityPublicPreview, + Args: SummationConfig_2{}, + Exports: SummationExports_2{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewSummation_2(opts, args.(SummationConfig_2)) + }, + }) +} + +type IntReceiver interface { + ReceiveInt(int) +} + +type IntReceiverImpl struct { + sum atomic.Int32 +} + +func (r IntReceiverImpl) ReceiveInt(i int) { + r.sum.Add(int32(i)) +} + +type SummationConfig_2 struct { +} + +type SummationExports_2 struct { + Receiver IntReceiver `alloy:"receiver,attr"` + Sum int `alloy:"sum,attr"` + LastAdded int `alloy:"last_added,attr"` +} + +type Summation_2 struct { + opts component.Options + log log.Logger + receiver IntReceiver +} + +// NewSummation creates a new summation component. +func NewSummation_2(o component.Options, cfg SummationConfig_2) (*Summation_2, error) { + recv := IntReceiverImpl{} + o.OnStateChange(SummationExports_2{ + Receiver: recv, + }) + + t := &Summation_2{ + opts: o, + log: o.Logger, + receiver: recv, + } + + return t, nil +} + +var ( + _ component.Component = (*Summation)(nil) +) + +// Run implements Component. +func (t *Summation_2) Run(ctx context.Context) error { + <-ctx.Done() + return nil +} + +// Update implements Component. +func (t *Summation_2) Update(args component.Arguments) error { + return nil +} diff --git a/internal/runtime/module.alloy b/internal/runtime/module.alloy new file mode 100644 index 0000000000..0efa3839d2 --- /dev/null +++ b/internal/runtime/module.alloy @@ -0,0 +1,12 @@ +declare "config" { + argument "input" {} + + testcomponents.passthrough "pt" { + input = argument.input.value + lag = "1ms" + } + + export "output" { + value = testcomponents.passthrough.pt.output + } +} diff --git a/internal/runtime/source.go b/internal/runtime/source.go index f02e71c6e4..88d779400f 100644 --- a/internal/runtime/source.go +++ b/internal/runtime/source.go @@ -75,7 +75,7 @@ func sourceFromBody(body ast.Body) (*Source, error) { switch fullName { case "declare": declares = append(declares, stmt) - case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git": + case "logging", "tracing", "argument", "export", "import.file", "import.string", "import.http", "import.git", "foreach": configs = append(configs, stmt) default: components = append(components, stmt) diff --git a/internal/runtime/testdata/foreach/foreach_1.txtar b/internal/runtime/testdata/foreach/foreach_1.txtar new file mode 100644 index 0000000000..9c372942a6 --- /dev/null +++ b/internal/runtime/testdata/foreach/foreach_1.txtar @@ -0,0 +1,17 @@ +-- main.alloy -- +foreach "testForeach" { + collection = [1, 2, 3, 4] + //var = "num" + + // Similar to testcomponents.summation, but with a "forward_to" + testcomponents.summation1 "sum" { + //TODO: Use the num variable here + // input = num + input = 1 + forward_to = testcomponents.summation2.final.receiver + } +} + +// Similar to testcomponents.summation, but with a "receiver" export +testcomponents.summation2 "final" { +} \ No newline at end of file