From 0d9442816372866c2410abf0d6b58f0ca5317491 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 18 Sep 2024 10:00:01 +0545 Subject: [PATCH] fix: duty context middleware and add span for deleting stale configs --- cmd/operator.go | 2 +- cmd/server.go | 16 +++++++--------- db/config_scraper.go | 2 +- scrapers/run.go | 11 +++++++++++ scrapers/run_now.go | 10 +++++++--- 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/cmd/operator.go b/cmd/operator.go index 0206378e..09f4e108 100644 --- a/cmd/operator.go +++ b/cmd/operator.go @@ -69,7 +69,7 @@ func run(cmd *cobra.Command, args []string) error { utilruntime.Must(configsv1.AddToScheme(scheme)) // Start the server - go serve(args) + go serve(dutyCtx, args) mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, diff --git a/cmd/server.go b/cmd/server.go index 6879048b..5cb995ad 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -51,25 +51,24 @@ var Serve = &cobra.Command{ return fmt.Errorf("failed to initialize change fingerprint cache: %w", err) } - serve(args) + serve(dutyCtx, args) return nil }, } -func serve(configFiles []string) { +func serve(ctx dutyContext.Context, configFiles []string) { e := echo.New() + dutyEcho.AddDebugHandlers(ctx, e, func(next echo.HandlerFunc) echo.HandlerFunc { return next }) + e.Use(otelecho.Middleware("config-db", otelecho.WithSkipper(telemetryURLSkipper))) + e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { - ctx := api.DefaultContext.Wrap(c.Request().Context()) - c.SetRequest(c.Request().WithContext(ctx)) + c.SetRequest(c.Request().WithContext(ctx.Wrap(c.Request().Context()))) return next(c) } }) - dutyEcho.AddDebugHandlers(api.DefaultContext.DutyContext(), e, func(next echo.HandlerFunc) echo.HandlerFunc { return next }) - e.Use(otelecho.Middleware("config-db", otelecho.WithSkipper(telemetryURLSkipper))) - if logger.IsTraceEnabled() { echoLogConfig := middleware.DefaultLoggerConfig echoLogConfig.Skipper = telemetryURLSkipper @@ -104,7 +103,7 @@ func serve(configFiles []string) { go startScraperCron(configFiles) - go jobs.ScheduleJobs(api.DefaultContext.DutyContext()) + go jobs.ScheduleJobs(ctx) shutdown.AddHook(jobs.Stop) shutdown.AddHook(func() { @@ -121,7 +120,6 @@ func serve(configFiles []string) { if err := e.Start(fmt.Sprintf(":%d", httpPort)); err != nil && err != http.ErrServerClosed { e.Logger.Fatal(err) } - } func startScraperCron(configFiles []string) { diff --git a/db/config_scraper.go b/db/config_scraper.go index 7791abc1..bdb1172d 100644 --- a/db/config_scraper.go +++ b/db/config_scraper.go @@ -15,7 +15,7 @@ import ( "gorm.io/gorm" ) -func FindScraper(ctx api.ScrapeContext, id string) (*models.ConfigScraper, error) { +func FindScraper(ctx context.Context, id string) (*models.ConfigScraper, error) { var configScraper models.ConfigScraper if err := ctx.DB().Where("id = ?", id).First(&configScraper).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { diff --git a/scrapers/run.go b/scrapers/run.go index 2a6bf8e0..ae48f6b1 100644 --- a/scrapers/run.go +++ b/scrapers/run.go @@ -8,6 +8,7 @@ import ( "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/config-db/db" + "go.opentelemetry.io/otel/attribute" ) type contextKey string @@ -56,8 +57,18 @@ func RunScraper(ctx api.ScrapeContext) (*ScrapeOutput, error) { } func UpdateStaleConfigItems(ctx api.ScrapeContext, results v1.ScrapeResults) error { + basectx, span := ctx.StartSpan("UpdateStaleConfigItems") + defer span.End() + + ctx.Context = basectx + persistedID := ctx.ScrapeConfig().GetPersistedID() if persistedID != nil { + ctx.GetSpan().SetAttributes( + attribute.Int("scrape.results", len(results)), + attribute.Bool("scrape.hasError", v1.ScrapeResults(results).HasErr()), + ) + // If error in any of the scrape results, don't delete old items if len(results) > 0 && !v1.ScrapeResults(results).HasErr() { if err := DeleteStaleConfigItems(ctx, *persistedID); err != nil { diff --git a/scrapers/run_now.go b/scrapers/run_now.go index 8ac06db0..7800cbab 100644 --- a/scrapers/run_now.go +++ b/scrapers/run_now.go @@ -7,13 +7,17 @@ import ( "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/config-db/db" + "github.com/flanksource/duty/context" "github.com/labstack/echo/v4" ) func RunNowHandler(c echo.Context) error { id := c.Param("id") - scraper, err := db.FindScraper(api.DefaultContext, id) + baseCtx := c.Request().Context() + ctx := baseCtx.(context.Context) + + scraper, err := db.FindScraper(ctx, id) if err != nil { return echo.NewHTTPError(http.StatusBadRequest, err.Error()) // could mean server errors as well, but there's no trivial way to find out... } @@ -27,8 +31,8 @@ func RunNowHandler(c echo.Context) error { return echo.NewHTTPError(http.StatusInternalServerError, "failed to transform config scraper model", err) } - ctx := api.DefaultContext.WithScrapeConfig(&configScraper) - j := newScraperJob(ctx) + scrapeCtx := api.NewScrapeContext(ctx).WithScrapeConfig(&configScraper) + j := newScraperJob(scrapeCtx) j.Run() return c.JSON(http.StatusOK, j.LastJob.Details)