Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: cache resource id map #746

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (t *TempCache) Find(ctx ScrapeContext, lookup v1.ExternalID) (*models.Confi
}

var result models.ConfigItem
query := ctx.DB().Limit(1).Where("deleted_at IS NULL").Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID})
query := ctx.DB().Limit(1).Order("updated_at DESC").Where("deleted_at IS NULL").Where("type = ? and external_id @> ?", lookup.ConfigType, pq.StringArray{externalID})
if scraperID != "all" && scraperID != "" {
query = query.Where("scraper_id = ?", scraperID)
}
Expand Down
18 changes: 18 additions & 0 deletions api/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,24 @@ type RelationshipResult struct {
Relationship string
}

// WithConfig returns a copy of the relationship with its config side filled in.
// A non-empty id is stored as ConfigID; otherwise ext is stored as
// ConfigExternalID. The receiver is passed by value, so the caller's
// original is not modified.
func (r RelationshipResult) WithConfig(id string, ext ExternalID) RelationshipResult {
	if id == "" {
		// No direct ID known: fall back to the external ID.
		r.ConfigExternalID = ext
		return r
	}

	r.ConfigID = id
	return r
}

// WithRelated returns a copy of the relationship with its related side filled in.
// A non-empty id is stored as RelatedConfigID; otherwise ext is stored as
// RelatedExternalID. The receiver is passed by value, so the caller's
// original is not modified.
func (r RelationshipResult) WithRelated(id string, ext ExternalID) RelationshipResult {
	if id == "" {
		// No direct ID known: fall back to the external ID.
		r.RelatedExternalID = ext
		return r
	}

	r.RelatedConfigID = id
	return r
}

func (r RelationshipResult) String() string {
s := ""
if r.ConfigID != "" {
Expand Down
86 changes: 41 additions & 45 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (

const ConfigTypePrefix = "Kubernetes::"

var resourceIDMapPerCluster PerClusterResourceIDMap

// ReservedAnnotations
const (
// AnnotationIgnoreConfig excludes the object from being scraped
Expand Down Expand Up @@ -226,7 +228,7 @@ func getObjectChangeExclusionAnnotations(ctx api.ScrapeContext, id string, exclu

// ExtractResults extracts scrape results from the given list of kubernetes objects.
// - withCluster: if true, will create & add a scrape result for the kubernetes cluster.
func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured, withCluster bool) v1.ScrapeResults {
func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstructured.Unstructured, isIncrementalScrape bool) v1.ScrapeResults {
var (
results v1.ScrapeResults
changeResults v1.ScrapeResults
Expand All @@ -253,10 +255,17 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
Tags: v1.Tags{{Name: "cluster", Value: config.ClusterName}},
}

resourceIDMap := getResourceIDsFromObjs(objs)
resourceIDMap[""]["Cluster"] = make(map[string]string)
resourceIDMap[""]["Cluster"][config.ClusterName] = clusterID
resourceIDMap[""]["Cluster"]["selfRef"] = clusterID // For shorthand
resourceIDMap := NewResourceIDMap(objs)
resourceIDMap.Set("", "Cluster", config.ClusterName, clusterID)
resourceIDMap.Set("", "Cluster", "selfRef", clusterID) // For shorthand

if isIncrementalScrape {
// On an incremental scrape, objs does not contain every resource, so the
// resource ID map built from it is incomplete. We merge it with the cached
// resource ID map and use the merged result.
resourceIDMap.data = resourceIDMapPerCluster.MergeAndUpdate(string(ctx.ScrapeConfig().GetUID()), resourceIDMap.data)
} else {
resourceIDMapPerCluster.Swap(string(ctx.ScrapeConfig().GetUID()), resourceIDMap.data)
}

objChangeExclusionByType, objChangeExclusionBySeverity := formObjChangeExclusionMap(objs)

Expand Down Expand Up @@ -392,14 +401,16 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
var nodeName string
if spec["nodeName"] != nil {
nodeName = spec["nodeName"].(string)
nodeID := resourceIDMap[""]["Node"][nodeName]
nodeID := resourceIDMap.Get("", "Node", nodeName)
nodeExternalID := lo.CoalesceOrEmpty(nodeID, getKubernetesAlias("Node", "", nodeName))

relationships = append(relationships, v1.RelationshipResult{
ConfigExternalID: v1.ExternalID{ExternalID: []string{nodeExternalID}, ConfigType: ConfigTypePrefix + "Node"},
RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: ConfigTypePrefix + "Pod"},
Relationship: "NodePod",
})
RelatedConfigID: string(obj.GetUID()),
Relationship: "NodePod",
}.WithConfig(
resourceIDMap.Get("", "Node", nodeName),
v1.ExternalID{ExternalID: []string{nodeExternalID}, ConfigType: ConfigTypePrefix + "Node"},
))
}

if obj.GetLabels()["app.kubernetes.io/name"] == "aws-node" {
Expand Down Expand Up @@ -437,17 +448,19 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc

if obj.GetNamespace() != "" {
relationships = append(relationships, v1.RelationshipResult{
ConfigExternalID: v1.ExternalID{ExternalID: []string{fmt.Sprintf("Kubernetes/Namespace//%s", obj.GetNamespace())}, ConfigType: ConfigTypePrefix + "Namespace"},
RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind()},
Relationship: "Namespace" + obj.GetKind(),
})
RelatedConfigID: string(obj.GetUID()),
Relationship: "Namespace" + obj.GetKind(),
}.WithConfig(
resourceIDMap.Get("", "Namespace", obj.GetNamespace()),
v1.ExternalID{ExternalID: []string{getKubernetesAlias("Namespace", "", obj.GetNamespace())}, ConfigType: ConfigTypePrefix + "Namespace"},
))
}

for _, ownerRef := range obj.GetOwnerReferences() {
relationships = append(relationships, v1.RelationshipResult{
ConfigExternalID: v1.ExternalID{ExternalID: []string{string(ownerRef.UID)}, ConfigType: getConfigTypePrefix(ownerRef.APIVersion) + ownerRef.Kind},
RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind()},
Relationship: ownerRef.Kind + obj.GetKind(),
ConfigID: string(ownerRef.UID),
RelatedConfigID: string(obj.GetUID()),
Relationship: ownerRef.Kind + obj.GetKind(),
})
}

Expand All @@ -471,9 +484,12 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc

for _, id := range linkedConfigItemIDs {
rel := v1.RelationshipResult{
RelatedExternalID: v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: getConfigTypePrefix(obj.GetKind()) + obj.GetKind()},
ConfigID: id.String(),
}
ConfigID: id.String(),
}.WithRelated(
resourceIDMap.Get(obj.GetNamespace(), obj.GetKind(), obj.GetName()),
v1.ExternalID{ExternalID: []string{string(obj.GetUID())}, ConfigType: getConfigTypePrefix(obj.GetAPIVersion()) + obj.GetKind()},
)

relationships = append(relationships, rel)
}
}
Expand Down Expand Up @@ -592,7 +608,7 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc
}

results = append(results, changeResults...)
if withCluster {
if isIncrementalScrape {
results = append([]v1.ScrapeResult{cluster}, results...)
}

Expand All @@ -613,15 +629,15 @@ func ExtractResults(ctx api.ScrapeContext, config v1.Kubernetes, objs []*unstruc

// getKubernetesParent returns a list of potential parents in order.
// Example: For a Pod the parents would be [Replicaset, Namespace, Cluster]
func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.KubernetesExclusionConfig, resourceIDMap map[string]map[string]map[string]string) []v1.ConfigExternalKey {
func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.KubernetesExclusionConfig, resourceIDMap *ResourceIDMapContainer) []v1.ConfigExternalKey {
var allParents []v1.ConfigExternalKey
allParents = append(allParents, v1.ConfigExternalKey{
Type: ConfigTypePrefix + "Cluster",
ExternalID: resourceIDMap[""]["Cluster"]["selfRef"],
ExternalID: resourceIDMap.Get("", "Cluster", "selfRef"),
})

if obj.GetNamespace() != "" {
parentExternalID := resourceIDMap[""]["Namespace"][obj.GetNamespace()]
parentExternalID := resourceIDMap.Get("", "Namespace", obj.GetNamespace())
if parentExternalID == "" {
// An incremental scraper may not have the Namespace object.
// We can instead use the alias as the external id.
Expand All @@ -641,7 +657,7 @@ func getKubernetesParent(obj *unstructured.Unstructured, exclusions v1.Kubernete
// be its Deployment
if obj.GetKind() == "Pod" && lo.Contains(exclusions.Kinds, "ReplicaSet") && ref.Kind == "ReplicaSet" {
deployName := extractDeployNameFromReplicaSet(ref.Name)
parentExternalID := resourceIDMap[obj.GetNamespace()]["Deployment"][deployName]
parentExternalID := resourceIDMap.Get(obj.GetNamespace(), "Deployment", deployName)
allParents = append([]v1.ConfigExternalKey{{
Type: ConfigTypePrefix + "Deployment",
ExternalID: parentExternalID,
Expand Down Expand Up @@ -720,26 +736,6 @@ func extractDeployNameFromReplicaSet(rs string) string {
return strings.Join(split, "-")
}

func getResourceIDsFromObjs(objs []*unstructured.Unstructured) map[string]map[string]map[string]string {
// {Namespace: {Kind: {Name: ID}}}
resourceIDMap := make(map[string]map[string]map[string]string)
resourceIDMap[""] = make(map[string]map[string]string)

for _, obj := range objs {
if collections.Contains([]string{"Namespace", "Deployment", "Node"}, obj.GetKind()) {
if resourceIDMap[obj.GetNamespace()] == nil {
resourceIDMap[obj.GetNamespace()] = make(map[string]map[string]string)
}
if resourceIDMap[obj.GetNamespace()][obj.GetKind()] == nil {
resourceIDMap[obj.GetNamespace()][obj.GetKind()] = make(map[string]string)
}
resourceIDMap[obj.GetNamespace()][obj.GetKind()][obj.GetName()] = string(obj.GetUID())
}
}

return resourceIDMap
}

//nolint:errcheck
func cleanKubernetesObject(obj map[string]any) (map[string]any, error) {
o := gabs.Wrap(obj)
Expand Down
134 changes: 134 additions & 0 deletions scrapers/kubernetes/resource_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package kubernetes

import (
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// ResourceIDMap indexes resource IDs by namespace, then kind, then name:
// map<namespace><kind><name>: <id>
type ResourceIDMap map[string]map[string]map[string]string

// ResourceIDMapContainer pairs a ResourceIDMap with a read/write mutex.
// Set and Get take the mutex before touching the map. The zero value is
// ready to use: Set allocates the map on first write.
type ResourceIDMapContainer struct {
	mu   sync.RWMutex
	data ResourceIDMap
}

// Set stores id under (namespace, kind, name), creating any missing
// intermediate maps. An existing entry for the same key is overwritten.
func (t *ResourceIDMapContainer) Set(namespace, kind, name, id string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Writing to a nil map panics, so allocate the top level lazily; this
	// makes a zero-value container usable.
	if t.data == nil {
		t.data = make(ResourceIDMap)
	}

	kinds := t.data[namespace]
	if kinds == nil {
		kinds = make(map[string]map[string]string)
		t.data[namespace] = kinds
	}

	names := kinds[kind]
	if names == nil {
		names = make(map[string]string)
		kinds[kind] = names
	}

	names[name] = id
}

// Get returns the ID stored under (namespace, kind, name), or the empty
// string when no such entry exists.
func (t *ResourceIDMapContainer) Get(namespace, kind, name string) string {
	t.mu.RLock()
	defer t.mu.RUnlock()

	// Indexing a nil or missing map yields the zero value, so the chained
	// lookup needs no explicit presence checks.
	return t.data[namespace][kind][name]
}

// PerClusterResourceIDMap holds one ResourceIDMap per cluster key. Both
// methods take the mutex before touching the underlying map, and both
// allocate it on first use, so the zero value is usable.
type PerClusterResourceIDMap struct {
	mu   sync.Mutex
	data map[string]ResourceIDMap
}

// Swap replaces whatever is stored for clusterID with resourceIDMap.
// The map is stored as given, not copied.
func (t *PerClusterResourceIDMap) Swap(clusterID string, resourceIDMap ResourceIDMap) {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.data == nil {
		t.data = map[string]ResourceIDMap{clusterID: resourceIDMap}
		return
	}

	t.data[clusterID] = resourceIDMap
}

// MergeAndUpdate combines resourceIDMap with the map currently stored for
// clusterID (via mergeResourceIDMap, passing resourceIDMap as the latest
// data), stores the result under clusterID and returns it. When nothing is
// stored for clusterID yet, resourceIDMap itself is stored and returned.
func (t *PerClusterResourceIDMap) MergeAndUpdate(clusterID string, resourceIDMap ResourceIDMap) ResourceIDMap {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.data == nil {
		t.data = make(map[string]ResourceIDMap)
	}

	merged := resourceIDMap
	if previous, found := t.data[clusterID]; found {
		merged = mergeResourceIDMap(resourceIDMap, previous)
	}

	t.data[clusterID] = merged
	return merged
}

// NewResourceIDMap builds a ResourceIDMapContainer from objs, recording each
// object's UID under its namespace, kind and name. If several objects share
// the same (namespace, kind, name), the last one in objs wins.
func NewResourceIDMap(objs []*unstructured.Unstructured) *ResourceIDMapContainer {
	index := make(map[string]map[string]map[string]string)

	for _, obj := range objs {
		ns, kind := obj.GetNamespace(), obj.GetKind()

		byKind := index[ns]
		if byKind == nil {
			byKind = make(map[string]map[string]string)
			index[ns] = byKind
		}

		byName := byKind[kind]
		if byName == nil {
			byName = make(map[string]string)
			byKind[kind] = byName
		}

		byName[obj.GetName()] = string(obj.GetUID())
	}

	container := &ResourceIDMapContainer{
		mu:   sync.RWMutex{},
		data: index,
	}
	return container
}

func mergeResourceIDMap(latest, cached ResourceIDMap) ResourceIDMap {
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
if len(latest) == 0 {
return cached
}

if len(cached) == 0 {
return latest
}

output := make(ResourceIDMap)

// First, copy all data from cached
for k, v := range cached {
output[k] = make(map[string]map[string]string)
for k2, v2 := range v {
output[k][k2] = make(map[string]string)
for k3, v3 := range v2 {
output[k][k2][k3] = v3
}
}
}

// Then, update or add data from latest
for k, v := range latest {
if _, ok := output[k]; !ok {
output[k] = make(map[string]map[string]string)
}
for k2, v2 := range v {
if _, ok := output[k][k2]; !ok {
output[k][k2] = make(map[string]string)
}
for k3, v3 := range v2 {
output[k][k2][k3] = v3
}
}
}

return output
}
Loading