Skip to content

Commit

Permalink
feat: new job type for sync canary jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Oct 23, 2023
1 parent 603e32c commit a2dd880
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 65 deletions.
3 changes: 3 additions & 0 deletions api/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/commons/logger"
ctemplate "github.com/flanksource/commons/template"
"github.com/flanksource/duty"
dutyCtx "github.com/flanksource/duty/context"

Check failure on line 14 in api/context/context.go

View workflow job for this annotation

GitHub Actions / test

github.com/flanksource/[email protected]: replacement directory ../duty does not exist
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
Expand All @@ -19,6 +20,8 @@ import (
"k8s.io/client-go/kubernetes"
)

var DefaultContext dutyCtx.Context

type KubernetesContext struct {
gocontext.Context
Kommons *kommons.Client
Expand Down
5 changes: 5 additions & 0 deletions api/v1/canary_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/flanksource/canary-checker/api/external"
"github.com/flanksource/commons/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type ResultMode string
Expand Down Expand Up @@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string {
return check.GetEndpoint()
}

func (c Canary) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Name: c.Name, Namespace: c.Namespace}
}

func (c *Canary) SetRunnerName(name string) {
c.Status.runnerName = name
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cmd

import (
gocontext "context"
"os"
"time"

apicontext "github.com/flanksource/canary-checker/api/context"
"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs"
Expand All @@ -16,9 +18,12 @@ import (
"github.com/flanksource/canary-checker/pkg"
"github.com/flanksource/canary-checker/pkg/controllers"
"github.com/flanksource/canary-checker/pkg/labels"
commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/go-logr/zapr"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) {
if err := db.Init(); err != nil {
logger.Fatalf("error connecting with postgres: %v", err)
}
kommonsClient, k8s, err := pkg.NewKommonsClient()
if err != nil {
logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err)
}

apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))).
WithDB(db.Gorm, db.Pool).
WithKubernetes(k8s).
WithKommons(kommonsClient).
WithNamespace(runner.WatchNamespace)

cache.PostgresCache = cache.NewPostgresCache(db.Pool)
if operatorExecutor {
logger.Infof("Starting executors")
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,5 @@ require (
sigs.k8s.io/kustomize/kyaml v0.14.3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect
)

replace github.com/flanksource/duty => ../duty
23 changes: 23 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,14 @@ github.com/flanksource/commons v1.17.0 h1:rSahn6c4vyq3bPC5jsayET4y8TECRz6Q8NbooI
github.com/flanksource/commons v1.17.0/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U=
github.com/flanksource/duty v1.0.201 h1:c8r02bfuF47E2svK+qXCLHKaSqOCZZHKPj+v54eimqc=
github.com/flanksource/duty v1.0.201/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE=
github.com/flanksource/commons v1.15.0 h1:p74hrKzIz0r3H8YN3CuB8ePJOjzPFO0BRLVmpXmeqvY=
github.com/flanksource/commons v1.15.0/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs=
github.com/flanksource/commons v1.15.1 h1:cFvxQd5SBFe+q16ciz8Q2IeBMeQ7+atdACGanbW27hg=
github.com/flanksource/commons v1.15.1/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs=
github.com/flanksource/duty v1.0.191 h1:acnvyTeQlfqmtyXxWprNFGK/vBTUlqkYwxEPLtXSPrk=
github.com/flanksource/duty v1.0.191/go.mod h1:ikyl/TcRy6Cc0R5b0wEHT7CecV7gyJvrDGq/4oIZHoc=
github.com/flanksource/duty v1.0.197 h1:KRw4EPAD2kcqNPkipnkHzlbf5wmLqg3JgtXqiPzCLhw=
github.com/flanksource/duty v1.0.197/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.20.18 h1:qYiznMxhq+Zau5iWnVzW1yDzA1deHOsmo6yldCN7JhQ=
github.com/flanksource/gomplate/v3 v3.20.18/go.mod h1:2GgHZ2vWmtDspJMBfUIryOuzJSwc8jU7Kw9fDLr0TMA=
Expand Down Expand Up @@ -1127,6 +1135,10 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI=
github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8=
github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo=
github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/henvic/httpretty v0.1.2 h1:EQo556sO0xeXAjP10eB+BZARMuvkdGqtfeS4Ntjvkiw=
github.com/henvic/httpretty v0.1.2/go.mod h1:ViEsly7wgdugYtymX54pYp6Vv2wqZmNHayJ6q8tlKCc=
github.com/hirochachacha/go-smb2 v1.1.0 h1:b6hs9qKIql9eVXAiN0M2wSFY5xnhbHAQoCwRKbaRTZI=
Expand Down Expand Up @@ -1507,6 +1519,8 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc=
github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA=
github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
Expand Down Expand Up @@ -2300,6 +2314,13 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU=
gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0=
gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8=
gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU=
gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/plugin/prometheus v0.0.0-20230504115745-1aec2356381b h1:uHPZdwwf4+AVvAEgZ/LQR1UTub8LJ2nh0wQDW3Dt4jE=
Expand Down Expand Up @@ -2345,6 +2366,8 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c

// Sync jobs if canary is created or updated
if canary.Generation == 1 {
if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary
}
r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
return nil, err
}
return dbCanary, nil
Expand Down
21 changes: 11 additions & 10 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package db

import (
"context"
gocontext "context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/google/uuid"
Expand All @@ -23,7 +24,7 @@ import (
"gorm.io/gorm/clause"
)

func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) {
query := `
SELECT json_agg(
jsonb_set_lax(to_jsonb(canaries),'{checks}', (
Expand All @@ -49,7 +50,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
args["namespace"] = namespace
}

rows, err := Pool.Query(context.Background(), query, args)
rows, err := ctx.Pool().Query(ctx, query, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,17 +126,17 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) {
return check.ID, nil
}

func GetTransformedCheckIDs(canaryID string) ([]string, error) {
func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) {
var ids []string
err := Gorm.Table("checks").
err := ctx.DB().Table("checks").
Select("id").
Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID).
Find(&ids).
Error
return ids, err
}

func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error {
if len(ids) == 0 {
return nil
}
Expand All @@ -158,20 +159,20 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
})
}
}
return Gorm.Table("check_statuses").
return ctx.DB().Table("check_statuses").
Create(objs).
Error
}

func RemoveTransformedChecks(ids []string) error {
func RemoveTransformedChecks(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}
updates := map[string]any{
"deleted_at": gorm.Expr("NOW()"),
}

return Gorm.Table("checks").
return ctx.DB().Table("checks").
Where("id in (?)", ids).
Where("transformed = true").
Updates(updates).
Expand Down Expand Up @@ -278,7 +279,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) {
return &model, nil
}

func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) {
func FindDeletedChecksSince(ctx gocontext.Context, since time.Time) ([]string, error) {
var ids []string
err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
return ids, err
Expand Down
Loading

0 comments on commit a2dd880

Please sign in to comment.