fix: dynamic client for ambiguous Kind
We were getting "resource not found" errors when fetching resources with
the dynamic client.

The issue was that the "Kind" alone wasn't specific enough to select the
correct dynamic client. For example, I had two HelmChart kinds:

NAME         SHORTNAMES   APIVERSION                    NAMESPACED   KIND
helmcharts                helm.cattle.io/v1             true         HelmChart
helmcharts   hc           source.toolkit.fluxcd.io/v1   true         HelmChart

Changed that to create the dynamic client for <Group, Version, Kind>
instead.
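
For illustration, a minimal plain client-go sketch of the same idea; the `fetchByGVK` helper and its RESTMapper argument are made up for this example, while the commit itself goes through duty's `KubernetesDynamicClient.GetClientByGroupVersionKind`:

```go
package sketch

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
)

// fetchByGVK resolves a resource from the full <Group, Version, Kind>, so two
// CRDs that share a Kind (like the HelmCharts above) can never be confused.
func fetchByGVK(ctx context.Context, dyn dynamic.Interface, mapper meta.RESTMapper,
	apiVersion, kind, namespace, name string) error {
	// "source.toolkit.fluxcd.io/v1" -> {Group: "source.toolkit.fluxcd.io", Version: "v1"}
	gv, err := schema.ParseGroupVersion(apiVersion)
	if err != nil {
		return err
	}

	// A Kind-only lookup can match several REST mappings; pinning group and
	// version yields exactly one.
	mapping, err := mapper.RESTMapping(gv.WithKind(kind).GroupKind(), gv.Version)
	if err != nil {
		return err
	}

	obj, err := dyn.Resource(mapping.Resource).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	fmt.Println(obj.GetName())
	return nil
}
```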
adityathebe committed Oct 17, 2024
1 parent ac017ac commit dbdb6ee
Showing 3 changed files with 77 additions and 17 deletions.
scrapers/cron.go: 20 additions & 4 deletions
@@ -54,7 +54,9 @@ func Stop() {
 
 func SyncScrapeConfigs(sc context.Context) {
 	if globalScraperSempahore == nil {
-		globalScraperSempahore = semaphore.NewWeighted(int64(sc.Properties().Int("scraper.concurrency", ScraperConcurrency)))
+		globalScraperSempahore = semaphore.NewWeighted(
+			int64(sc.Properties().Int("scraper.concurrency", ScraperConcurrency)),
+		)
 	}
 
 	if scraperTypeSemaphores == nil {
@@ -187,8 +189,12 @@ func SyncScrapeJob(sc api.ScrapeContext) error {
 }
 
 func newScraperJob(sc api.ScrapeContext) *job.Job {
-	schedule, _ := lo.Coalesce(sc.Properties().String(fmt.Sprintf("scraper.%s.schedule", sc.ScrapeConfig().UID), sc.ScrapeConfig().Spec.Schedule), DefaultSchedule)
-	minScheduleAllowed := sc.Properties().Duration(fmt.Sprintf("scraper.%s.schedule.min", sc.ScrapeConfig().Type()), MinScraperSchedule)
+	schedule, _ := lo.Coalesce(
+		sc.Properties().String(fmt.Sprintf("scraper.%s.schedule", sc.ScrapeConfig().UID), sc.ScrapeConfig().Spec.Schedule),
+		DefaultSchedule,
+	)
+	minScheduleAllowed := sc.Properties().
+		Duration(fmt.Sprintf("scraper.%s.schedule.min", sc.ScrapeConfig().Type()), MinScraperSchedule)
 
 	// Attempt to get a fixed interval from the schedule.
 	// NOTE: Only works for fixed interval schedules.
@@ -197,7 +203,12 @@ func newScraperJob(sc api.ScrapeContext) *job.Job {
 	interval := time.Until(parsedSchedule.Next(time.Now()))
 	if interval < minScheduleAllowed {
 		newSchedule := fmt.Sprintf("@every %ds", int(minScheduleAllowed.Seconds()))
-		sc.Logger.Infof("[%s] scraper schedule %s too short, using minimum allowed %q", sc.ScrapeConfig().Name, schedule, newSchedule)
+		sc.Logger.Infof(
+			"[%s] scraper schedule %s too short, using minimum allowed %q",
+			sc.ScrapeConfig().Name,
+			schedule,
+			newSchedule,
+		)
 
 		schedule = newSchedule
 	}
@@ -247,6 +258,11 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
 	}
 
 	for _, config := range sc.ScrapeConfig().Spec.Kubernetes {
+		if config.ClusterName != "homelab" {
+			fmt.Println("skipping watch for", config.ClusterName)
+			continue
+		}
+
 		if len(config.Watch) == 0 {
 			config.Watch = v1.DefaultWatchKinds
 		}
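
The `@every` spec and the `parsedSchedule.Next(time.Now())` call above suggest a robfig/cron-style parser; under that assumption, the minimum-schedule clamp in `newScraperJob` boils down to this standalone sketch:

```go
package sketch

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

// clampSchedule mirrors newScraperJob: if a fixed-interval schedule fires more
// often than minAllowed, fall back to "@every <minAllowed>". As the NOTE in
// cron.go says, this only works for fixed-interval schedules.
func clampSchedule(schedule string, minAllowed time.Duration) (string, error) {
	parsed, err := cron.ParseStandard(schedule) // accepts "@every 5s" as well as 5-field specs
	if err != nil {
		return "", err
	}

	// For "@every" schedules, the time until the next run is the interval itself.
	interval := time.Until(parsed.Next(time.Now()))
	if interval < minAllowed {
		return fmt.Sprintf("@every %ds", int(minAllowed.Seconds())), nil
	}
	return schedule, nil
}
```

For example, `clampSchedule("@every 5s", 30*time.Second)` returns `@every 30s`, matching the log line above.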
scrapers/kubernetes/informers.go: 17 additions & 3 deletions
@@ -37,7 +37,12 @@ type SharedInformerManager struct {
 
 type DeleteObjHandler func(ctx context.Context, id string) error
 
-func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error {
+func (t *SharedInformerManager) Register(
+	ctx api.ScrapeContext,
+	watchResource v1.KubernetesResourceToWatch,
+	buffer chan<- *unstructured.Unstructured,
+	deleteBuffer chan<- string,
+) error {
 	apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
 
 	informer, stopper, isNew := t.getOrCreate(ctx, apiVersion, kind)
@@ -82,7 +87,13 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
 		DeleteFunc: func(obj any) {
 			u, err := getUnstructuredFromInformedObj(watchResource, obj)
 			if err != nil {
-				logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to get unstructured %v", err)
+				logToJobHistory(
+					ctx.DutyContext(),
+					"DeleteK8sWatchResource",
+					ctx.ScrapeConfig().GetPersistedID(),
+					"failed to get unstructured %v",
+					err,
+				)
 				return
 			}
 
@@ -104,7 +115,10 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
 }
 
 // getOrCreate returns an existing shared informer instance or creates & returns a new one.
-func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, apiVersion, kind string) (informers.GenericInformer, chan struct{}, bool) {
+func (t *SharedInformerManager) getOrCreate(
+	ctx api.ScrapeContext,
+	apiVersion, kind string,
+) (informers.GenericInformer, chan struct{}, bool) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
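
For context, `Register`/`getOrCreate` amount to roughly the following with client-go's dynamicinformer; this is a sketch only (the `newInformer` name is made up, and the real manager also de-duplicates informers per apiVersion/kind and tracks their stop channels):

```go
package sketch

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// newInformer builds a shared informer for one GroupVersionResource and wires
// add/delete events into the caller's buffers, as Register does above.
func newInformer(dyn dynamic.Interface, gvr schema.GroupVersionResource,
	buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) (informers.GenericInformer, chan struct{}, error) {

	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dyn, 30*time.Minute, metav1.NamespaceAll, nil)
	informer := factory.ForResource(gvr)
	stopper := make(chan struct{})

	_, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			if u, ok := obj.(*unstructured.Unstructured); ok {
				buffer <- u
			}
		},
		DeleteFunc: func(obj any) {
			if u, ok := obj.(*unstructured.Unstructured); ok {
				deleteBuffer <- string(u.GetUID())
			}
		},
	})
	if err != nil {
		return nil, nil, err
	}

	go informer.Informer().Run(stopper)
	return informer, stopper, nil
}
```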
scrapers/kubernetes/kubernetes.go: 40 additions & 10 deletions
@@ -15,6 +15,7 @@ import (
 	coreV1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/dynamic"
 
@@ -53,11 +54,19 @@ func getString(obj *unstructured.Unstructured, path ...string) string {
 	return s
 }
 
-func (kubernetes KubernetesScraper) IncrementalScrape(ctx api.ScrapeContext, config v1.Kubernetes, objects []*unstructured.Unstructured) v1.ScrapeResults {
+func (kubernetes KubernetesScraper) IncrementalScrape(
+	ctx api.ScrapeContext,
+	config v1.Kubernetes,
+	objects []*unstructured.Unstructured,
+) v1.ScrapeResults {
 	return ExtractResults(newKubernetesContext(ctx, config), objects)
 }
 
-func (kubernetes KubernetesScraper) IncrementalEventScrape(ctx api.ScrapeContext, config v1.Kubernetes, events []v1.KubernetesEvent) v1.ScrapeResults {
+func (kubernetes KubernetesScraper) IncrementalEventScrape(
+	ctx api.ScrapeContext,
+	config v1.Kubernetes,
+	events []v1.KubernetesEvent,
+) v1.ScrapeResults {
 	if len(events) == 0 {
 		return nil
 	}
@@ -101,17 +110,29 @@ func (kubernetes KubernetesScraper) IncrementalEventScrape(ctx api.ScrapeContext
 			if _, ok := seenObjects[cacheKey]; !ok {
 				kclient, ok := kindClientCache[resource.APIVersion+resource.Kind]
 				if !ok {
-					kclient, err = ctx.KubernetesDynamicClient().GetClientByKind(resource.Kind)
+					gv, _ := schema.ParseGroupVersion(resource.APIVersion)
+					kclient, err = ctx.KubernetesDynamicClient().GetClientByGroupVersionKind(gv.Group, gv.Version, resource.Kind)
 					if err != nil {
+						ctx.Errorf("failed to get dynamic client for (%s/%s)", gv, resource.Kind)
 						continue
 					}
 
 					kindClientCache[resource.APIVersion+resource.Kind] = kclient
 				}
 
-				ctx.DutyContext().Logger.V(5).Infof("fetching resource namespace=%s name=%s kind=%s apiVersion=%s", resource.Namespace, resource.Name, resource.Kind, resource.APIVersion)
+				ctx.DutyContext().
+					Logger.V(5).
+					Infof("fetching resource namespace=%s name=%s kind=%s apiVersion=%s", resource.Namespace, resource.Name, resource.Kind, resource.APIVersion)
 
 				obj, err := kclient.Namespace(resource.Namespace).Get(ctx, resource.Name, metav1.GetOptions{})
 				if err != nil {
-					ctx.DutyContext().Logger.Warnf("failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v", resource.Kind, resource.Name, resource.Namespace, err)
+					ctx.DutyContext().Logger.Warnf(
+						"failed to get resource (Kind=%s, Name=%s, Namespace=%s): %v",
+						resource.Kind,
+						resource.Name,
+						resource.Namespace,
+						err,
+					)
 					continue
 				} else if obj != nil {
 					seenObjects[cacheKey] = struct{}{} // mark it as seen so we don't run ketall.KetOne again (in this run)
@@ -177,7 +198,10 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
 	if ctx.isIncremental {
 		// On incremental scrape, we do not have all the data in the resource ID map.
 		// we use it from the cached resource id map.
-		ctx.resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate(string(ctx.ScrapeConfig().GetUID()), ctx.resourceIDMap.data)
+		ctx.resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate(
+			string(ctx.ScrapeConfig().GetUID()),
+			ctx.resourceIDMap.data,
+		)
 	} else {
 		resourceIDMapPerCluster.Swap(string(ctx.ScrapeConfig().GetUID()), ctx.resourceIDMap.data)
 	}
@@ -286,9 +310,12 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
 			if address.TargetRef != nil {
 				if address.TargetRef.Kind != "Service" {
 					relationships = append(relationships, v1.RelationshipResult{
-						ConfigExternalID: v1.ExternalID{ExternalID: alias("Service", obj.GetNamespace(), obj.GetName()), ConfigType: ConfigTypePrefix + "Service"},
-						RelatedConfigID:  string(address.TargetRef.UID),
-						Relationship:     fmt.Sprintf("Service%s", address.TargetRef.Kind),
+						ConfigExternalID: v1.ExternalID{
+							ExternalID: alias("Service", obj.GetNamespace(), obj.GetName()),
+							ConfigType: ConfigTypePrefix + "Service",
+						},
+						RelatedConfigID: string(address.TargetRef.UID),
+						Relationship:    fmt.Sprintf("Service%s", address.TargetRef.Kind),
 					})
 				}
 			}
@@ -319,7 +346,10 @@ func ExtractResults(ctx *KubernetesContext, objs []*unstructured.Unstructured) v
 				Relationship: "Namespace" + obj.GetKind(),
 			}.WithConfig(
 				ctx.GetID("", "Namespace", obj.GetNamespace()),
-				v1.ExternalID{ExternalID: alias("Namespace", "", obj.GetNamespace()), ConfigType: ConfigTypePrefix + "Namespace"},
+				v1.ExternalID{
+					ExternalID: alias("Namespace", "", obj.GetNamespace()),
+					ConfigType: ConfigTypePrefix + "Namespace",
+				},
 			))
 		}
 
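
A tiny, self-contained illustration of why keying on <Group, Version, Kind> disambiguates the two HelmCharts from the commit message:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Both resources report Kind=HelmChart; only the apiVersion tells them apart.
	for _, apiVersion := range []string{"helm.cattle.io/v1", "source.toolkit.fluxcd.io/v1"} {
		gv, err := schema.ParseGroupVersion(apiVersion)
		if err != nil {
			panic(err)
		}
		// Distinct <Group, Version, Kind> -> distinct dynamic clients.
		fmt.Printf("group=%s version=%s kind=HelmChart\n", gv.Group, gv.Version)
	}
}
```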
