Skip to content

Commit

Permalink
chore: remove global scrape context
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Sep 18, 2024
1 parent cfee21b commit 9ab32c3
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 26 deletions.
1 change: 0 additions & 1 deletion api/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ var (
KubernetesClient kubernetes.Interface
KubernetesRestConfig *rest.Config
Namespace string
DefaultContext ScrapeContext

UpstreamConfig upstream.UpstreamConfig
)
Expand Down
6 changes: 2 additions & 4 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
configsv1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/controllers"
"github.com/flanksource/config-db/db"
Expand Down Expand Up @@ -51,13 +50,12 @@ func run(cmd *cobra.Command, args []string) error {
AddShutdownHook(closer)

dutyCtx := dutyContext.NewContext(ctx, commonsCtx.WithTracer(otel.GetTracerProvider().Tracer(otelServiceName)))
api.DefaultContext = api.NewScrapeContext(dutyCtx)

logger := logger.GetLogger("operator")
logger.SetLogLevel(k8sLogLevel)

dedupWindow := api.DefaultContext.Properties().Duration("changes.dedup.window", time.Hour)
if err := db.InitChangeFingerprintCache(api.DefaultContext, dedupWindow); err != nil {
dedupWindow := ctx.Properties().Duration("changes.dedup.window", time.Hour)
if err := db.InitChangeFingerprintCache(ctx, dedupWindow); err != nil {
return fmt.Errorf("failed to initialize change fingerprint cache: %w", err)
}

Expand Down
8 changes: 3 additions & 5 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ var Run = &cobra.Command{
dutyCtx = c
}

api.DefaultContext = api.NewScrapeContext(dutyCtx)

e := echo.New()

e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
c.SetRequest(c.Request().WithContext(api.DefaultContext.Wrap(c.Request().Context())))
c.SetRequest(c.Request().WithContext(dutyCtx.Wrap(c.Request().Context())))
return next(c)
}
})
Expand Down Expand Up @@ -82,8 +80,8 @@ var Run = &cobra.Command{
}

for i := range scraperConfigs {
ctx, cancel, cancelTimeout := api.DefaultContext.WithScrapeConfig(&scraperConfigs[i]).
WithTimeout(api.DefaultContext.Properties().Duration("scraper.timeout", 4*time.Hour))
ctx, cancel, cancelTimeout := api.NewScrapeContext(dutyCtx).WithScrapeConfig(&scraperConfigs[i]).
WithTimeout(dutyCtx.Properties().Duration("scraper.timeout", 4*time.Hour))
defer cancelTimeout()
shutdown.AddHook(cancel)
if err := scrapeAndStore(ctx); err != nil {
Expand Down
14 changes: 6 additions & 8 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/jobs"
Expand Down Expand Up @@ -44,10 +43,9 @@ var Serve = &cobra.Command{
AddShutdownHook(closer)

dutyCtx := dutyContext.NewContext(ctx, commonsCtx.WithTracer(otel.GetTracerProvider().Tracer(otelServiceName)))
api.DefaultContext = api.NewScrapeContext(dutyCtx)

dedupWindow := api.DefaultContext.Properties().Duration("changes.dedup.window", time.Hour)
if err := db.InitChangeFingerprintCache(api.DefaultContext, dedupWindow); err != nil {
dedupWindow := ctx.Properties().Duration("changes.dedup.window", time.Hour)
if err := db.InitChangeFingerprintCache(ctx, dedupWindow); err != nil {
return fmt.Errorf("failed to initialize change fingerprint cache: %w", err)
}

Expand Down Expand Up @@ -101,7 +99,7 @@ func serve(ctx dutyContext.Context, configFiles []string) {
Gatherer: prom.DefaultGatherer,
}))

go startScraperCron(configFiles)
go startScraperCron(ctx, configFiles)

go jobs.ScheduleJobs(ctx)
shutdown.AddHook(jobs.Stop)
Expand All @@ -122,21 +120,21 @@ func serve(ctx dutyContext.Context, configFiles []string) {
}
}

func startScraperCron(configFiles []string) {
// startScraperCron bootstraps scraping at server startup: it parses the given
// scraper configuration files, persists each one to the database, and then
// registers the periodic scrape jobs for every scrape config.
//
// NOTE: ctx was added in this commit so the function no longer reaches for the
// removed api.DefaultContext global; callers must pass a fully initialized
// duty context (DB handle attached — PersistScrapeConfigFromFile needs it).
//
// Any parse or persistence failure is fatal (logger.Fatalf exits the process):
// this runs once at startup and the server cannot do useful work without its
// scrape configs.
func startScraperCron(ctx dutyContext.Context, configFiles []string) {
	scraperConfigsFiles, err := v1.ParseConfigs(configFiles...)
	if err != nil {
		logger.Fatalf("error parsing config files: %v", err)
	}

	logger.Infof("Persisting %d config files", len(scraperConfigsFiles))
	for _, scrapeConfig := range scraperConfigsFiles {
		_, err := db.PersistScrapeConfigFromFile(ctx, scrapeConfig)
		if err != nil {
			logger.Fatalf("Error persisting scrape config to db: %v", err)
		}
	}

	// Sync cron entries for all scrape configs now in the database
	// (runs in the caller's goroutine; see `go startScraperCron(...)` in serve).
	scrapers.SyncScrapeConfigs(ctx)

}

Expand Down
5 changes: 3 additions & 2 deletions controllers/scrapeconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

dutyContext "github.com/flanksource/duty/context"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (r *ScrapeConfigReconciler) Reconcile(c context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

ctx := api.DefaultContext.WithScrapeConfig(scrapeConfig)
ctx := api.NewScrapeContext(dutyContext.NewContext(c)).WithScrapeConfig(scrapeConfig)

// Check if it is deleted, remove scrape config
if !scrapeConfig.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -100,7 +101,7 @@ func (r *ScrapeConfigReconciler) Reconcile(c context.Context, req ctrl.Request)

// Sync jobs if new scrape config is created
if changed {
ctx := api.DefaultContext.WithScrapeConfig(scrapeConfig)
ctx := ctx.WithScrapeConfig(scrapeConfig)
if err := scrapers.SyncScrapeJob(ctx); err != nil {
logger.Error(err, "failed to sync scrape job")
}
Expand Down
3 changes: 2 additions & 1 deletion db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flanksource/config-db/api"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/duty/context"
"github.com/patrickmn/go-cache"
)

Expand All @@ -15,7 +16,7 @@ func changeFingeprintCacheKey(configID, fingerprint string) string {
return fmt.Sprintf("%s:%s", configID, fingerprint)
}

func InitChangeFingerprintCache(ctx api.ScrapeContext, window time.Duration) error {
func InitChangeFingerprintCache(ctx context.Context, window time.Duration) error {
var changes []*models.ConfigChange
if err := ctx.DB().Where("fingerprint IS NOT NULL").Where("NOW() - created_at <= ?", window).Find(&changes).Error; err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion db/config_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func GetScrapeConfigsOfAgent(ctx context.Context, agentID uuid.UUID) ([]models.C
return configScrapers, err
}

func PersistScrapeConfigFromFile(ctx api.ScrapeContext, scrapeConfig v1.ScrapeConfig) (models.ConfigScraper, error) {
func PersistScrapeConfigFromFile(ctx context.Context, scrapeConfig v1.ScrapeConfig) (models.ConfigScraper, error) {
configScraper, err := scrapeConfig.ToModel()
if err != nil {
return configScraper, err
Expand Down
8 changes: 5 additions & 3 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/google/uuid"
"github.com/robfig/cron/v3"
Expand Down Expand Up @@ -41,7 +42,7 @@ var (
ScraperConcurrency = 12
)

func SyncScrapeConfigs(sc api.ScrapeContext) {
func SyncScrapeConfigs(sc context.Context) {
if globalScraperSempahore == nil {
globalScraperSempahore = semaphore.NewWeighted(int64(sc.Properties().Int("scraper.concurrency", ScraperConcurrency)))
}
Expand All @@ -67,7 +68,7 @@ func SyncScrapeConfigs(sc api.ScrapeContext) {
DefaultSchedule = sc.Properties().String("scrapers.default.schedule", DefaultSchedule)
j := &job.Job{
Name: "ConfigScraperSync",
Context: sc.DutyContext(),
Context: sc,
Schedule: "@every 10m",
Singleton: true,
JobHistory: true,
Expand All @@ -86,7 +87,8 @@ func SyncScrapeConfigs(sc api.ScrapeContext) {
continue
}

if err := SyncScrapeJob(sc.WithScrapeConfig(&_scraper)); err != nil {
srapeCtx := api.NewScrapeContext(sc).WithScrapeConfig(&_scraper)
if err := SyncScrapeJob(srapeCtx); err != nil {
jr.History.AddErrorf("Error syncing scrape job[%s]: %v", scraper.ID, err)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion scrapers/runscrapers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ var _ = Describe("Scrapers test", Ordered, func() {

It("should create a new config item", func() {
config := getConfigSpec("file-car")
_, err := db.PersistScrapeConfigFromFile(ctx, config)
_, err := db.PersistScrapeConfigFromFile(ctx.DutyContext(), config)
Expect(err).To(BeNil())

ctx := api.NewScrapeContext(DefaultContext).WithScrapeConfig(&config)
Expand Down

0 comments on commit 9ab32c3

Please sign in to comment.