diff --git a/cmd/root.go b/cmd/root.go
index 9f0d8c0b..657f0c0a 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -78,6 +78,7 @@ func ServerFlags(flags *pflag.FlagSet) {
 	flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", "", "upstream username")
 	flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", "", "upstream password")
 	flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", "", "name of this agent")
+	flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", 500, "upstream reconciliation page size")
 }
 
 func init() {
diff --git a/jobs/jobs.go b/jobs/jobs.go
index d8ae2a7c..bb1e48db 100644
--- a/jobs/jobs.go
+++ b/jobs/jobs.go
@@ -32,17 +32,10 @@ func ScheduleJobs() {
 		pullJob := &UpstreamPullJob{}
 		pullJob.Run()
 
-		pushJob := &UpstreamPushJob{}
-		pushJob.Run()
-
 		if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil {
 			logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err)
 		}
 
-		if _, err := FuncScheduler.AddJob(PushConfigResultsToUpstreamSchedule, pushJob); err != nil {
-			logger.Fatalf("Failed to schedule job [UpstreamPushJob]: %v", err)
-		}
-
 		scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults)
 	}
 
diff --git a/jobs/sync_upstream.go b/jobs/sync_upstream.go
index dc6f00cf..e3968b52 100644
--- a/jobs/sync_upstream.go
+++ b/jobs/sync_upstream.go
@@ -3,56 +3,49 @@ package jobs
 import (
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"net/url"
+	"time"
 
 	"github.com/flanksource/commons/logger"
 	"github.com/flanksource/config-db/api"
 	"github.com/flanksource/config-db/db"
 	"github.com/flanksource/duty/models"
 	"github.com/flanksource/duty/upstream"
-	"github.com/google/uuid"
 	"gorm.io/gorm/clause"
 )
 
-var tablesToReconcile = []string{
-	"config_items",
-	"config_changes",
-	"config_analysis",
-}
+var ReconcilePageSize int
 
 // ReconcileConfigScraperResults pushes missing scrape config results to the upstream server
 func ReconcileConfigScraperResults() {
 	ctx := api.DefaultContext
 
-	jobHistory := models.NewJobHistory("PushScraperConfigResultsToUpstream", "Config", "")
+	jobHistory := models.NewJobHistory("PushUpstream", "Config", "")
 	_ = db.PersistJobHistory(jobHistory.Start())
 	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
 
-	reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, 500)
-	for _, table := range tablesToReconcile {
-		if err := reconciler.Sync(ctx, table); err != nil {
-			jobHistory.AddError(err.Error())
-			logger.Errorf("failed to sync table %s: %v", table, err)
-		} else {
-			jobHistory.IncrSuccess()
-		}
+	reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize)
+	if err := reconciler.Sync(ctx, "config_items"); err != nil {
+		jobHistory.AddError(err.Error())
+		logger.Errorf("failed to sync table config_items: %v", err)
+	} else {
+		jobHistory.IncrSuccess()
 	}
 }
 
 // UpstreamPullJob pulls scrape configs from the upstream server
 type UpstreamPullJob struct {
-	lastFetchedID uuid.UUID
+	lastRuntime time.Time
 }
 
 func (t *UpstreamPullJob) Run() {
-	ctx := api.DefaultContext
-
-	jobHistory := models.NewJobHistory("PullUpstreamScrapeConfigs", "Config", "")
+	jobHistory := models.NewJobHistory("PullUpstream", "Config", "")
 	_ = db.PersistJobHistory(jobHistory.Start())
 	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
 
-	if err := t.pull(ctx, api.UpstreamConfig); err != nil {
+	if err := t.pull(api.DefaultContext, api.UpstreamConfig); err != nil {
 		jobHistory.AddError(err.Error())
 		logger.Errorf("error pulling scrape configs from upstream: %v", err)
 	} else {
@@ -61,7 +54,7 @@ func (t *UpstreamPullJob) Run() {
 }
 
 func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
-	logger.Tracef("pulling scrape configs from upstream since: %v", t.lastFetchedID)
+	logger.Tracef("pulling scrape configs from upstream since: %v", t.lastRuntime)
 
 	endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName)
 	if err != nil {
@@ -76,7 +69,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
 	req.SetBasicAuth(config.Username, config.Password)
 
 	params := url.Values{}
-	params.Add("since", t.lastFetchedID.String())
+	params.Add("since", t.lastRuntime.Format(time.RFC3339))
 	req.URL.RawQuery = params.Encode()
 
 	httpClient := &http.Client{}
@@ -86,6 +79,11 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
 	}
 	defer resp.Body.Close()
 
+	if resp.StatusCode != http.StatusOK {
+		body, _ := io.ReadAll(resp.Body)
+		return fmt.Errorf("server returned unexpected status:%s (%s)", resp.Status, body)
+	}
+
 	var scrapeConfigs []models.ConfigScraper
 	if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil {
 		return fmt.Errorf("error decoding JSON response: %w", err)
 	}
@@ -95,7 +93,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
 		return nil
 	}
 
-	t.lastFetchedID = scrapeConfigs[len(scrapeConfigs)-1].ID
+	t.lastRuntime = scrapeConfigs[len(scrapeConfigs)-1].UpdatedAt
 
 	logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs))
 
@@ -104,114 +102,3 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
 		UpdateAll: true,
 	}).Create(&scrapeConfigs).Error
 }
-
-type LastPushedConfigResult struct {
-	ConfigID   uuid.UUID
-	AnalysisID uuid.UUID
-	ChangeID   uuid.UUID
-}
-
-// UpstreamPushJob pushes scrape config results to the upstream server
-type UpstreamPushJob struct {
-	status LastPushedConfigResult
-
-	initiated bool
-}
-
-// init initializes the last pushed ids ...
-func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
-	endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName)
-	if err != nil {
-		return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
-	}
-
-	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
-	if err != nil {
-		return fmt.Errorf("error creating new http request: %w", err)
-	}
-
-	req.SetBasicAuth(config.Username, config.Password)
-
-	httpClient := &http.Client{}
-	resp, err := httpClient.Do(req)
-	if err != nil {
-		return fmt.Errorf("error making request: %w", err)
-	}
-	defer resp.Body.Close()
-
-	if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil {
-		return fmt.Errorf("error decoding JSON response: %w", err)
-	}
-
-	return nil
-}
-
-func (t *UpstreamPushJob) Run() {
-	ctx := api.DefaultContext
-
-	jobHistory := models.NewJobHistory("UpstreamPushJob", "Config", "")
-	_ = db.PersistJobHistory(jobHistory.Start())
-	defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()
-
-	if !t.initiated {
-		logger.Debugf("initializing upstream push job")
-		if err := t.init(ctx, api.UpstreamConfig); err != nil {
-			jobHistory.AddError(err.Error())
-			logger.Errorf("error initializing upstream push job: %v", err)
-			return
-		}
-
-		t.initiated = true
-	}
-
-	if err := t.run(ctx); err != nil {
-		jobHistory.AddError(err.Error())
-		logger.Errorf("error pushing to upstream: %v", err)
-	} else {
-		jobHistory.IncrSuccess()
-	}
-}
-
-func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error {
-	pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName}
-	if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil {
-		return err
-	}
-
-	if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil {
-		return err
-	}
-
-	if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil {
-		return err
-	}
-
-	if pushData.Count() == 0 {
-		return nil
-	}
-
-	logger.Tracef("pushing %d config scrape results to upstream", pushData.Count())
-	if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil {
-		return fmt.Errorf("error pushing to upstream: %w", err)
-	}
-
-	if len(pushData.ConfigItems) > 0 {
-		t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID
-	}
-
-	if len(pushData.ConfigAnalysis) > 0 {
-		t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID
-	}
-
-	if len(pushData.ConfigChanges) > 0 {
-		id := pushData.ConfigChanges[len(pushData.ConfigChanges)-1].ID
-		parsed, err := uuid.Parse(id)
-		if err != nil {
-			return err
-		}
-
-		t.status.ChangeID = parsed
-	}
-
-	return nil
-}