Skip to content

Commit

Permalink
chore: remove push job (will need to add queue consumer)
Browse files Browse the repository at this point in the history
[skip ci]
  • Loading branch information
adityathebe committed Sep 28, 2023
1 parent 494cfbe commit b8260ac
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 141 deletions.
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", "", "upstream username")
flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", "", "upstream password")
flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", "", "name of this agent")
flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", 500, "upstream reconciliation page size")
}

func init() {
Expand Down
7 changes: 0 additions & 7 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,10 @@ func ScheduleJobs() {
pullJob := &UpstreamPullJob{}
pullJob.Run()

pushJob := &UpstreamPushJob{}
pushJob.Run()

if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil {
logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err)
}

if _, err := FuncScheduler.AddJob(PushConfigResultsToUpstreamSchedule, pushJob); err != nil {
logger.Fatalf("Failed to schedule job [UpstreamPushJob]: %v", err)
}

scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults)
}

Expand Down
155 changes: 21 additions & 134 deletions jobs/sync_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,49 @@ package jobs
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
"github.com/flanksource/config-db/db"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/upstream"
"github.com/google/uuid"
"gorm.io/gorm/clause"
)

var tablesToReconcile = []string{
"config_items",
"config_changes",
"config_analysis",
}
var ReconcilePageSize int

// ReconcileConfigScraperResults pushes missing scrape config results to the upstream server
func ReconcileConfigScraperResults() {
ctx := api.DefaultContext

jobHistory := models.NewJobHistory("PushScraperConfigResultsToUpstream", "Config", "")
jobHistory := models.NewJobHistory("PushUpstream", "Config", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, 500)
for _, table := range tablesToReconcile {
if err := reconciler.Sync(ctx, table); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("failed to sync table %s: %v", table, err)
} else {
jobHistory.IncrSuccess()
}
reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize)
if err := reconciler.Sync(ctx, "config_items"); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("failed to sync table config_items: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

// UpstreamPullJob pulls scrape configs from the upstream server
type UpstreamPullJob struct {
lastFetchedID uuid.UUID
lastRuntime time.Time
}

func (t *UpstreamPullJob) Run() {
ctx := api.DefaultContext

jobHistory := models.NewJobHistory("PullUpstreamScrapeConfigs", "Config", "")
jobHistory := models.NewJobHistory("PullUpstream", "Config", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := t.pull(ctx, api.UpstreamConfig); err != nil {
if err := t.pull(api.DefaultContext, api.UpstreamConfig); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pulling scrape configs from upstream: %v", err)
} else {
Expand All @@ -61,7 +54,7 @@ func (t *UpstreamPullJob) Run() {
}

func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
logger.Tracef("pulling scrape configs from upstream since: %v", t.lastFetchedID)
logger.Tracef("pulling scrape configs from upstream since: %v", t.lastRuntime)

endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName)
if err != nil {
Expand All @@ -76,7 +69,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
req.SetBasicAuth(config.Username, config.Password)

params := url.Values{}
params.Add("since", t.lastFetchedID.String())
params.Add("since", t.lastRuntime.Format(time.RFC3339))
req.URL.RawQuery = params.Encode()

httpClient := &http.Client{}
Expand All @@ -86,6 +79,11 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("server returned unexpected status:%s (%s)", resp.Status, body)
}

var scrapeConfigs []models.ConfigScraper
if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil {
return fmt.Errorf("error decoding JSON response: %w", err)
Expand All @@ -95,7 +93,7 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
return nil
}

t.lastFetchedID = scrapeConfigs[len(scrapeConfigs)-1].ID
t.lastRuntime = scrapeConfigs[len(scrapeConfigs)-1].UpdatedAt

logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs))

Expand All @@ -104,114 +102,3 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
UpdateAll: true,
}).Create(&scrapeConfigs).Error
}

type LastPushedConfigResult struct {
ConfigID uuid.UUID
AnalysisID uuid.UUID
ChangeID uuid.UUID
}

// UpstreamPushJob pushes scrape config results to the upstream server
type UpstreamPushJob struct {
status LastPushedConfigResult

initiated bool
}

// init initializes the last pushed ids ...
func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName)
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
}

req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return fmt.Errorf("error creating new http request: %w", err)
}

req.SetBasicAuth(config.Username, config.Password)

httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil {
return fmt.Errorf("error decoding JSON response: %w", err)
}

return nil
}

func (t *UpstreamPushJob) Run() {
ctx := api.DefaultContext

jobHistory := models.NewJobHistory("UpstreamPushJob", "Config", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if !t.initiated {
logger.Debugf("initializing upstream push job")
if err := t.init(ctx, api.UpstreamConfig); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error initializing upstream push job: %v", err)
return
}

t.initiated = true
}

if err := t.run(ctx); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pushing to upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error {
pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName}
if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil {
return err
}

if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil {
return err
}

if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil {
return err
}

if pushData.Count() == 0 {
return nil
}

logger.Tracef("pushing %d config scrape results to upstream", pushData.Count())
if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil {
return fmt.Errorf("error pushing to upstream: %w", err)
}

if len(pushData.ConfigItems) > 0 {
t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID
}

if len(pushData.ConfigAnalysis) > 0 {
t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID
}

if len(pushData.ConfigChanges) > 0 {
id := pushData.ConfigChanges[len(pushData.ConfigChanges)-1].ID
parsed, err := uuid.Parse(id)
if err != nil {
return err
}

t.status.ChangeID = parsed
}

return nil
}

0 comments on commit b8260ac

Please sign in to comment.