Skip to content

Commit

Permalink
Fix Build Errors And Naming
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Nov 3, 2024
1 parent 89c7c70 commit c6b6274
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 163 deletions.
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error {
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, es.PrimaryNamespace, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, es.PrimaryNamespace, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runQueryService(t *testing.T, esURL string) *Server {
flagsSvc := flags.NewService(ports.QueryAdminHTTP)
flagsSvc.Logger = zaptest.NewLogger(t)

f := es.NewFactory()
f := es.NewFactory(es.PrimaryNamespace)
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags([]string{
"--es.tls.enabled=false",
Expand Down
30 changes: 15 additions & 15 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,24 +118,24 @@ func (f *Factory) configureFromOptions(o *Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.config, logger, metricsFactory)
client, err := f.newClientFn(f.config, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
return fmt.Errorf("failed to create Elasticsearch client: %w", err)
}
f.client.Store(&primaryClient)
f.client.Store(&client)

if f.config.Authentication.BasicAuthentication.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
watcher, err := fswatcher.New([]string{f.config.Authentication.BasicAuthentication.PasswordFilePath}, f.onPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
return fmt.Errorf("failed to create watcher for ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)
f.watchers = append(f.watchers, watcher)
}

return nil
}

func (f *Factory) getPrimaryClient() es.Client {
func (f *Factory) getClient() es.Client {
if c := f.client.Load(); c != nil {
return *c
}
Expand All @@ -144,17 +144,17 @@ func (f *Factory) getPrimaryClient() es.Client {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.getPrimaryClient, f.config, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getClient, f.config, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.config, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getClient, f.config, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.getPrimaryClient, f.config, f.logger)
return createDependencyReader(f.getClient, f.config, f.logger)
}

func createSpanReader(
Expand Down Expand Up @@ -232,7 +232,7 @@ func createSpanWriter(

func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
params := esSampleStore.Params{
Client: f.getPrimaryClient,
Client: f.getClient,
Logger: f.logger,
IndexPrefix: f.config.Indices.IndexPrefix,
IndexDateLayout: f.config.Indices.Sampling.DateLayout,
Expand All @@ -248,7 +248,7 @@ func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store
if err != nil {
return nil, err
}
if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil {
if _, err := f.getClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil {
return nil, fmt.Errorf("failed to create template: %w", err)
}
}
Expand Down Expand Up @@ -290,12 +290,12 @@ func (f *Factory) Close() error {
for _, w := range f.watchers {
errs = append(errs, w.Close())
}
errs = append(errs, f.getPrimaryClient().Close())
errs = append(errs, f.getClient().Close())

return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
func (f *Factory) onPasswordChange() {
f.onClientPasswordChange(f.config, &f.client)
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom
}

func (f *Factory) Purge(ctx context.Context) error {
esClient := f.getPrimaryClient()
esClient := f.getClient()
_, err := esClient.DeleteIndex("*").Do(ctx)
return err
}
Expand Down
124 changes: 20 additions & 104 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,14 @@ func (m *mockClientBuilder) NewClient(*escfg.Configuration, *zap.Logger, metrics
}

func TestElasticsearchFactory(t *testing.T) {
f := NewFactory()
f := NewFactory(PrimaryNamespace)
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{})
f.InitFromViper(v, zap.NewNop())

f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error")}).NewClient
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create primary Elasticsearch client: made-up error")

f.archiveConfig.Enabled = true
f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
// to test archive storage error, pretend that primary client creation is successful
// but override newClientFn so it fails for the next invocation
f.newClientFn = (&mockClientBuilder{err: errors.New("made-up error2")}).NewClient
return (&mockClientBuilder{}).NewClient(c, logger, metricsFactory)
}
require.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "failed to create archive Elasticsearch client: made-up error2")

f.newClientFn = (&mockClientBuilder{}).NewClient
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

Expand All @@ -90,26 +81,19 @@ func TestElasticsearchFactory(t *testing.T) {
_, err = f.CreateDependencyReader()
require.NoError(t, err)

_, err = f.CreateArchiveSpanReader()
require.NoError(t, err)

_, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)

_, err = f.CreateSamplingStore(1)
require.NoError(t, err)

require.NoError(t, f.Close())
}

func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{
f := NewFactory(PrimaryNamespace)
f.config = &escfg.Configuration{
Tags: escfg.TagsAsFields{
File: "fixtures/file-does-not-exist.txt",
},
}
f.archiveConfig = &escfg.Configuration{}
f.newClientFn = (&mockClientBuilder{}).NewClient
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
defer f.Close()
Expand All @@ -119,11 +103,10 @@ func TestElasticsearchTagsFileDoNotExist(t *testing.T) {
}

func TestElasticsearchILMUsedWithoutReadWriteAliases(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{
f := NewFactory(ArchiveNamespace)
f.config = &escfg.Configuration{
UseILM: true,
}
f.archiveConfig = &escfg.Configuration{}
f.newClientFn = (&mockClientBuilder{}).NewClient
require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
defer f.Close()
Expand Down Expand Up @@ -194,9 +177,8 @@ func TestTagKeysAsFields(t *testing.T) {
}

func TestCreateTemplateError(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{CreateIndexTemplates: true}
f.archiveConfig = &escfg.Configuration{}
f := NewFactory(PrimaryNamespace)
f.config = &escfg.Configuration{CreateIndexTemplates: true}
f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
Expand All @@ -212,9 +194,8 @@ func TestCreateTemplateError(t *testing.T) {
}

func TestILMDisableTemplateCreation(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true}
f.archiveConfig = &escfg.Configuration{}
f := NewFactory(PrimaryNamespace)
f.config = &escfg.Configuration{UseILM: true, UseReadWriteAliases: true, CreateIndexTemplates: true}
f.newClientFn = (&mockClientBuilder{createTemplateError: errors.New("template-error")}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
defer f.Close()
Expand All @@ -223,43 +204,13 @@ func TestILMDisableTemplateCreation(t *testing.T) {
require.NoError(t, err) // as the createTemplate is not called, CreateSpanWriter should not return an error
}

func TestArchiveDisabled(t *testing.T) {
f := NewFactory()
f.archiveConfig = &escfg.Configuration{Enabled: false}
f.newClientFn = (&mockClientBuilder{}).NewClient
w, err := f.CreateArchiveSpanWriter()
assert.Nil(t, w)
require.NoError(t, err)
r, err := f.CreateArchiveSpanReader()
assert.Nil(t, r)
require.NoError(t, err)
}

func TestArchiveEnabled(t *testing.T) {
f := NewFactory()
f.primaryConfig = &escfg.Configuration{}
f.archiveConfig = &escfg.Configuration{Enabled: true}
f.newClientFn = (&mockClientBuilder{}).NewClient
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close() // Ensure resources are cleaned up if initialization is successful
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
r, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestConfigureFromOptions(t *testing.T) {
f := NewFactory()
f := NewFactory(ArchiveNamespace)
o := &Options{
Primary: namespaceConfig{Configuration: escfg.Configuration{Servers: []string{"server"}}},
others: map[string]*namespaceConfig{"es-archive": {Configuration: escfg.Configuration{Servers: []string{"server2"}}}},
}
f.configureFromOptions(o)
assert.Equal(t, o.GetPrimary(), f.primaryConfig)
assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig)
assert.Equal(t, o.GetPrimary(), f.config)
}

func TestESStorageFactoryWithConfig(t *testing.T) {
Expand Down Expand Up @@ -324,13 +275,8 @@ func TestESStorageFactoryWithConfigError(t *testing.T) {
func TestPasswordFromFile(t *testing.T) {
defer testutils.VerifyGoLeaksOnce(t)
t.Run("primary client", func(t *testing.T) {
f := NewFactory()
testPasswordFromFile(t, f, f.getPrimaryClient, f.CreateSpanWriter)
})

t.Run("archive client", func(t *testing.T) {
f2 := NewFactory()
testPasswordFromFile(t, f2, f2.getArchiveClient, f2.CreateArchiveSpanWriter)
f := NewFactory(PrimaryNamespace)
testPasswordFromFile(t, f, f.getClient, f.CreateSpanWriter)
})

t.Run("load token error", func(t *testing.T) {
Expand Down Expand Up @@ -370,21 +316,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client,
pwdFile := filepath.Join(t.TempDir(), "pwd")
require.NoError(t, os.WriteFile(pwdFile, []byte(pwd1), 0o600))

f.primaryConfig = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
Authentication: escfg.Authentication{
BasicAuthentication: escfg.BasicAuthentication{
Username: "user",
PasswordFilePath: pwdFile,
},
},
BulkProcessing: escfg.BulkProcessing{
MaxBytes: -1, // disable bulk; we want immediate flush
},
}
f.archiveConfig = &escfg.Configuration{
Enabled: true,
f.config = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
Authentication: escfg.Authentication{
Expand Down Expand Up @@ -443,8 +375,7 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client,

func TestFactoryESClientsAreNil(t *testing.T) {
f := &Factory{}
assert.Nil(t, f.getPrimaryClient())
assert.Nil(t, f.getArchiveClient())
assert.Nil(t, f.getClient())
}

func TestPasswordFromFileErrors(t *testing.T) {
Expand All @@ -457,17 +388,8 @@ func TestPasswordFromFileErrors(t *testing.T) {
pwdFile := filepath.Join(t.TempDir(), "pwd")
require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600))

f := NewFactory()
f.primaryConfig = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
Authentication: escfg.Authentication{
BasicAuthentication: escfg.BasicAuthentication{
PasswordFilePath: pwdFile,
},
},
}
f.archiveConfig = &escfg.Configuration{
f := NewFactory(PrimaryNamespace)
f.config = &escfg.Configuration{
Servers: []string{server.URL},
LogLevel: "debug",
Authentication: escfg.Authentication{
Expand All @@ -481,16 +403,10 @@ func TestPasswordFromFileErrors(t *testing.T) {
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
defer f.Close()

f.primaryConfig.Servers = []string{}
f.onPrimaryPasswordChange()
assert.Contains(t, buf.String(), "no servers specified")

f.archiveConfig.Servers = []string{}
buf.Reset()
f.onArchivePasswordChange()
f.config.Servers = []string{}
f.onPasswordChange()
assert.Contains(t, buf.String(), "no servers specified")

require.NoError(t, os.Remove(pwdFile))
f.onPrimaryPasswordChange()
f.onArchivePasswordChange()
f.onPasswordChange()
}
Loading

0 comments on commit c6b6274

Please sign in to comment.