diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 77e781d8aaf..fd402a7fd93 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -7,6 +7,7 @@ package spanstore import ( "context" "fmt" + "sync" "time" "go.uber.org/zap" @@ -21,10 +22,11 @@ import ( ) const ( - spanType = "span" - serviceType = "service" - serviceCacheTTLDefault = 12 * time.Hour - indexCacheTTLDefault = 48 * time.Hour + spanType = "span" + serviceType = "service" + serviceCacheTTLDefault = 12 * time.Hour + indexCacheTTLDefault = 48 * time.Hour + defaultIndexWaitTimeout = 60 * time.Second ) type spanWriterMetrics struct { @@ -42,6 +44,63 @@ type SpanWriter struct { serviceWriter serviceWriter spanConverter dbmodel.FromDomain spanServiceIndex spanAndServiceIndexFn + indexCache sync.Map +} + +func (s *SpanWriter) ensureIndex(ctx context.Context, indexName string) error { + if _, exists := s.indexCache.Load(indexName); exists { + return nil + } + + _, loaded := s.indexCache.LoadOrStore(indexName, struct{}{}) + if loaded { + return nil + } + + exists, err := s.client().IndexExists(indexName).Do(ctx) + if err != nil { + return fmt.Errorf("failed to check index existence: %w", err) + } + + if !exists { + s.logger.Info("Creating index", zap.String("index", indexName)) + + // Set specific settings for the test environment + body := `{ + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0, + "index.write.wait_for_active_shards": 1 + } + }` + + _, err = s.client().CreateIndex(indexName).Body(body).Do(ctx) + if err != nil { + return fmt.Errorf("failed to create index with settings: %w", err) + } + s.logger.Info("Index created with settings", + zap.String("index", indexName), + zap.String("settings", body)) + } + + // Wait for index to be ready by checking its existence repeatedly + deadline := time.Now().Add(defaultIndexWaitTimeout) + start := time.Now() + for time.Now().Before(deadline) { + exists, err := s.client().IndexExists(indexName).Do(ctx) + if err == nil && exists { + s.logger.Info("Index is ready", + zap.String("index", indexName), + zap.Duration("took", time.Since(start))) + return nil + } + s.logger.Debug("Waiting for index to be ready", + zap.String("index", indexName), + zap.Duration("elapsed", time.Since(start))) + time.Sleep(time.Second) + } + + return fmt.Errorf("timeout waiting for index %s to be ready", indexName) } // SpanWriterParams holds constructor parameters for NewSpanWriter @@ -121,14 +180,49 @@ func getSpanAndServiceIndexFn(p SpanWriterParams) spanAndServiceIndexFn { } // WriteSpan writes a span and its corresponding service:operation in ElasticSearch -func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { +func (s *SpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime) + + // Ensure indices exist before writing + if err := s.ensureIndex(ctx, spanIndexName); err != nil { + return fmt.Errorf("failed to ensure span index: %w", err) + } + if serviceIndexName != "" { + if err := s.ensureIndex(ctx, serviceIndexName); err != nil { + return fmt.Errorf("failed to ensure service index: %w", err) + } + } + jsonSpan := s.spanConverter.FromDomainEmbedProcess(span) if serviceIndexName != "" { s.writeService(serviceIndexName, jsonSpan) } - s.writeSpan(spanIndexName, jsonSpan) - s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName)) + + // Write with retries + var lastErr error + for i := 0; i < 3; i++ { + err := s.writeSpanWithResult(ctx, spanIndexName, jsonSpan) + if err == nil { + return nil + } + lastErr = err + s.logger.Debug("Retrying span write", + zap.String("index", spanIndexName), + zap.Int("attempt", i+1), + zap.Error(lastErr)) + time.Sleep(time.Duration(i+1) * 100 * time.Millisecond) + } + + return fmt.Errorf("failed to write span after retries: %w", lastErr) +} + +func (s *SpanWriter) writeSpanWithResult(_ context.Context, indexName string, jsonSpan *dbmodel.Span) error { + indexService := s.client().Index(). + Index(indexName). + Type(spanType). + BodyJson(jsonSpan) + + indexService.Add() return nil }