From f7034481999e307875bb3c1d1ae28d8ae5b9a48f Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 26 Sep 2023 21:15:08 +0545 Subject: [PATCH] feat: get the last pushed ids from the upstream server --- api/global.go | 3 --- cmd/server.go | 14 ++--------- jobs/jobs.go | 3 +-- jobs/sync_upstream.go | 58 ++++++++++++++++++++++++------------------- 4 files changed, 36 insertions(+), 42 deletions(-) diff --git a/api/global.go b/api/global.go index 40207a28..5493aad3 100644 --- a/api/global.go +++ b/api/global.go @@ -3,7 +3,6 @@ package api import ( v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/duty/upstream" - "github.com/google/uuid" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -14,8 +13,6 @@ var ( Namespace string DefaultContext ScrapeContext - // the derived agent id from the agentName - AgentID uuid.UUID UpstreamConfig upstream.UpstreamConfig ) diff --git a/cmd/server.go b/cmd/server.go index 4d352fcf..1bf6af90 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -11,6 +11,7 @@ import ( "github.com/flanksource/config-db/db" "github.com/flanksource/config-db/jobs" "github.com/flanksource/config-db/query" + "github.com/google/uuid" "github.com/flanksource/config-db/scrapers" "github.com/labstack/echo/v4" @@ -55,17 +56,6 @@ func serve(configFiles []string) { e.GET("/query", query.Handler) e.POST("/run/:id", scrapers.RunNowHandler) - if api.UpstreamConfig.AgentName != "" { - agent, err := db.FindAgentByName(context.Background(), api.UpstreamConfig.AgentName) - if err != nil { - logger.Fatalf("error searching for agent (name=%s): %v", api.UpstreamConfig.AgentName, err) - } else if agent == nil { - logger.Fatalf("agent not found (name=%s)", api.UpstreamConfig.AgentName) - } else { - api.AgentID = agent.ID - } - } - go startScraperCron(configFiles) go jobs.ScheduleJobs() @@ -89,7 +79,7 @@ func startScraperCron(configFiles []string) { } } - scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(api.AgentID) + scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(uuid.Nil) if err != nil { logger.Fatalf("error getting configs from database: %v", err) } diff --git a/jobs/jobs.go b/jobs/jobs.go index c08a4359..d8ae2a7c 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -3,7 +3,6 @@ package jobs import ( "reflect" "runtime" - "time" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -33,7 +32,7 @@ func ScheduleJobs() { pullJob := &UpstreamPullJob{} pullJob.Run() - pushJob := &UpstreamPushJob{MaxAge: time.Minute * 5} + pushJob := &UpstreamPushJob{} pushJob.Run() if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil { diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go index ae6b0aac..dc6f00cf 100644 --- a/jobs/sync_upstream.go +++ b/jobs/sync_upstream.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/url" - "time" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" @@ -106,31 +105,42 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo }).Create(&scrapeConfigs).Error } +type LastPushedConfigResult struct { + ConfigID uuid.UUID + AnalysisID uuid.UUID + ChangeID uuid.UUID +} + // UpstreamPushJob pushes scrape config results to the upstream server type UpstreamPushJob struct { - lastConfigItemID uuid.UUID - lastAnalysisID uuid.UUID - lastChangeID uuid.UUID + status LastPushedConfigResult initiated bool - - // MaxAge defines how far back we look into the past on startup when - // lastRuntime is zero. - MaxAge time.Duration } // init initializes the last pushed ids ... -func (t *UpstreamPushJob) init(ctx api.ScrapeContext) error { - if err := ctx.DB().Debug().Model(&models.ConfigItem{}).Select("id").Where("NOW() - updated_at <= ?", t.MaxAge).Scan(&t.lastConfigItemID).Error; err != nil { - return fmt.Errorf("error getting last config item id: %w", err) +func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error { + endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName) + if err != nil { + return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err) } - if err := ctx.DB().Debug().Model(&models.ConfigAnalysis{}).Select("id").Where("NOW() - first_observed <= ?", t.MaxAge).Scan(&t.lastAnalysisID).Error; err != nil { - return fmt.Errorf("error getting last analysis id: %w", err) + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return fmt.Errorf("error creating new http request: %w", err) } - if err := ctx.DB().Debug().Model(&models.ConfigChange{}).Select("id").Where("NOW() - created_at <= ?", t.MaxAge).Scan(&t.lastChangeID).Error; err != nil { - return fmt.Errorf("error getting last change id: %w", err) + req.SetBasicAuth(config.Username, config.Password) + + httpClient := &http.Client{} + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil { + return fmt.Errorf("error decoding JSON response: %w", err) } return nil @@ -145,7 +155,7 @@ func (t *UpstreamPushJob) Run() { if !t.initiated { logger.Debugf("initializing upstream push job") - if err := t.init(ctx); err != nil { + if err := t.init(ctx, api.UpstreamConfig); err != nil { jobHistory.AddError(err.Error()) logger.Errorf("error initializing upstream push job: %v", err) return @@ -163,36 +173,34 @@ func (t *UpstreamPushJob) Run() { } func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { - logger.Tracef("running configs upstream push job") - pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName} - if err := ctx.DB().Where("id > ?", t.lastConfigItemID).Find(&pushData.ConfigItems).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil { return err } - if err := ctx.DB().Where("id > ?", t.lastAnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil { return err } - if err := ctx.DB().Where("id > ?", t.lastChangeID).Find(&pushData.ConfigChanges).Error; err != nil { + if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil { return err } - logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) if pushData.Count() == 0 { return nil } + logger.Tracef("pushing %d config scrape results to upstream", pushData.Count()) if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil { return fmt.Errorf("error pushing to upstream: %w", err) } if len(pushData.ConfigItems) > 0 { - t.lastConfigItemID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID + t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID } if len(pushData.ConfigAnalysis) > 0 { - t.lastAnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID + t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID } if len(pushData.ConfigChanges) > 0 { @@ -202,7 +210,7 @@ func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error { return err } - t.lastChangeID = parsed + t.status.ChangeID = parsed } return nil