Skip to content

Commit

Permalink
feat: get the last pushed ids from the upstream server
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 28, 2023
1 parent 78c7ec7 commit f703448
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 42 deletions.
3 changes: 0 additions & 3 deletions api/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty/upstream"
"github.com/google/uuid"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -14,8 +13,6 @@ var (
Namespace string
DefaultContext ScrapeContext

// the derived agent id from the agentName
AgentID uuid.UUID
UpstreamConfig upstream.UpstreamConfig
)

Expand Down
14 changes: 2 additions & 12 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/jobs"
"github.com/flanksource/config-db/query"
"github.com/google/uuid"

"github.com/flanksource/config-db/scrapers"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -55,17 +56,6 @@ func serve(configFiles []string) {
e.GET("/query", query.Handler)
e.POST("/run/:id", scrapers.RunNowHandler)

if api.UpstreamConfig.AgentName != "" {
agent, err := db.FindAgentByName(context.Background(), api.UpstreamConfig.AgentName)
if err != nil {
logger.Fatalf("error searching for agent (name=%s): %v", api.UpstreamConfig.AgentName, err)
} else if agent == nil {
logger.Fatalf("agent not found (name=%s)", api.UpstreamConfig.AgentName)
} else {
api.AgentID = agent.ID
}
}

go startScraperCron(configFiles)

go jobs.ScheduleJobs()
Expand All @@ -89,7 +79,7 @@ func startScraperCron(configFiles []string) {
}
}

scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(api.AgentID)
scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(uuid.Nil)
if err != nil {
logger.Fatalf("error getting configs from database: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jobs
import (
"reflect"
"runtime"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
Expand Down Expand Up @@ -33,7 +32,7 @@ func ScheduleJobs() {
pullJob := &UpstreamPullJob{}
pullJob.Run()

pushJob := &UpstreamPushJob{MaxAge: time.Minute * 5}
pushJob := &UpstreamPushJob{}
pushJob.Run()

if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil {
Expand Down
58 changes: 33 additions & 25 deletions jobs/sync_upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"net/url"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
Expand Down Expand Up @@ -106,31 +105,42 @@ func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamCo
}).Create(&scrapeConfigs).Error
}

type LastPushedConfigResult struct {
ConfigID uuid.UUID
AnalysisID uuid.UUID
ChangeID uuid.UUID
}

// UpstreamPushJob pushes scrape config results to the upstream server
type UpstreamPushJob struct {
lastConfigItemID uuid.UUID
lastAnalysisID uuid.UUID
lastChangeID uuid.UUID
status LastPushedConfigResult

initiated bool

// MaxAge defines how far back we look into the past on startup when
// lastRuntime is zero.
MaxAge time.Duration
}

// init initializes the last pushed ids ...
func (t *UpstreamPushJob) init(ctx api.ScrapeContext) error {
if err := ctx.DB().Debug().Model(&models.ConfigItem{}).Select("id").Where("NOW() - updated_at <= ?", t.MaxAge).Scan(&t.lastConfigItemID).Error; err != nil {
return fmt.Errorf("error getting last config item id: %w", err)
func (t *UpstreamPushJob) init(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "status", config.AgentName)
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
}

if err := ctx.DB().Debug().Model(&models.ConfigAnalysis{}).Select("id").Where("NOW() - first_observed <= ?", t.MaxAge).Scan(&t.lastAnalysisID).Error; err != nil {
return fmt.Errorf("error getting last analysis id: %w", err)
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return fmt.Errorf("error creating new http request: %w", err)
}

if err := ctx.DB().Debug().Model(&models.ConfigChange{}).Select("id").Where("NOW() - created_at <= ?", t.MaxAge).Scan(&t.lastChangeID).Error; err != nil {
return fmt.Errorf("error getting last change id: %w", err)
req.SetBasicAuth(config.Username, config.Password)

httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if err := json.NewDecoder(resp.Body).Decode(&t.status); err != nil {
return fmt.Errorf("error decoding JSON response: %w", err)
}

return nil
Expand All @@ -145,7 +155,7 @@ func (t *UpstreamPushJob) Run() {

if !t.initiated {
logger.Debugf("initializing upstream push job")
if err := t.init(ctx); err != nil {
if err := t.init(ctx, api.UpstreamConfig); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error initializing upstream push job: %v", err)
return
Expand All @@ -163,36 +173,34 @@ func (t *UpstreamPushJob) Run() {
}

func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error {
logger.Tracef("running configs upstream push job")

pushData := &upstream.PushData{AgentName: api.UpstreamConfig.AgentName}
if err := ctx.DB().Where("id > ?", t.lastConfigItemID).Find(&pushData.ConfigItems).Error; err != nil {
if err := ctx.DB().Where("id > ?", t.status.ConfigID).Find(&pushData.ConfigItems).Error; err != nil {
return err
}

if err := ctx.DB().Where("id > ?", t.lastAnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil {
if err := ctx.DB().Where("id > ?", t.status.AnalysisID).Find(&pushData.ConfigAnalysis).Error; err != nil {
return err
}

if err := ctx.DB().Where("id > ?", t.lastChangeID).Find(&pushData.ConfigChanges).Error; err != nil {
if err := ctx.DB().Where("id > ?", t.status.ChangeID).Find(&pushData.ConfigChanges).Error; err != nil {
return err
}

logger.Tracef("pushing %d config scrape results to upstream", pushData.Count())
if pushData.Count() == 0 {
return nil
}

logger.Tracef("pushing %d config scrape results to upstream", pushData.Count())
if err := upstream.Push(ctx, api.UpstreamConfig, pushData); err != nil {
return fmt.Errorf("error pushing to upstream: %w", err)
}

if len(pushData.ConfigItems) > 0 {
t.lastConfigItemID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID
t.status.ConfigID = pushData.ConfigItems[len(pushData.ConfigItems)-1].ID
}

if len(pushData.ConfigAnalysis) > 0 {
t.lastAnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID
t.status.AnalysisID = pushData.ConfigAnalysis[len(pushData.ConfigAnalysis)-1].ID
}

if len(pushData.ConfigChanges) > 0 {
Expand All @@ -202,7 +210,7 @@ func (t *UpstreamPushJob) run(ctx api.ScrapeContext) error {
return err
}

t.lastChangeID = parsed
t.status.ChangeID = parsed
}

return nil
Expand Down

0 comments on commit f703448

Please sign in to comment.