Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libbeat: global init state prevents multiple instances of filebeat receiver #41475

Open
mauri870 opened this issue Oct 29, 2024 · 1 comment
Open
Assignees
Labels
bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Comments

@mauri870
Copy link
Member

mauri870 commented Oct 29, 2024

While looking into testcases for the new beats receivers I encountered an issue that happens when creating multiple filebeat receivers via a factory:

panic: plugin parse_aws_vpc_flow_log registration fail parse_aws_vpc_flow_log exists already [recovered]
	panic: plugin parse_aws_vpc_flow_log registration fail parse_aws_vpc_flow_log exists already

Here is a test that reproduces this issue:

TestInstantiateReceiverFromFactory
func TestReceiverFactory(t *testing.T) {
	factory := NewFactory()

	cfg := Config{
		Beatconfig: map[string]interface{}{
			"filebeat": map[string]interface{}{
				"inputs": []map[string]interface{}{
					{
						"type":    "benchmark",
						"enabled": true,
						"message": "test",
						"count":   1,
					},
				},
			},
			"output": map[string]interface{}{
				"otelconsumer": map[string]interface{}{},
			},
			"logging": map[string]interface{}{
				"level": "debug",
				"selectors": []string{
					"*",
				},
			},
			"path.home": t.TempDir(),
		},
	}

	for i := 0; i < 5; i++ {
                // For collector packages < v0.112.0 this is `factory.CreateLogsReceiver`
		receiver, err := factory.CreateLogs(nil, receivertest.NewNopSettings(), &cfg, nil)
		assert.NoError(t, err)
		assert.NotNil(t, receiver)
		assert.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost()))
		assert.NoError(t, receiver.Shutdown(context.Background()))
	}
}

The test output:

=== RUN   TestReceiverFactory
--- FAIL: TestReceiverFactory (0.02s)
panic: plugin parse_aws_vpc_flow_log registration fail parse_aws_vpc_flow_log exists already [recovered]
	panic: plugin parse_aws_vpc_flow_log registration fail parse_aws_vpc_flow_log exists already

goroutine 67 [running]:
testing.tRunner.func1.2({0x73e27c0, 0xc0020898e0})
	/home/mauri870/git/go/src/testing/testing.go:1706 +0x21c
testing.tRunner.func1()
	/home/mauri870/git/go/src/testing/testing.go:1709 +0x35e
panic({0x73e27c0?, 0xc0020898e0?})
	/home/mauri870/git/go/src/runtime/panic.go:787 +0x132
github.com/elastic/beats/v7/libbeat/processors.RegisterPlugin({0x7c5b471, 0x16}, 0x7e991e8)
	/home/mauri870/git/elastic/beats/libbeat/processors/registry.go:60 +0x125
github.com/elastic/beats/v7/x-pack/filebeat/processors/aws_vpcflow.InitializeModule()
	/home/mauri870/git/elastic/beats/x-pack/filebeat/processors/aws_vpcflow/parse_aws_vpc_flow_log.go:29 +0x26
github.com/elastic/beats/v7/x-pack/filebeat/include.InitializeModule()
	/home/mauri870/git/elastic/beats/x-pack/filebeat/include/list.go:64 +0x18
github.com/elastic/beats/v7/libbeat/cmd/instance.NewBeat({0x7c184d7, 0x8}, {0x0, 0x0}, {0x0, 0x0}, 0x1, {0xc0010055c0, 0x4, 0x6})
	/home/mauri870/git/elastic/beats/libbeat/cmd/instance/beat.go:245 +0x72
github.com/elastic/beats/v7/libbeat/cmd/instance.NewBeatReceiver({{0x7c184d7, 0x8}, {0x0, 0x0}, {0x0, 0x0}, 0x1, 0x1, {{0x0, 0x0}, ...}, ...}, ...)
	/home/mauri870/git/elastic/beats/libbeat/cmd/instance/beat.go:292 +0xbe
github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.createReceiver({0xc0011aff20?, 0x7e982b0?}, {{{{0x7c0f18c, 0x3}}, {0xc001315320, 0x24}}, {0xc000aefe80, {0x84d9f30, 0xc0011aff00}, {0x84d9f08, ...}, ...}, ...}, ...)
	/home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/factory.go:45 +0x50e
go.opentelemetry.io/collector/receiver.CreateLogsFunc.CreateLogs(...)
	/home/mauri870/go/pkg/mod/go.opentelemetry.io/collector/[email protected]/receiver.go:133
github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver.TestReceiverFactory(0xc001302fc0)
	/home/mauri870/git/elastic/beats/x-pack/filebeat/fbreceiver/receiver_test.go:289 +0x6d9
testing.tRunner(0xc001302fc0, 0x7e97870)
	/home/mauri870/git/go/src/testing/testing.go:1764 +0xf4
created by testing.(*T).Run in goroutine 1
	/home/mauri870/git/go/src/testing/testing.go:1823 +0x409
FAIL	github.com/elastic/beats/v7/x-pack/filebeat/fbreceiver	0.052s

According to @leehinman this is likely a global state issue in libbeat

func init() {
p.MustRegisterLoader(pluginKey, func(ifc interface{}) error {
p, ok := ifc.(processorPlugin)
if !ok {
return errors.New("plugin does not match processor plugin type")
}
return registry.Register(p.name, p.constr)
})
}
type Constructor func(config *config.C) (beat.Processor, error)
var registry = NewNamespace()

Related to #40110.

@mauri870 mauri870 added bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team labels Oct 29, 2024
@elasticmachine
Copy link
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team
Projects
None yet
Development

No branches or pull requests

2 participants