Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upstream sync #300

Merged
merged 13 commits into from
Oct 5, 2023
Merged
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0
with:
go-version: 1.19.x
go-version: 1.20.x
- name: golangci-lint
uses: golangci/golangci-lint-action@08e2f20817b15149a52b5b3ebe7de50aff2ba8c5 # v3.4.0
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0
with:
go-version: v1.19.x
go-version: v1.20.x
- uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2
with:
path: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@bfdd3570ce990073878bf10f6b2d79082de49492 # v2.2.0
with:
go-version: 1.19.x
go-version: 1.20.x
- name: Checkout code
uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
- uses: actions/cache@8492260343ad570701412c2f464a5877dc76bace # v2
Expand Down
12 changes: 3 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19@sha256:8cefba2710250b21a8b8e32281788c5b53dc561ba0c51ea7de92b9a350663b7d as builder
FROM golang:1.20@sha256:bc5f0b5e43282627279fe5262ae275fecb3d2eae3b33977a7fd200c7a760d6f1 as builder
WORKDIR /app
COPY ./ ./

Expand All @@ -10,17 +10,11 @@ WORKDIR /app
RUN go version
RUN make build

FROM ubuntu:bionic@sha256:14f1045816502e16fcbfc0b2a76747e9f5e40bc3899f8cfe20745abaafeaeab3
FROM ubuntu:jammy@sha256:0bced47fffa3361afa981854fcabcd4577cd43cebbb808cea2b1f33a3dd7f508
WORKDIR /app

# install CA certificates
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -Rf /var/lib/apt/lists/* && \
rm -Rf /usr/share/doc && rm -Rf /usr/share/man && \
apt-get clean

COPY --from=builder /app/.bin/config-db /app
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

RUN /app/config-db go-offline

Expand Down
3 changes: 3 additions & 0 deletions api/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty/upstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -11,6 +12,8 @@ var (
KubernetesRestConfig *rest.Config
Namespace string
DefaultContext ScrapeContext

UpstreamConfig upstream.UpstreamConfig
)

type Scraper interface {
Expand Down
24 changes: 17 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,18 @@ package cmd
import (
"fmt"
"os"
"strconv"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/jobs"
"github.com/flanksource/config-db/scrapers"
"github.com/flanksource/config-db/utils/kube"
"github.com/google/uuid"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

var (
agentID = uuid.Nil // the derived agent id from the agentName
agentName string // name of the agent passed as a CLI arg
)

var dev bool
var httpPort, metricsPort, devGuiPort int
var disableKubernetes bool
Expand Down Expand Up @@ -78,7 +73,22 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.StringVar(&scrapers.DefaultSchedule, "default-schedule", "@every 60m", "Default schedule for configs that don't specfiy one")
flags.StringVar(&scrapers.StaleTimeout, "stale-timeout", "30m", "Delete config items not scraped within the timeout")
flags.StringVar(&publicEndpoint, "public-endpoint", "http://localhost:8080", "Public endpoint that this instance is exposed under")
flags.StringVar(&agentName, "agent-name", "", "Name of the agent")

// Flags for push/pull
var upstreamPageSizeDefault = 500
if val, exists := os.LookupEnv("UPSTREAM_PAGE_SIZE"); exists {
if parsed, err := strconv.Atoi(val); err != nil || parsed < 0 {
logger.Fatalf("invalid value=%s for UPSTREAM_PAGE_SIZE. Must be a postive number", val)
} else {
upstreamPageSizeDefault = parsed
}
}

flags.StringVar(&api.UpstreamConfig.Host, "upstream-host", os.Getenv("UPSTREAM_HOST"), "central mission control instance to sync scrape configs & their results")
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
flags.StringVar(&api.UpstreamConfig.Username, "upstream-user", os.Getenv("UPSTREAM_USER"), "upstream username")
flags.StringVar(&api.UpstreamConfig.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password")
flags.StringVar(&api.UpstreamConfig.AgentName, "agent-name", os.Getenv("AGENT_NAME"), "name of this agent")
flags.IntVar(&jobs.ReconcilePageSize, "upstream-page-size", upstreamPageSizeDefault, "upstream reconciliation page size")
}

func init() {
Expand Down
14 changes: 2 additions & 12 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/jobs"
"github.com/flanksource/config-db/query"
"github.com/google/uuid"

"github.com/flanksource/config-db/scrapers"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -55,17 +56,6 @@ func serve(configFiles []string) {
e.GET("/query", query.Handler)
e.POST("/run/:id", scrapers.RunNowHandler)

if agentName != "" {
agent, err := db.FindAgentByName(context.Background(), agentName)
if err != nil {
logger.Fatalf("error searching for agent (name=%s): %v", agentName, err)
} else if agent == nil {
logger.Fatalf("agent not found (name=%s)", agentName)
} else {
agentID = agent.ID
}
}

go startScraperCron(configFiles)

go jobs.ScheduleJobs()
Expand All @@ -89,7 +79,7 @@ func startScraperCron(configFiles []string) {
}
}

scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(agentID)
scraperConfigsDB, err := db.GetScrapeConfigsOfAgent(uuid.Nil)
if err != nil {
logger.Fatalf("error getting configs from database: %v", err)
}
Expand Down
20 changes: 19 additions & 1 deletion jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,39 @@ import (
"runtime"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
"github.com/robfig/cron/v3"
)

var FuncScheduler = cron.New()

const (
PullConfigScrapersFromUpstreamSchedule = "@every 30s"
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
PushConfigResultsToUpstreamSchedule = "@every 10s"
ReconcileConfigsToUpstreamSchedule = "@every 3h"
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
)

func ScheduleJobs() {
scheduleFunc := func(schedule string, fn func()) {
if _, err := FuncScheduler.AddFunc(schedule, fn); err != nil {
logger.Errorf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
logger.Fatalf("Error scheduling %s job", runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}
}

scheduleFunc("@every 24h", DeleteOldConfigChanges)
scheduleFunc("@every 24h", DeleteOldConfigAnalysis)
scheduleFunc("@every 24h", CleanupConfigItems)

if api.UpstreamConfig.Valid() {
pullJob := &UpstreamPullJob{}
pullJob.Run()

if _, err := FuncScheduler.AddJob(PullConfigScrapersFromUpstreamSchedule, pullJob); err != nil {
logger.Fatalf("Failed to schedule job [PullUpstreamScrapeConfigs]: %v", err)
}

scheduleFunc(ReconcileConfigsToUpstreamSchedule, ReconcileConfigScraperResults)
}

FuncScheduler.Start()
}
104 changes: 104 additions & 0 deletions jobs/sync_upstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package jobs

import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
"github.com/flanksource/config-db/db"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/upstream"
"gorm.io/gorm/clause"
)

var ReconcilePageSize int

// ReconcileConfigScraperResults pushes missing scrape config results to the upstream server
func ReconcileConfigScraperResults() {
ctx := api.DefaultContext

jobHistory := models.NewJobHistory("PushUpstream", "Config", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

reconciler := upstream.NewUpstreamReconciler(api.UpstreamConfig, ReconcilePageSize)
if err := reconciler.Sync(ctx, "config_items"); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("failed to sync table config_items: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

// UpstreamPullJob pulls scrape configs from the upstream server
type UpstreamPullJob struct {
lastRuntime time.Time
}

func (t *UpstreamPullJob) Run() {
jobHistory := models.NewJobHistory("PullUpstream", "Config", "")
_ = db.PersistJobHistory(jobHistory.Start())
defer func() { _ = db.PersistJobHistory(jobHistory.End()) }()

if err := t.pull(api.DefaultContext, api.UpstreamConfig); err != nil {
jobHistory.AddError(err.Error())
logger.Errorf("error pulling scrape configs from upstream: %v", err)
} else {
jobHistory.IncrSuccess()
}
}

func (t *UpstreamPullJob) pull(ctx api.ScrapeContext, config upstream.UpstreamConfig) error {
logger.Tracef("pulling scrape configs from upstream since: %v", t.lastRuntime)

endpoint, err := url.JoinPath(config.Host, "upstream", "scrapeconfig", "pull", config.AgentName)
if err != nil {
return fmt.Errorf("error creating url endpoint for host %s: %w", config.Host, err)
}

req, err := http.NewRequest(http.MethodGet, endpoint, nil)
if err != nil {
return fmt.Errorf("error creating new http request: %w", err)
}

req.SetBasicAuth(config.Username, config.Password)

params := url.Values{}
params.Add("since", t.lastRuntime.Format(time.RFC3339Nano))
req.URL.RawQuery = params.Encode()

httpClient := &http.Client{}
resp, err := httpClient.Do(req)
if err != nil {
return fmt.Errorf("error making request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("server returned unexpected status:%s (%s)", resp.Status, body)
}

var scrapeConfigs []models.ConfigScraper
if err := json.NewDecoder(resp.Body).Decode(&scrapeConfigs); err != nil {
return fmt.Errorf("error decoding JSON response: %w", err)
}

if len(scrapeConfigs) == 0 {
return nil
}

t.lastRuntime = scrapeConfigs[len(scrapeConfigs)-1].UpdatedAt

logger.Tracef("fetched %d scrape configs from upstream", len(scrapeConfigs))

return ctx.DB().Omit("agent_id").Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
UpdateAll: true,
}).Create(&scrapeConfigs).Error
}