Skip to content

Commit

Permalink
refactor: context
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 26, 2023
1 parent d8f7b94 commit 060cb42
Show file tree
Hide file tree
Showing 35 changed files with 547 additions and 2,275 deletions.
133 changes: 133 additions & 0 deletions api/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package api

import (
"context"
"errors"
"fmt"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/jackc/pgx/v5/pgxpool"
"gorm.io/gorm"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type ScrapeContext interface {
duty.DBContext

IsTrace() bool

WithContext(ctx context.Context) ScrapeContext

WithScrapeConfig(scraper *v1.ScrapeConfig) ScrapeContext
ScrapeConfig() *v1.ScrapeConfig

Namespace() string
Kubernetes() kubernetes.Interface
KubernetesRestConfig() *rest.Config

GetEnvVarValue(input types.EnvVar) (string, error)
GetEnvValueFromCache(env types.EnvVar) (string, error)

HydrateConnection(connectionIdentifier string) (*models.Connection, error)
}

type scrapeContext struct {
context.Context

db *gorm.DB
pool *pgxpool.Pool

namespace string
kubernetes *kubernetes.Clientset
kubernetesRestConfig *rest.Config

scrapeConfig *v1.ScrapeConfig
}

func NewScrapeContext(ctx context.Context, db *gorm.DB, pool *pgxpool.Pool) ScrapeContext {
return &scrapeContext{
Context: ctx,
namespace: Namespace,
kubernetes: KubernetesClient,
kubernetesRestConfig: KubernetesRestConfig,
db: db,
pool: pool,
}
}

func (ctx scrapeContext) WithContext(from context.Context) ScrapeContext {
ctx.Context = from
return &ctx
}

func (ctx scrapeContext) WithScrapeConfig(scraper *v1.ScrapeConfig) ScrapeContext {
ctx.scrapeConfig = scraper
return &ctx
}

func (ctx scrapeContext) DB() *gorm.DB {
return ctx.db
}

func (ctx scrapeContext) Pool() *pgxpool.Pool {
return ctx.pool
}

func (ctx scrapeContext) ScrapeConfig() *v1.ScrapeConfig {
return ctx.scrapeConfig
}

func (ctx scrapeContext) Namespace() string {
return ctx.namespace
}

func (c scrapeContext) Kubernetes() kubernetes.Interface {
return c.kubernetes
}

func (c scrapeContext) KubernetesRestConfig() *rest.Config {
return c.kubernetesRestConfig
}

func (ctx scrapeContext) IsTrace() bool {
return ctx.scrapeConfig.Spec.IsTrace()
}

func (ctx *scrapeContext) HydrateConnection(connectionName string) (*models.Connection, error) {
if connectionName == "" {
return nil, nil
}

if ctx.db == nil {
return nil, errors.New("db has not been initialized")
}

if ctx.kubernetes == nil {
return nil, errors.New("kubernetes clientset has not been initialized")
}

connection, err := duty.HydratedConnectionByURL(ctx, ctx.db, ctx.kubernetes, ctx.namespace, connectionName)
if err != nil {
return nil, err
}

// Connection name was explicitly provided but was not found.
// That's an error.
if connection == nil {
return nil, fmt.Errorf("connection %s not found", connectionName)
}

return connection, nil
}

func (c *scrapeContext) GetEnvVarValue(input types.EnvVar) (string, error) {
return duty.GetEnvValueFromCache(c.kubernetes, input, c.namespace)
}

func (ctx *scrapeContext) GetEnvValueFromCache(env types.EnvVar) (string, error) {
return duty.GetEnvValueFromCache(ctx.kubernetes, env, ctx.namespace)
}
24 changes: 9 additions & 15 deletions api/global.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package api

import (
"context"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

var KubernetesClient *kubernetes.Clientset
var KubernetesRestConfig *rest.Config
var Namespace string
var (
KubernetesClient *kubernetes.Clientset
KubernetesRestConfig *rest.Config
Namespace string
DefaultContext ScrapeContext
)

func NewScrapeContext(ctx context.Context, scraper v1.ScrapeConfig) *v1.ScrapeContext {
return &v1.ScrapeContext{
Context: ctx,
ScrapeConfig: scraper,
Namespace: Namespace,
Kubernetes: KubernetesClient,
KubernetesRestConfig: KubernetesRestConfig,
DB: db.DefaultDB(),
}
type Scraper interface {
Scrape(ctx ScrapeContext) v1.ScrapeResults
CanScrape(config v1.ScraperSpec) bool
}
34 changes: 0 additions & 34 deletions api/v1/azure.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package v1

import (
"fmt"

"github.com/flanksource/duty"
"github.com/flanksource/duty/types"
)

Expand All @@ -25,34 +22,3 @@ type Azure struct {
ClientSecret types.EnvVar `yaml:"clientSecret,omitempty" json:"clientSecret,omitempty"`
TenantID string `yaml:"tenantID" json:"tenantID"`
}

// HydrateConnection populates the credentials in Azure from the connection name (if available)
// else it'll try to fetch the credentials from kubernetes secrets.
func (t *Azure) HydrateConnection(ctx *ScrapeContext) error {
if t.ConnectionName != "" {
connection, err := ctx.HydrateConnectionByURL(t.ConnectionName)
if err != nil {
return fmt.Errorf("could not hydrate connection: %w", err)
} else if connection == nil {
return fmt.Errorf("connection %s not found", t.ConnectionName)
}

t.ClientID.ValueStatic = connection.Username
t.ClientSecret.ValueStatic = connection.Password
t.TenantID = connection.Properties["tenant"]
return nil
}

var err error
t.ClientID.ValueStatic, err = duty.GetEnvValueFromCache(ctx.Kubernetes, t.ClientID, ctx.Namespace)
if err != nil {
return fmt.Errorf("failed to get client id: %w", err)
}

t.ClientSecret.ValueStatic, err = duty.GetEnvValueFromCache(ctx.Kubernetes, t.ClientSecret, ctx.Namespace)
if err != nil {
return fmt.Errorf("failed to get client secret: %w", err)
}

return nil
}
79 changes: 0 additions & 79 deletions api/v1/interface.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
package v1

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/models"
"gorm.io/gorm"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// Scraper ...
// +kubebuilder:object:generate=false
type Scraper interface {
Scrape(ctx *ScrapeContext) ScrapeResults
CanScrape(config ScraperSpec) bool
}

// Analyzer ...
// +kubebuilder:object:generate=false
type Analyzer func(configs []ScrapeResult) AnalysisResult
Expand Down Expand Up @@ -341,67 +326,3 @@ type RunNowResponse struct {
Failed int `json:"failed"`
Errors []string `json:"errors,omitempty"`
}

// ScrapeContext ...
// +kubebuilder:object:generate=false
type ScrapeContext struct {
context.Context
DB *gorm.DB
Namespace string
Kubernetes *kubernetes.Clientset
KubernetesRestConfig *rest.Config
ScrapeConfig ScrapeConfig
}

func (ctx ScrapeContext) Find(path string) ([]string, error) {
return filepath.Glob(path)
}

// Read returns the contents of a file, the base filename and an error
func (ctx ScrapeContext) Read(path string) ([]byte, string, error) {
content, err := os.ReadFile(path)
filename := filepath.Base(path)
return content, filename, err
}

// GetNamespace ...
func (ctx ScrapeContext) GetNamespace() string {
return ctx.Namespace
}

// IsTrace ...
func (ctx ScrapeContext) IsTrace() bool {
return ctx.ScrapeConfig.Spec.IsTrace()
}

// HydrateConnectionByURL ...
func (ctx *ScrapeContext) HydrateConnectionByURL(connectionName string) (*models.Connection, error) {
if connectionName == "" {
return nil, nil
}

if !strings.HasPrefix(connectionName, "connection://") {
return nil, fmt.Errorf("invalid connection name: [%s]", connectionName)
}

if ctx.DB == nil {
return nil, errors.New("db has not been initialized")
}

if ctx.Kubernetes == nil {
return nil, errors.New("kubernetes clientset has not been initialized")
}

connection, err := duty.HydratedConnectionByURL(ctx, ctx.DB, ctx.Kubernetes, ctx.Namespace, connectionName)
if err != nil {
return nil, err
}

// Connection name was explicitly provided but was not found.
// That's an error.
if connection == nil {
return nil, fmt.Errorf("connection %s not found", connectionName)
}

return connection, nil
}
4 changes: 4 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"os"

Expand All @@ -13,6 +14,7 @@ import (
ctrlzap "sigs.k8s.io/controller-runtime/pkg/log/zap"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
configsv1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/controllers"
"github.com/flanksource/config-db/db"
Expand All @@ -38,6 +40,8 @@ func init() {

func run(cmd *cobra.Command, args []string) {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)

zapLogger := logger.GetZapLogger()
if zapLogger == nil {
logger.Fatalf("failed to get zap logger")
Expand Down
9 changes: 5 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,23 @@ var Run = &cobra.Command{

if db.ConnectionString != "" {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)
}

if db.ConnectionString == "" && outputDir == "" {
logger.Fatalf("skipping export: neither --output-dir nor --db is specified")
}

for _, scraperConfig := range scraperConfigs {
ctx := api.NewScrapeContext(context.Background(), scraperConfig)
for i := range scraperConfigs {
ctx := api.DefaultContext.WithScrapeConfig(&scraperConfigs[i])
if err := scrapeAndStore(ctx); err != nil {
logger.Errorf("error scraping config: (name=%s) %v", scraperConfig.Name, err)
logger.Errorf("error scraping config: (name=%s) %v", scraperConfigs[i].Name, err)
}
}
},
}

func scrapeAndStore(ctx *v1.ScrapeContext) error {
func scrapeAndStore(ctx api.ScrapeContext) error {
results, err := scrapers.Run(ctx)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var Serve = &cobra.Command{

func serve(configFiles []string) {
db.MustInit()
api.DefaultContext = api.NewScrapeContext(context.Background(), db.DefaultDB(), db.Pool)

e := echo.New()
// PostgREST needs to know how it is exposed to create the correct links
db.HTTPEndpoint = publicEndpoint + "/db"
Expand Down Expand Up @@ -101,7 +103,7 @@ func startScraperCron(configFiles []string) {
scrapers.AddToCron(_scraper)

fn := func() {
ctx := api.NewScrapeContext(context.Background(), _scraper)
ctx := api.DefaultContext.WithScrapeConfig(&_scraper)
if _, err := scrapers.RunScraper(ctx); err != nil {
logger.Errorf("Error running scraper(id=%s): %v", scraper.ID, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/scrapeconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *ScrapeConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request

// Sync jobs if new scrape config is created
if changed || scrapeConfig.Generation == 1 {
ctx := api.NewScrapeContext(ctx, *scrapeConfig)
ctx := api.DefaultContext.WithScrapeConfig(scrapeConfig)
if _, err := scrapers.RunScraper(ctx); err != nil {
logger.Error(err, "failed to run scraper")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
Expand Down
Loading

0 comments on commit 060cb42

Please sign in to comment.